You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by am...@apache.org on 2015/03/31 06:40:59 UTC
incubator-lens git commit: LENS-469 : Remove locking on
HiveDriver.updateStatus (Jaideep Dhok via amareshwari)
Repository: incubator-lens
Updated Branches:
refs/heads/master e3e45aef2 -> 72691f12f
LENS-469 : Remove locking on HiveDriver.updateStatus (Jaideep Dhok via amareshwari)
Project: http://git-wip-us.apache.org/repos/asf/incubator-lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-lens/commit/72691f12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-lens/tree/72691f12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-lens/diff/72691f12
Branch: refs/heads/master
Commit: 72691f12f7b153be209dea09b02bbab34b1d41a4
Parents: e3e45ae
Author: Jaideep Dhok <jd...@apache.org>
Authored: Tue Mar 31 10:10:53 2015 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Tue Mar 31 10:10:53 2015 +0530
----------------------------------------------------------------------
.../org/apache/lens/driver/hive/HiveDriver.java | 92 ++++++++------------
1 file changed, 38 insertions(+), 54 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/72691f12/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
index 11ab47a..3edce4d 100644
--- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
+++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
@@ -104,18 +105,15 @@ public class HiveDriver implements LensDriver {
private HiveConf hiveConf;
/** The hive handles. */
- private Map<QueryHandle, OperationHandle> hiveHandles = new HashMap<QueryHandle, OperationHandle>();
+ private Map<QueryHandle, OperationHandle> hiveHandles = new ConcurrentHashMap<QueryHandle, OperationHandle>();
/** The session lock. */
private final Lock sessionLock;
- /** The connection lock. */
- private final Lock connectionLock;
-
// connections need to be separate for each user and each thread
/** The thread connections. */
- private final Map<String, Map<Long, ExpirableConnection>> threadConnections =
- new HashMap<String, Map<Long, ExpirableConnection>>();
+ private final Map<String, ExpirableConnection> threadConnections =
+ new HashMap<String, ExpirableConnection>();
/** The thrift conn expiry queue. */
private final DelayQueue<ExpirableConnection> thriftConnExpiryQueue = new DelayQueue<ExpirableConnection>();
@@ -305,7 +303,6 @@ public class HiveDriver implements LensDriver {
*/
public HiveDriver() throws LensException {
this.sessionLock = new ReentrantLock();
- this.connectionLock = new ReentrantLock();
lensToHiveSession = new HashMap<String, SessionHandle>();
resourcesAddedForSession = new HashMap<SessionHandle, Boolean>();
connectionExpiryThread.setDaemon(true);
@@ -532,7 +529,7 @@ public class HiveDriver implements LensDriver {
* @see org.apache.lens.server.api.driver.LensDriver#updateStatus(org.apache.lens.server.api.query.QueryContext)
*/
@Override
- public synchronized void updateStatus(QueryContext context) throws LensException {
+ public void updateStatus(QueryContext context) throws LensException {
LOG.debug("GetStatus: " + context.getQueryHandle());
if (context.getDriverStatus().isFinished()) {
return;
@@ -677,6 +674,9 @@ public class HiveDriver implements LensDriver {
*/
@Override
public void closeQuery(QueryHandle handle) throws LensException {
+ if (handle == null) {
+ return;
+ }
LOG.info("CloseQuery: " + handle);
OperationHandle opHandle = hiveHandles.remove(handle);
if (opHandle != null) {
@@ -758,41 +758,32 @@ public class HiveDriver implements LensDriver {
}
return embeddedConnection.getClient();
} else {
- connectionLock.lock();
- try {
- String user = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_USER);
- if (SessionState.get() != null && SessionState.get().getUserName() != null) {
- user = SessionState.get().getUserName();
- }
- Map<Long, ExpirableConnection> userThreads = threadConnections.get(user.toLowerCase());
- if (userThreads == null) {
- userThreads = new HashMap<Long, ExpirableConnection>();
- threadConnections.put(user.toLowerCase(), userThreads);
+ String user = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_USER);
+ if (SessionState.get() != null && SessionState.get().getUserName() != null) {
+ user = SessionState.get().getUserName();
+ }
+
+ String connectionKey = user.toLowerCase() + Thread.currentThread().getId();
+ ExpirableConnection connection = threadConnections.get(connectionKey);
+ if (connection == null || connection.isExpired()) {
+ try {
+ ThriftConnection tconn = connectionClass.newInstance();
+ tconn.init(hiveConf, user);
+ connection = new ExpirableConnection(tconn, connectionExpiryTimeout);
+ thriftConnExpiryQueue.offer(connection);
+ threadConnections.put(connectionKey, connection);
+ LOG.info("New thrift connection " + connectionClass + " for thread:" + Thread.currentThread().getId()
+ + " for user:" + user + " connection ID=" + connection.getConnId());
+ } catch (Exception e) {
+ throw new LensException(e);
}
- ExpirableConnection connection = userThreads.get(Thread.currentThread().getId());
- if (connection == null || connection.isExpired()) {
- try {
- ThriftConnection tconn = connectionClass.newInstance();
- tconn.init(hiveConf, user);
- connection = new ExpirableConnection(tconn, connectionExpiryTimeout);
- thriftConnExpiryQueue.offer(connection);
- userThreads.put(Thread.currentThread().getId(), connection);
- LOG.info("New thrift connection " + connectionClass + " for thread:" + Thread.currentThread().getId()
- + " for user:" + user + " connection ID=" + connection.getConnId());
- } catch (Exception e) {
- throw new LensException(e);
- }
- } else {
- synchronized (thriftConnExpiryQueue) {
- thriftConnExpiryQueue.remove(connection);
- thriftConnExpiryQueue.offer(connection);
- }
+ } else {
+ synchronized (thriftConnExpiryQueue) {
+ thriftConnExpiryQueue.remove(connection);
+ thriftConnExpiryQueue.offer(connection);
}
- return connection.getConnection().getClient();
- } finally {
- connectionLock.unlock();
}
-
+ return connection.getConnection().getClient();
}
}
@@ -1220,22 +1211,15 @@ public class HiveDriver implements LensDriver {
* Close all connections.
*/
private void closeAllConnections() {
- connectionLock.lock();
- try {
- synchronized (thriftConnExpiryQueue) {
- for (Map<Long, ExpirableConnection> connections : threadConnections.values()) {
- for (ExpirableConnection connection : connections.values()) {
- try {
- connection.getConnection().close();
- } catch (Exception ce) {
- LOG.warn("Error closing connection to hive server");
- }
- }
+ synchronized (thriftConnExpiryQueue) {
+ for (ExpirableConnection connection : threadConnections.values()) {
+ try {
+ connection.getConnection().close();
+ } catch (Exception ce) {
+ LOG.warn("Error closing connection to hive server");
}
- threadConnections.clear();
}
- } finally {
- connectionLock.unlock();
+ threadConnections.clear();
}
}