You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lens.apache.org by am...@apache.org on 2015/03/31 06:40:59 UTC

incubator-lens git commit: LENS-469 : Remove locking on HiveDriver.updateStatus (Jaideep Dhok via amareshwari)

Repository: incubator-lens
Updated Branches:
  refs/heads/master e3e45aef2 -> 72691f12f


LENS-469 : Remove locking on HiveDriver.updateStatus (Jaideep Dhok via amareshwari)


Project: http://git-wip-us.apache.org/repos/asf/incubator-lens/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-lens/commit/72691f12
Tree: http://git-wip-us.apache.org/repos/asf/incubator-lens/tree/72691f12
Diff: http://git-wip-us.apache.org/repos/asf/incubator-lens/diff/72691f12

Branch: refs/heads/master
Commit: 72691f12f7b153be209dea09b02bbab34b1d41a4
Parents: e3e45ae
Author: Jaideep Dhok <jd...@apache.org>
Authored: Tue Mar 31 10:10:53 2015 +0530
Committer: Amareshwari Sriramadasu <am...@apache.org>
Committed: Tue Mar 31 10:10:53 2015 +0530

----------------------------------------------------------------------
 .../org/apache/lens/driver/hive/HiveDriver.java | 92 ++++++++------------
 1 file changed, 38 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-lens/blob/72691f12/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
----------------------------------------------------------------------
diff --git a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
index 11ab47a..3edce4d 100644
--- a/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
+++ b/lens-driver-hive/src/main/java/org/apache/lens/driver/hive/HiveDriver.java
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.TimeUnit;
@@ -104,18 +105,15 @@ public class HiveDriver implements LensDriver {
   private HiveConf hiveConf;
 
   /** The hive handles. */
-  private Map<QueryHandle, OperationHandle> hiveHandles = new HashMap<QueryHandle, OperationHandle>();
+  private Map<QueryHandle, OperationHandle> hiveHandles = new ConcurrentHashMap<QueryHandle, OperationHandle>();
 
   /** The session lock. */
   private final Lock sessionLock;
 
-  /** The connection lock. */
-  private final Lock connectionLock;
-
   // connections need to be separate for each user and each thread
   /** The thread connections. */
-  private final Map<String, Map<Long, ExpirableConnection>> threadConnections =
-    new HashMap<String, Map<Long, ExpirableConnection>>();
+  private final Map<String, ExpirableConnection> threadConnections =
+    new HashMap<String, ExpirableConnection>();
 
   /** The thrift conn expiry queue. */
   private final DelayQueue<ExpirableConnection> thriftConnExpiryQueue = new DelayQueue<ExpirableConnection>();
@@ -305,7 +303,6 @@ public class HiveDriver implements LensDriver {
    */
   public HiveDriver() throws LensException {
     this.sessionLock = new ReentrantLock();
-    this.connectionLock = new ReentrantLock();
     lensToHiveSession = new HashMap<String, SessionHandle>();
     resourcesAddedForSession = new HashMap<SessionHandle, Boolean>();
     connectionExpiryThread.setDaemon(true);
@@ -532,7 +529,7 @@ public class HiveDriver implements LensDriver {
    * @see org.apache.lens.server.api.driver.LensDriver#updateStatus(org.apache.lens.server.api.query.QueryContext)
    */
   @Override
-  public synchronized void updateStatus(QueryContext context) throws LensException {
+  public void updateStatus(QueryContext context) throws LensException {
     LOG.debug("GetStatus: " + context.getQueryHandle());
     if (context.getDriverStatus().isFinished()) {
       return;
@@ -677,6 +674,9 @@ public class HiveDriver implements LensDriver {
    */
   @Override
   public void closeQuery(QueryHandle handle) throws LensException {
+    if (handle == null) {
+      return;
+    }
     LOG.info("CloseQuery: " + handle);
     OperationHandle opHandle = hiveHandles.remove(handle);
     if (opHandle != null) {
@@ -758,41 +758,32 @@ public class HiveDriver implements LensDriver {
       }
       return embeddedConnection.getClient();
     } else {
-      connectionLock.lock();
-      try {
-        String user = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_USER);
-        if (SessionState.get() != null && SessionState.get().getUserName() != null) {
-          user = SessionState.get().getUserName();
-        }
-        Map<Long, ExpirableConnection> userThreads = threadConnections.get(user.toLowerCase());
-        if (userThreads == null) {
-          userThreads = new HashMap<Long, ExpirableConnection>();
-          threadConnections.put(user.toLowerCase(), userThreads);
+      String user = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_CLIENT_USER);
+      if (SessionState.get() != null && SessionState.get().getUserName() != null) {
+        user = SessionState.get().getUserName();
+      }
+
+      String connectionKey = user.toLowerCase() + Thread.currentThread().getId();
+      ExpirableConnection connection = threadConnections.get(connectionKey);
+      if (connection == null || connection.isExpired()) {
+        try {
+          ThriftConnection tconn = connectionClass.newInstance();
+          tconn.init(hiveConf, user);
+          connection = new ExpirableConnection(tconn, connectionExpiryTimeout);
+          thriftConnExpiryQueue.offer(connection);
+          threadConnections.put(connectionKey, connection);
+          LOG.info("New thrift connection " + connectionClass + " for thread:" + Thread.currentThread().getId()
+            + " for user:" + user + " connection ID=" + connection.getConnId());
+        } catch (Exception e) {
+          throw new LensException(e);
         }
-        ExpirableConnection connection = userThreads.get(Thread.currentThread().getId());
-        if (connection == null || connection.isExpired()) {
-          try {
-            ThriftConnection tconn = connectionClass.newInstance();
-            tconn.init(hiveConf, user);
-            connection = new ExpirableConnection(tconn, connectionExpiryTimeout);
-            thriftConnExpiryQueue.offer(connection);
-            userThreads.put(Thread.currentThread().getId(), connection);
-            LOG.info("New thrift connection " + connectionClass + " for thread:" + Thread.currentThread().getId()
-              + " for user:" + user + " connection ID=" + connection.getConnId());
-          } catch (Exception e) {
-            throw new LensException(e);
-          }
-        } else {
-          synchronized (thriftConnExpiryQueue) {
-            thriftConnExpiryQueue.remove(connection);
-            thriftConnExpiryQueue.offer(connection);
-          }
+      } else {
+        synchronized (thriftConnExpiryQueue) {
+          thriftConnExpiryQueue.remove(connection);
+          thriftConnExpiryQueue.offer(connection);
         }
-        return connection.getConnection().getClient();
-      } finally {
-        connectionLock.unlock();
       }
-
+      return connection.getConnection().getClient();
     }
   }
 
@@ -1220,22 +1211,15 @@ public class HiveDriver implements LensDriver {
    * Close all connections.
    */
   private void closeAllConnections() {
-    connectionLock.lock();
-    try {
-      synchronized (thriftConnExpiryQueue) {
-        for (Map<Long, ExpirableConnection> connections : threadConnections.values()) {
-          for (ExpirableConnection connection : connections.values()) {
-            try {
-              connection.getConnection().close();
-            } catch (Exception ce) {
-              LOG.warn("Error closing connection to hive server");
-            }
-          }
+    synchronized (thriftConnExpiryQueue) {
+      for (ExpirableConnection connection : threadConnections.values()) {
+        try {
+          connection.getConnection().close();
+        } catch (Exception ce) {
+          LOG.warn("Error closing connection to hive server");
         }
-        threadConnections.clear();
       }
-    } finally {
-      connectionLock.unlock();
+      threadConnections.clear();
     }
   }