You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2016/07/14 21:13:51 UTC

hive git commit: HIVE-11402 : HS2 - add an option to disallow parallel query execution within a single Session (Sergey Shelukhin, reviewed by Aihua Xu, Thejas Nair)

Repository: hive
Updated Branches:
  refs/heads/master 3522f3f4c -> 8f183945a


HIVE-11402 : HS2 - add an option to disallow parallel query execution within a single Session (Sergey Shelukhin, reviewed by Aihua Xu, Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f183945
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f183945
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f183945

Branch: refs/heads/master
Commit: 8f183945ab0e4f9a87eb3812eeab84ec12842751
Parents: 3522f3f
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Jul 14 14:13:39 2016 -0700
Committer: Thejas Nair <th...@hortonworks.com>
Committed: Thu Jul 14 14:13:39 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +
 .../org/apache/hive/jdbc/TestJdbcDriver2.java   |  31 ++++
 .../cli/session/TestHiveSessionImpl.java        |   4 +-
 .../operation/ExecuteStatementOperation.java    |   2 +-
 .../hive/service/cli/operation/Operation.java   |  20 ++-
 .../service/cli/operation/SQLOperation.java     | 145 +++++++++++--------
 .../hive/service/cli/session/HiveSession.java   |   3 +
 .../service/cli/session/HiveSessionImpl.java    | 137 ++++++++++++------
 .../cli/session/HiveSessionImplwithUGI.java     |   3 +-
 9 files changed, 228 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bae1825..42f7d88 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2154,6 +2154,8 @@ public class HiveConf extends Configuration {
         new TimeValidator(TimeUnit.SECONDS),
         "Number of seconds a request will wait to acquire the compile lock before giving up. " +
         "Setting it to 0s disables the timeout."),
+    HIVE_SERVER2_PARALLEL_OPS_IN_SESSION("hive.server2.parallel.ops.in.session", true,
+        "Whether to allow several parallel operations (such as SQL statements) in one session."),
 
     // HiveServer2 WebUI
     HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),

http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index b0fa98f..58f0d43 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -135,6 +135,7 @@ public class TestJdbcDriver2 {
     System.setProperty(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname, "verbose");
     System.setProperty(ConfVars.HIVEMAPREDMODE.varname, "nonstrict");
     System.setProperty(ConfVars.HIVE_AUTHORIZATION_MANAGER.varname, "org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider");
+    System.setProperty(ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION.varname, "false");
 
     Statement stmt1 = con1.createStatement();
     assertNotNull("Statement is null", stmt1);
@@ -328,6 +329,23 @@ public class TestJdbcDriver2 {
   }
 
   @Test
+  public void testSerializedExecution() throws Exception {
+    // Test running parallel queries (with parallel queries disabled).
+    // Should be serialized in the order of execution.
+    HiveStatement stmt1 = (HiveStatement) con.createStatement();
+    HiveStatement stmt2 = (HiveStatement) con.createStatement();
+    stmt1.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'");
+    stmt1.execute("create table test_ser_1(i int)");
+    stmt1.executeAsync("insert into test_ser_1 select sleepMsUDF(under_col, 500) from "
+        + tableName + " limit 1");
+    boolean isResultSet = stmt2.executeAsync("select * from test_ser_1");
+    assertTrue(isResultSet);
+    ResultSet rs = stmt2.getResultSet();
+    assertTrue(rs.next());
+    assertFalse(rs.next());
+  }
+
+  @Test
   public void testParentReferences() throws Exception {
     /* Test parent references from Statement */
     Statement s = this.con.createStatement();
@@ -2534,6 +2552,19 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException,
     }
   }
 
+
+  // A UDF that sleeps for the given number of ms, then returns its first argument unchanged.
+  public static class SleepMsUDF extends UDF {
+    public Integer evaluate(final Integer value, final Integer ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // Cut the sleep short but keep the interrupt visible.
+      }
+      return value;
+    }
+  }
+
   /**
    * Loads data from a table containing non-ascii value column
    * Runs a query and compares the return value

http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
index d58a913..c7fa5da 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
@@ -51,11 +51,11 @@ public class TestHiveSessionImpl {
     HiveSessionImpl session = new HiveSessionImpl(protocol, username, password,
       serverhiveConf, ipAddress) {
       @Override
-      protected synchronized void acquire(boolean userAccess) {
+      protected synchronized void acquire(boolean userAccess, boolean isOperation) {
       }
 
       @Override
-      protected synchronized void release(boolean userAccess) {
+      protected synchronized void release(boolean userAccess, boolean isOperation) {
       }
     };
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
index ff46ed8..2dd90b6 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
@@ -31,7 +31,7 @@ public abstract class ExecuteStatementOperation extends Operation {
 
   public ExecuteStatementOperation(HiveSession parentSession, String statement,
       Map<String, String> confOverlay, boolean runInBackground) {
-    super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT, runInBackground);
+    super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT);
     this.statement = statement;
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 021c1fe..90fe76d 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -67,7 +67,6 @@ public abstract class Operation {
   public static final long DEFAULT_FETCH_MAX_ROWS = 100;
   protected boolean hasResultSet;
   protected volatile HiveSQLException operationException;
-  protected final boolean runAsync;
   protected volatile Future<?> backgroundHandle;
   protected OperationLog operationLog;
   protected boolean isOperationLogEnabled;
@@ -85,23 +84,29 @@ public abstract class Operation {
   protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
       EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
 
+
   protected Operation(HiveSession parentSession, OperationType opType) {
-    this(parentSession, null, opType, false);
- }
+    this(parentSession, null, opType);
+  }
+  
+  protected Operation(HiveSession parentSession, Map<String, String> confOverlay,
+      OperationType opType) {
+    this(parentSession, confOverlay, opType, false);
+  }
 
-  protected Operation(HiveSession parentSession, Map<String, String> confOverlay, OperationType opType, boolean runInBackground) {
+  protected Operation(HiveSession parentSession,
+      Map<String, String> confOverlay, OperationType opType, boolean isAsyncQueryState) {
     this.parentSession = parentSession;
     if (confOverlay != null) {
       this.confOverlay = confOverlay;
     }
-    this.runAsync = runInBackground;
     this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
     beginTime = System.currentTimeMillis();
     lastAccessTime = beginTime;
     operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
         HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
     setMetrics(state);
-    queryState = new QueryState(parentSession.getHiveConf(), confOverlay, runAsync);
+    queryState = new QueryState(parentSession.getHiveConf(), confOverlay, isAsyncQueryState);
   }
 
   public Future<?> getBackgroundHandle() {
@@ -113,10 +118,9 @@ public abstract class Operation {
   }
 
   public boolean shouldRunAsync() {
-    return runAsync;
+    return false; // Most operations cannot run asynchronously.
   }
 
-
   public HiveSession getParentSession() {
     return parentSession;
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index e453354..6f2daf3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -89,7 +89,6 @@ import org.codehaus.jackson.map.ObjectMapper;
  */
 @SuppressWarnings("deprecation")
 public class SQLOperation extends ExecuteStatementOperation {
-
   private Driver driver = null;
   private CommandProcessorResponse response;
   private TableSchema resultSchema = null;
@@ -101,6 +100,7 @@ public class SQLOperation extends ExecuteStatementOperation {
   private SQLOperationDisplay sqlOpDisplay;
   private long queryTimeout;
   private ScheduledExecutorService timeoutExecutor;
+  private final boolean runAsync;
 
   /**
    * A map to track query count running by each user
@@ -112,6 +112,7 @@ public class SQLOperation extends ExecuteStatementOperation {
       boolean runInBackground, long queryTimeout) {
     // TODO: call setRemoteUser in ExecuteStatementOperation or higher.
     super(parentSession, statement, confOverlay, runInBackground);
+    this.runAsync = runInBackground;
     this.queryTimeout = queryTimeout;
     long timeout = HiveConf.getTimeVar(queryState.getConf(),
         HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -127,6 +128,11 @@ public class SQLOperation extends ExecuteStatementOperation {
     }
   }
 
+  @Override
+  public boolean shouldRunAsync() {
+    return runAsync;
+  }
+
   private void setupSessionIO(SessionState sessionState) {
     try {
       sessionState.in = null; // hive server's session input stream is not used
@@ -278,70 +284,16 @@ public class SQLOperation extends ExecuteStatementOperation {
     if (!runAsync) {
       runQuery();
     } else {
-      // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
-      final SessionState parentSessionState = SessionState.get();
-      // ThreadLocal Hive object needs to be set in background thread.
-      // The metastore client in Hive is associated with right user.
-      final Hive parentHive = parentSession.getSessionHive();
-      final PerfLogger parentPerfLogger = SessionState.getPerfLogger();
-      // Current UGI will get used by metastore when metsatore is in embedded mode
-      // So this needs to get passed to the new background thread
-      final UserGroupInformation currentUGI = getCurrentUGI();
-      // Runnable impl to call runInternal asynchronously,
-      // from a different thread
-      Runnable backgroundOperation = new Runnable() {
-        @Override
-        public void run() {
-          PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
-            @Override
-            public Object run() throws HiveSQLException {
-              Hive.set(parentHive);
-              // TODO: can this result in cross-thread reuse of session state?
-              SessionState.setCurrentSessionState(parentSessionState);
-              PerfLogger.setPerfLogger(parentPerfLogger);
-              // Set current OperationLog in this async thread for keeping on saving query log.
-              registerCurrentOperationLog();
-              registerLoggingContext();
-              try {
-                if (asyncPrepare) {
-                  prepare(queryState);
-                }
-                runQuery();
-              } catch (HiveSQLException e) {
-                setOperationException(e);
-                LOG.error("Error running hive query: ", e);
-              } finally {
-                unregisterLoggingContext();
-                unregisterOperationLog();
-              }
-              return null;
-            }
-          };
+      // We'll pass ThreadLocals in the background thread from the foreground (handler) thread.
+      // 1) ThreadLocal Hive object needs to be set in background thread
+      // 2) The metastore client in Hive is associated with right user.
+      // 3) Current UGI will get used by metastore when metastore is in embedded mode
+      Runnable work = new BackgroundWork(getCurrentUGI(), parentSession.getSessionHive(),
+          SessionState.getPerfLogger(), SessionState.get(), asyncPrepare);
 
-          try {
-            currentUGI.doAs(doAsAction);
-          } catch (Exception e) {
-            setOperationException(new HiveSQLException(e));
-            LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
-          }
-          finally {
-            /**
-             * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
-             * when this thread is garbage collected later.
-             * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
-             */
-            if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
-              ThreadWithGarbageCleanup currentThread =
-                  (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
-              currentThread.cacheThreadLocalRawStore();
-            }
-          }
-        }
-      };
       try {
         // This submit blocks if no background threads are available to run this operation
-        Future<?> backgroundHandle =
-            getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
+        Future<?> backgroundHandle = getParentSession().submitBackgroundOperation(work);
         setBackgroundHandle(backgroundHandle);
       } catch (RejectedExecutionException rejected) {
         setState(OperationState.ERROR);
@@ -351,6 +303,74 @@ public class SQLOperation extends ExecuteStatementOperation {
     }
   }
 
+
+  private final class BackgroundWork implements Runnable {
+    private final UserGroupInformation currentUGI;
+    private final Hive parentHive;
+    private final PerfLogger parentPerfLogger;
+    private final SessionState parentSessionState;
+    private final boolean asyncPrepare;
+
+    private BackgroundWork(UserGroupInformation currentUGI,
+        Hive parentHive, PerfLogger parentPerfLogger,
+        SessionState parentSessionState, boolean asyncPrepare) {
+      this.currentUGI = currentUGI;
+      this.parentHive = parentHive;
+      this.parentPerfLogger = parentPerfLogger;
+      this.parentSessionState = parentSessionState;
+      this.asyncPrepare = asyncPrepare;
+    }
+
+    @Override
+    public void run() {
+      PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
+        @Override
+        public Object run() throws HiveSQLException {
+          Hive.set(parentHive);
+          // TODO: can this result in cross-thread reuse of session state?
+          SessionState.setCurrentSessionState(parentSessionState);
+          PerfLogger.setPerfLogger(parentPerfLogger);
+          // Set current OperationLog in this async thread for keeping on saving query log.
+          registerCurrentOperationLog();
+          registerLoggingContext();
+          try {
+            if (asyncPrepare) {
+              prepare(queryState);
+            }
+            runQuery();
+          } catch (HiveSQLException e) {
+            setOperationException(e);
+            LOG.error("Error running hive query: ", e);
+          } finally {
+            unregisterLoggingContext();
+            unregisterOperationLog();
+          }
+          return null;
+        }
+      };
+
+      try {
+        currentUGI.doAs(doAsAction);
+      } catch (Exception e) {
+        setOperationException(new HiveSQLException(e));
+        LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
+      }
+      finally {
+        /**
+         * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+         * when this thread is garbage collected later.
+         * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+         */
+        if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+          ThreadWithGarbageCleanup currentThread =
+              (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+          currentThread.cacheThreadLocalRawStore();
+        }
+      }
+    }
+  }
+
+
   /**
    * Returns the current UGI on the stack
    * @param opConfig
@@ -669,4 +689,5 @@ public class SQLOperation extends ExecuteStatementOperation {
   public String getExecutionEngine() {
     return queryState.getConf().getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
   }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index 78ff388..e5d865b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -20,6 +20,7 @@ package org.apache.hive.service.cli.session;
 
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Future;
 
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -211,4 +212,6 @@ public interface HiveSession extends HiveSessionBase {
   void closeExpiredOperations();
 
   long getNoOperationTime();
+
+  Future<?> submitBackgroundOperation(Runnable work);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 7341635..eca02d9 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -24,11 +24,13 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.StringUtils;
@@ -38,8 +40,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.history.HiveHistory;
 import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -114,16 +114,19 @@ public class HiveSessionImpl implements HiveSession {
   private volatile long lastAccessTime;
   private volatile long lastIdleTime;
   private volatile int pendingCount = 0;
+  private final Semaphore operationLock;
 
 
-  public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, String username, String password,
-    HiveConf serverhiveConf, String ipAddress) {
+  public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol,
+      String username, String password, HiveConf serverConf, String ipAddress) {
     this.username = username;
     this.password = password;
     creationTime = System.currentTimeMillis();
     this.sessionHandle = sessionHandle != null ? sessionHandle : new SessionHandle(protocol);
-    this.sessionConf = new HiveConf(serverhiveConf);
+    this.sessionConf = new HiveConf(serverConf);
     this.ipAddress = ipAddress;
+    this.operationLock = serverConf.getBoolVar(
+        ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION) ? null : new Semaphore(1);
     try {
       // In non-impersonation mode, map scheduler queue to current user
       // if fair scheduler is configured.
@@ -326,7 +329,27 @@ public class HiveSessionImpl implements HiveSession {
     this.operationManager = operationManager;
   }
 
-  protected synchronized void acquire(boolean userAccess) {
+  protected void acquire(boolean userAccess, boolean isOperation) {
+    if (isOperation && operationLock != null) {
+      try {
+        operationLock.acquire();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+    boolean success = false;
+    try {
+      acquireAfterOpLock(userAccess);
+      success = true;
+    } finally {
+      if (!success && isOperation && operationLock != null) {
+        operationLock.release();
+      }
+    }
+  }
+
+  private synchronized void acquireAfterOpLock(boolean userAccess) {
     // Need to make sure that the this HiveServer2's session's SessionState is
     // stored in the thread local for the handler thread.
     SessionState.setCurrentSessionState(sessionState);
@@ -348,7 +371,17 @@ public class HiveSessionImpl implements HiveSession {
    * when this thread is garbage collected later.
    * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
    */
-  protected synchronized void release(boolean userAccess) {
+  protected void release(boolean userAccess, boolean isOperation) {
+    try {
+      releaseBeforeOpLock(userAccess);
+    } finally {
+      if (isOperation && operationLock != null) {
+        operationLock.release();
+      }
+    }
+  }
+
+  private synchronized void releaseBeforeOpLock(boolean userAccess) {
     if (sessionState != null) {
       // can be null in-case of junit tests. skip reset.
       // reset thread name at release time.
@@ -405,7 +438,7 @@ public class HiveSessionImpl implements HiveSession {
   @Override
   public GetInfoValue getInfo(GetInfoType getInfoType)
       throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
     try {
       switch (getInfoType) {
       case CLI_SERVER_NAME:
@@ -425,7 +458,7 @@ public class HiveSessionImpl implements HiveSession {
         throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
       }
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
@@ -453,12 +486,11 @@ public class HiveSessionImpl implements HiveSession {
 
   private OperationHandle executeStatementInternal(String statement,
       Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
 
     OperationManager operationManager = getOperationManager();
-    ExecuteStatementOperation operation =
-        operationManager.newExecuteStatementOperation(getSession(), statement, confOverlay,
-            runAsync, queryTimeout);
+    ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(
+        getSession(), statement, confOverlay, runAsync, queryTimeout);
     OperationHandle opHandle = operation.getHandle();
     try {
       operation.run();
@@ -471,14 +503,29 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      if (operation.getBackgroundHandle() == null) {
+        release(true, true); // Not async, or wasn't submitted for some reason (failure, etc.)
+      } else {
+        releaseBeforeOpLock(true); // Release, but keep the lock (if present).
+      }
     }
   }
 
   @Override
+  public Future<?> submitBackgroundOperation(Runnable work) {
+    return getSessionManager().submitBackgroundOperation(
+        operationLock == null ? work : new FutureTask<Void>(work, null) {
+      protected void done() {
+        // Fires on completion or cancellation of this task; assumes the submitter holds the lock.
+        operationLock.release();
+      }
+    });
+  }
+
+  @Override
   public OperationHandle getTypeInfo()
       throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
 
     OperationManager operationManager = getOperationManager();
     GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
@@ -491,14 +538,14 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
   @Override
   public OperationHandle getCatalogs()
       throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
 
     OperationManager operationManager = getOperationManager();
     GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
@@ -511,14 +558,14 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
   @Override
   public OperationHandle getSchemas(String catalogName, String schemaName)
       throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
 
     OperationManager operationManager = getOperationManager();
     GetSchemasOperation operation =
@@ -532,7 +579,7 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
@@ -540,7 +587,7 @@ public class HiveSessionImpl implements HiveSession {
   public OperationHandle getTables(String catalogName, String schemaName, String tableName,
       List<String> tableTypes)
           throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
 
     OperationManager operationManager = getOperationManager();
     MetadataOperation operation =
@@ -554,14 +601,14 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
   @Override
   public OperationHandle getTableTypes()
       throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
 
     OperationManager operationManager = getOperationManager();
     GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
@@ -574,14 +621,14 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
   @Override
   public OperationHandle getColumns(String catalogName, String schemaName,
       String tableName, String columnName)  throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
     String addedJars = Utilities.getResourceFiles(sessionConf, SessionState.ResourceType.JAR);
     if (StringUtils.isNotBlank(addedJars)) {
        IMetaStoreClient metastoreClient = getSession().getMetaStoreClient();
@@ -599,7 +646,7 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
@@ -612,7 +659,7 @@ public class HiveSessionImpl implements HiveSession {
   @Override
   public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
       throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
 
     OperationManager operationManager = getOperationManager();
     GetFunctionsOperation operation = operationManager
@@ -626,14 +673,14 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
   @Override
   public void close() throws HiveSQLException {
     try {
-      acquire(true);
+      acquire(true, false);
       // Iterate through the opHandles and close their operations
       List<OperationHandle> ops = null;
       synchronized (opHandleSet) {
@@ -675,7 +722,7 @@ public class HiveSessionImpl implements HiveSession {
         }
         sessionHive = null;
       }
-      release(true);
+      release(true, false);
     }
   }
 
@@ -736,7 +783,7 @@ public class HiveSessionImpl implements HiveSession {
   }
 
   private void closeTimedOutOperations(List<Operation> operations) {
-    acquire(false);
+    acquire(false, false);
     try {
       for (Operation operation : operations) {
         synchronized (opHandleSet) {
@@ -749,54 +796,54 @@ public class HiveSessionImpl implements HiveSession {
         }
       }
     } finally {
-      release(false);
+      release(false, false);
     }
   }
 
   @Override
   public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
-    acquire(true);
+    acquire(true, false);
     try {
       sessionManager.getOperationManager().cancelOperation(opHandle);
     } finally {
-      release(true);
+      release(true, false);
     }
   }
 
   @Override
   public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
-    acquire(true);
+    acquire(true, false);
     try {
       operationManager.closeOperation(opHandle);
       synchronized (opHandleSet) {
         opHandleSet.remove(opHandle);
       }
     } finally {
-      release(true);
+      release(true, false);
     }
   }
 
   @Override
   public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
     try {
       return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
   @Override
   public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
       long maxRows, FetchType fetchType) throws HiveSQLException {
-    acquire(true);
+    acquire(true, false);
     try {
       if (fetchType == FetchType.QUERY_OUTPUT) {
         return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
       }
       return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf);
     } finally {
-      release(true);
+      release(true, false);
     }
   }
 
@@ -850,7 +897,7 @@ public class HiveSessionImpl implements HiveSession {
   @Override
   public OperationHandle getPrimaryKeys(String catalog, String schema,
     String table) throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
 
     OperationManager operationManager = getOperationManager();
     GetPrimaryKeysOperation operation = operationManager
@@ -864,7 +911,7 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 
@@ -872,7 +919,7 @@ public class HiveSessionImpl implements HiveSession {
   public OperationHandle getCrossReference(String primaryCatalog,
     String primarySchema, String primaryTable, String foreignCatalog,
     String foreignSchema, String foreignTable) throws HiveSQLException {
-    acquire(true);
+    acquire(true, true);
 
     OperationManager operationManager = getOperationManager();
     GetCrossReferenceOperation operation = operationManager
@@ -888,7 +935,7 @@ public class HiveSessionImpl implements HiveSession {
       operationManager.closeOperation(opHandle);
       throw e;
     } finally {
-      release(true);
+      release(true, true);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
index f7b3412..afed9e2 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
@@ -89,9 +89,10 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl {
   @Override
   public void close() throws HiveSQLException {
     try {
-      acquire(true);
+      acquire(true, false);
       cancelDelegationToken();
     } finally {
+      release(true, false);
       try {
         super.close();
       } finally {