You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2015/12/15 22:29:35 UTC

hive git commit: HIVE-12431 : Support timeout for compile lock (Mohit Sabharwal via Szehon)

Repository: hive
Updated Branches:
  refs/heads/master 25c207c93 -> e091bc271


HIVE-12431 : Support timeout for compile lock (Mohit Sabharwal via Szehon)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e091bc27
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e091bc27
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e091bc27

Branch: refs/heads/master
Commit: e091bc27183dc0e24a554e599f1584249650306a
Parents: 25c207c
Author: Szehon Ho <sz...@cloudera.com>
Authored: Tue Dec 15 13:27:22 2015 -0800
Committer: Szehon Ho <sz...@cloudera.com>
Committed: Tue Dec 15 13:27:22 2015 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +-
 .../java/org/apache/hadoop/hive/ql/Driver.java  |  51 +++++++-
 .../org/apache/hadoop/hive/ql/ErrorMsg.java     |   1 +
 .../apache/hive/service/cli/CLIServiceTest.java | 127 +++++++++++++++++--
 4 files changed, 172 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 31f0634..243f281 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1849,7 +1849,10 @@ public class HiveConf extends Configuration {
         "Bind host on which to run the HiveServer2 Thrift service."),
     HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" +
         "enable parallel compilation between sessions on HiveServer2. The default is false."),
-
+    HIVE_SERVER2_COMPILE_LOCK_TIMEOUT("hive.server2.compile.lock.timeout", "0s",
+        new TimeValidator(TimeUnit.SECONDS),
+        "Number of seconds a request will wait to acquire the compile lock before giving up. " +
+        "Setting it to 0s disables the timeout."),
     // HiveServer2 WebUI
     HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),
     HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on"),

http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index f6af6ca..3d5f3b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -33,6 +33,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.commons.lang.StringUtils;
@@ -1128,9 +1129,12 @@ public class Driver implements CommandProcessor {
   private int compileInternal(String command) {
     boolean isParallelEnabled = SessionState.get().isHiveServerQuery() && this.isParallelEnabled;
     int ret;
-    final ReentrantLock compileLock = isParallelEnabled
-        ? SessionState.get().getCompileLock() : globalCompileLock;
-    compileLock.lock();
+    final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled,
+        command);
+    if (compileLock == null) {
+      return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
+    }
+
     try {
       if (isParallelEnabled && LOG.isDebugEnabled()) {
         LOG.debug("Entering compile: " + command);
@@ -1142,6 +1146,7 @@ public class Driver implements CommandProcessor {
     } finally {
       compileLock.unlock();
     }
+
     if (ret != 0) {
       try {
         releaseLocksAndCommitOrRollback(false, null);
@@ -1153,6 +1158,46 @@ public class Driver implements CommandProcessor {
     return ret;
   }
 
+  /**
+   * Acquires the session compile lock if isParallelEnabled, else the global one;
+   * waits at most HIVE_SERVER2_COMPILE_LOCK_TIMEOUT seconds when positive, else blocks.
+   * @param command query text, used only in log messages
+   * @return the acquired lock, or {@code null} if the timeout is configured and
+   *         either it elapsed (errorMessage is then set) or the thread was
+   *         interrupted (interrupt flag restored; errorMessage is not set here,
+   *         though compileInternal still returns COMPILE_LOCK_TIMED_OUT's code).
+   */
+  private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled,
+    String command) {
+    final ReentrantLock compileLock = isParallelEnabled ?
+        SessionState.get().getCompileLock() : globalCompileLock;
+    long maxCompileLockWaitTime = HiveConf.getTimeVar(
+          this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT,
+          TimeUnit.SECONDS); // 0 (the "0s" default) disables the timeout
+    if (maxCompileLockWaitTime > 0) {
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Waiting to acquire compile lock: " + command);
+        }
+        if(!compileLock.tryLock(maxCompileLockWaitTime, TimeUnit.SECONDS)) {
+          errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg();
+          LOG.error(errorMessage + ": " + command);
+          return null;
+        }
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt(); // restore interrupt status for callers
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Interrupted Exception ignored", e);
+        }
+        return null; // NOTE(review): errorMessage is left unset on this path
+      }
+    } else {
+      compileLock.lock(); // no timeout configured: block until acquired
+    }
+
+    return compileLock;
+  }
+
   private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
       throws CommandNeedRetryException {
     errorMessage = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 9d9dd53..d759739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -426,6 +426,7 @@ public enum ErrorMsg {
   TBL_SORTED_NOT_BUCKETED(10306, "Destination table {0} found to be sorted but not bucketed.", true),
   //{2} should be lockid
   LOCK_ACQUIRE_TIMEDOUT(10307, "Lock acquisition for {0} timed out after {1}ms.  {2}", true),
+  COMPILE_LOCK_TIMED_OUT(10308, "Attempt to acquire compile lock timed out.", true),
   //========================== 20000 range starts here ========================//
   SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
   SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "

http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index 7bfbdb9..e78181a 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -23,8 +23,10 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
@@ -33,11 +35,16 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hive.service.server.HiveServer2;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -303,15 +310,15 @@ public abstract class CLIServiceTest {
     // Create callables with different queries.
     String query = "SELECT ID + %1$d FROM " + tableName;
     cs[0] = createQueryCallable(
-        query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+        query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut);
     query = "SELECT t1.ID, SUM(t2.ID) + %1$d FROM  " + tableName + " t1 CROSS JOIN "
         + tableName + " t2 GROUP BY t1.ID HAVING t1.ID > 1";
     cs[1] = createQueryCallable(
-        query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+        query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut);
     query = "SELECT b.a FROM (SELECT (t1.ID + %1$d) as a , t2.* FROM  " + tableName
         + " t1 INNER JOIN " + tableName + " t2 ON t1.ID = t2.ID WHERE t2.ID > 2) b";
     cs[2] = createQueryCallable(
-        query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+        query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut);
 
     @SuppressWarnings("unchecked")
     FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
@@ -334,13 +341,118 @@ public abstract class CLIServiceTest {
     client.closeSession(sessionHandle);
   }
 
+  public static class CompileLockTestSleepHook implements HiveSemanticAnalyzerHook { // test-only compile delay
+    @Override
+    public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context,
+      ASTNode ast) throws SemanticException {
+      try {
+        Thread.sleep(20 * 1000); // stall compilation ~20s so the query keeps holding the compile lock
+      } catch (Throwable t) {
+        // best-effort delay: an interrupt (or any other error) just ends the sleep early
+      }
+      return ast;
+    }
+    // postAnalyze is a no-op; only the preAnalyze delay matters for the test.
+    @Override
+    public void postAnalyze(HiveSemanticAnalyzerHookContext context,
+      List<Task<? extends Serializable>> rootTasks) throws SemanticException {
+    }
+  }
+
+  @Test
+  public void testGlobalCompileLockTimeout() throws Exception {
+    String tableName = "TEST_COMPILE_LOCK_TIMEOUT";
+    String columnDefinitions = "(ID STRING)";
+
+    // Open a session and set up the test data
+    SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions,
+        new HashMap<String, String>());
+    assertNotNull(sessionHandle);
+    // Three sessions contend for the global compile lock (parallel compilation off).
+    int THREAD_COUNT = 3;
+    @SuppressWarnings("unchecked")
+    FutureTask<Void>[] tasks = (FutureTask<Void>[])new FutureTask[THREAD_COUNT];
+    long longPollingTimeoutMs = 10 * 60 * 1000; // Larger than max compile duration used in test
+
+    // 1st query acquires the lock and, via the sleep hook, holds it ~20 secs while compiling
+    Map<String, String> confOverlay = getConfOverlay(0, longPollingTimeoutMs);
+    confOverlay.put(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+        CompileLockTestSleepHook.class.getName());
+    String query = "SELECT 0 FROM " + tableName;
+    tasks[0] = new FutureTask<Void>(
+        createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1,
+            OperationState.FINISHED, false, null, null));
+    new Thread(tasks[0]).start();
+    Thread.sleep(5 * 1000); // head start so the 1st query holds the lock before the others try
+
+    // 2nd query's session has compile lock timeout of 1 sec, so it should
+    // not be able to acquire the lock within that time period
+    confOverlay = getConfOverlay(1, longPollingTimeoutMs);
+    query = "SELECT 1 FROM " + tableName;
+    tasks[1] = new FutureTask<Void>(
+        createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1,
+            OperationState.ERROR, false, null, null));
+    new Thread(tasks[1]).start();
+
+    // 3rd query's session has compile lock timeout of 100 secs, so it should
+    // be able to acquire the lock and finish successfully
+    confOverlay = getConfOverlay(100, longPollingTimeoutMs);
+    query = "SELECT 2 FROM " + tableName;
+    tasks[2] = new FutureTask<Void>(
+        createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1,
+            OperationState.FINISHED, false, null, null));
+    new Thread(tasks[2]).start();
+
+    boolean foundExpectedException = false;
+    for (int i = 0; i < THREAD_COUNT; ++i) {
+      try {
+        tasks[i].get();
+      } catch (Throwable t) {
+        if (i == 1) { // only the 2nd (1-sec timeout) query is expected to fail
+          assertTrue(t.getMessage().contains(
+              ErrorMsg.COMPILE_LOCK_TIMED_OUT.getMsg()));
+          foundExpectedException = true;
+        } else {
+          throw new RuntimeException(t);
+        }
+      }
+    }
+    assertTrue(foundExpectedException);
+
+    // Cleanup
+    client.executeStatement(sessionHandle, "DROP TABLE " + tableName,
+        getConfOverlay(0, longPollingTimeoutMs)); // 0: no compile lock timeout key set
+    client.closeSession(sessionHandle);
+  }
+
+  private Map<String, String> getConfOverlay(long compileLockTimeoutSecs, // builds a fresh per-session overlay
+    long longPollingTimeoutMs) {
+    Map<String, String> confOverlay = new HashMap<String, String>();
+    confOverlay.put(
+        HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION.varname, "false"); // forces the global compile lock
+    confOverlay.put(
+        HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname,
+        longPollingTimeoutMs + "ms");
+    if (compileLockTimeoutSecs > 0) { // otherwise left unset; HiveConf default "0s" disables the timeout
+      confOverlay.put(
+          HiveConf.ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT.varname,
+          compileLockTimeoutSecs + "s");
+    }
+    return confOverlay;
+  }
+
   private Callable<Void> createQueryCallable(final String queryStringFormat,
       final Map<String, String> confOverlay, final long longPollingTimeout,
-      final int queryCount, final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+      final int queryCount, final OperationState expectedOperationState,
+      final boolean syncThreadStart, final CountDownLatch cdlIn,
+      final CountDownLatch cdlOut) {
     return new Callable<Void>() {
       @Override
       public Void call() throws Exception {
-        syncThreadStart(cdlIn, cdlOut);
+        if (syncThreadStart) {
+          syncThreadStart(cdlIn, cdlOut);
+        }
+
         SessionHandle sessionHandle = openSession(confOverlay);
         OperationHandle[] hs  = new OperationHandle[queryCount];
         for (int i = 0; i < hs.length; ++i) {
@@ -349,7 +461,7 @@ public abstract class CLIServiceTest {
           hs[i] = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
         }
         for (int i = hs.length - 1; i >= 0; --i) {
-          waitForAsyncQuery(hs[i], OperationState.FINISHED, longPollingTimeout);
+          waitForAsyncQuery(hs[i], expectedOperationState, longPollingTimeout);
         }
         return null;
       }
@@ -405,7 +517,6 @@ public abstract class CLIServiceTest {
     return waitForAsyncQuery(h, expectedState, longPollingTimeout);
   }
 
-
   private OperationStatus waitForAsyncQuery(OperationHandle opHandle,
       OperationState expectedState, long longPollingTimeout) throws HiveSQLException {
     long testIterationTimeout = System.currentTimeMillis() + 100000;