You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sz...@apache.org on 2015/12/15 22:29:35 UTC
hive git commit: HIVE-12431 : Support timeout for compile lock (Mohit
Sabharwal via Szehon)
Repository: hive
Updated Branches:
refs/heads/master 25c207c93 -> e091bc271
HIVE-12431 : Support timeout for compile lock (Mohit Sabharwal via Szehon)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e091bc27
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e091bc27
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e091bc27
Branch: refs/heads/master
Commit: e091bc27183dc0e24a554e599f1584249650306a
Parents: 25c207c
Author: Szehon Ho <sz...@cloudera.com>
Authored: Tue Dec 15 13:27:22 2015 -0800
Committer: Szehon Ho <sz...@cloudera.com>
Committed: Tue Dec 15 13:27:22 2015 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 5 +-
.../java/org/apache/hadoop/hive/ql/Driver.java | 51 +++++++-
.../org/apache/hadoop/hive/ql/ErrorMsg.java | 1 +
.../apache/hive/service/cli/CLIServiceTest.java | 127 +++++++++++++++++--
4 files changed, 172 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 31f0634..243f281 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1849,7 +1849,10 @@ public class HiveConf extends Configuration {
"Bind host on which to run the HiveServer2 Thrift service."),
HIVE_SERVER2_PARALLEL_COMPILATION("hive.driver.parallel.compilation", false, "Whether to\n" +
"enable parallel compilation between sessions on HiveServer2. The default is false."),
-
+ HIVE_SERVER2_COMPILE_LOCK_TIMEOUT("hive.server2.compile.lock.timeout", "0s",
+ new TimeValidator(TimeUnit.SECONDS),
+ "Number of seconds a request will wait to acquire the compile lock before giving up. " +
+ "Setting it to 0s disables the timeout."),
// HiveServer2 WebUI
HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),
HIVE_SERVER2_WEBUI_PORT("hive.server2.webui.port", 10002, "The port the HiveServer2 WebUI will listen on"),
http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index f6af6ca..3d5f3b5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -33,6 +33,7 @@ import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang.StringUtils;
@@ -1128,9 +1129,12 @@ public class Driver implements CommandProcessor {
private int compileInternal(String command) {
boolean isParallelEnabled = SessionState.get().isHiveServerQuery() && this.isParallelEnabled;
int ret;
- final ReentrantLock compileLock = isParallelEnabled
- ? SessionState.get().getCompileLock() : globalCompileLock;
- compileLock.lock();
+ final ReentrantLock compileLock = tryAcquireCompileLock(isParallelEnabled,
+ command);
+ if (compileLock == null) {
+ return ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCode();
+ }
+
try {
if (isParallelEnabled && LOG.isDebugEnabled()) {
LOG.debug("Entering compile: " + command);
@@ -1142,6 +1146,7 @@ public class Driver implements CommandProcessor {
} finally {
compileLock.unlock();
}
+
if (ret != 0) {
try {
releaseLocksAndCommitOrRollback(false, null);
@@ -1153,6 +1158,46 @@ public class Driver implements CommandProcessor {
return ret;
}
+ /**
+ * Acquires the compile lock for {@code command}: the session's own lock when
+ * {@code isParallelEnabled} is true, the global compile lock otherwise. Waits
+ * without limit unless a positive compile lock timeout is configured.
+ * @return the ReentrantLock object if the lock was successfully acquired,
+ * or {@code null} if a compile lock wait timeout is configured and
+ * either the waiting time elapsed before the lock could be acquired
+ * or the current thread was interrupted while waiting.
+ */
+ private ReentrantLock tryAcquireCompileLock(boolean isParallelEnabled,
+ String command) {
+ final ReentrantLock compileLock = isParallelEnabled ?
+ SessionState.get().getCompileLock() : globalCompileLock;
+ long maxCompileLockWaitTime = HiveConf.getTimeVar(
+ this.conf, ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT,
+ TimeUnit.SECONDS);
+ if (maxCompileLockWaitTime > 0) { // a value of 0 takes the untimed lock() branch below
+ try {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Waiting to acquire compile lock: " + command);
+ }
+ if(!compileLock.tryLock(maxCompileLockWaitTime, TimeUnit.SECONDS)) {
+ errorMessage = ErrorMsg.COMPILE_LOCK_TIMED_OUT.getErrorCodedMsg();
+ LOG.error(errorMessage + ": " + command);
+ return null;
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt flag before returning
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Interrupted Exception ignored", e);
+ }
+ return null; // NOTE(review): errorMessage is not set on this path, unlike the timeout path
+ }
+ } else {
+ compileLock.lock();
+ }
+
+ return compileLock;
+ }
+
private CommandProcessorResponse runInternal(String command, boolean alreadyCompiled)
throws CommandNeedRetryException {
errorMessage = null;
http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
index 9d9dd53..d759739 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ErrorMsg.java
@@ -426,6 +426,7 @@ public enum ErrorMsg {
TBL_SORTED_NOT_BUCKETED(10306, "Destination table {0} found to be sorted but not bucketed.", true),
//{2} should be lockid
LOCK_ACQUIRE_TIMEDOUT(10307, "Lock acquisition for {0} timed out after {1}ms. {2}", true),
+ COMPILE_LOCK_TIMED_OUT(10308, "Attempt to acquire compile lock timed out.", true),
//========================== 20000 range starts here ========================//
SCRIPT_INIT_ERROR(20000, "Unable to initialize custom script."),
SCRIPT_IO_ERROR(20001, "An error occurred while reading or writing to your custom script. "
http://git-wip-us.apache.org/repos/asf/hive/blob/e091bc27/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
----------------------------------------------------------------------
diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
index 7bfbdb9..e78181a 100644
--- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
+++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
@@ -23,8 +23,10 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
@@ -33,11 +35,16 @@ import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.hive.ql.ErrorMsg;
+import org.apache.hadoop.hive.ql.exec.Task;
+import org.apache.hadoop.hive.ql.parse.ASTNode;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHook;
+import org.apache.hadoop.hive.ql.parse.HiveSemanticAnalyzerHookContext;
+import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hive.service.server.HiveServer2;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -303,15 +310,15 @@ public abstract class CLIServiceTest {
// Create callables with different queries.
String query = "SELECT ID + %1$d FROM " + tableName;
cs[0] = createQueryCallable(
- query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+ query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut);
query = "SELECT t1.ID, SUM(t2.ID) + %1$d FROM " + tableName + " t1 CROSS JOIN "
+ tableName + " t2 GROUP BY t1.ID HAVING t1.ID > 1";
cs[1] = createQueryCallable(
- query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+ query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut);
query = "SELECT b.a FROM (SELECT (t1.ID + %1$d) as a , t2.* FROM " + tableName
+ " t1 INNER JOIN " + tableName + " t2 ON t1.ID = t2.ID WHERE t2.ID > 2) b";
cs[2] = createQueryCallable(
- query, confOverlay, longPollingTimeout, QUERY_COUNT, cdlIn, cdlOut);
+ query, confOverlay, longPollingTimeout, QUERY_COUNT, OperationState.FINISHED, true, cdlIn, cdlOut);
@SuppressWarnings("unchecked")
FutureTask<Void>[] tasks = new FutureTask[THREAD_COUNT];
@@ -334,13 +341,118 @@ public abstract class CLIServiceTest {
client.closeSession(sessionHandle);
}
+ public static class CompileLockTestSleepHook implements HiveSemanticAnalyzerHook {
+ @Override
+ public ASTNode preAnalyze(HiveSemanticAnalyzerHookContext context,
+ ASTNode ast) throws SemanticException {
+ try {
+ Thread.sleep(20 * 1000); // 20 seconds, the "compile" duration the timeout test relies on
+ } catch (Throwable t) {
+ // deliberately swallowed: the sleep only makes the query slow, the AST is returned as-is
+ }
+ return ast;
+ }
+
+ @Override
+ public void postAnalyze(HiveSemanticAnalyzerHookContext context,
+ List<Task<? extends Serializable>> rootTasks) throws SemanticException {
+ }
+ }
+
+ @Test
+ public void testGlobalCompileLockTimeout() throws Exception {
+ String tableName = "TEST_COMPILE_LOCK_TIMEOUT";
+ String columnDefinitions = "(ID STRING)";
+
+ // Open a session and set up the test data
+ SessionHandle sessionHandle = setupTestData(tableName, columnDefinitions,
+ new HashMap<String, String>());
+ assertNotNull(sessionHandle);
+
+ int THREAD_COUNT = 3;
+ @SuppressWarnings("unchecked")
+ FutureTask<Void>[] tasks = (FutureTask<Void>[])new FutureTask[THREAD_COUNT];
+ long longPollingTimeoutMs = 10 * 60 * 1000; // Larger than max compile duration used in test
+
+ // 1st query acquires the lock and takes 20 secs to compile (via CompileLockTestSleepHook)
+ Map<String, String> confOverlay = getConfOverlay(0, longPollingTimeoutMs);
+ confOverlay.put(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+ CompileLockTestSleepHook.class.getName());
+ String query = "SELECT 0 FROM " + tableName;
+ tasks[0] = new FutureTask<Void>(
+ createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1,
+ OperationState.FINISHED, false, null, null));
+ new Thread(tasks[0]).start();
+ Thread.sleep(5 * 1000); // presumably gives the 1st query time to take the compile lock first
+
+ // 2nd query's session has compile lock timeout of 1 sec, so it should
+ // not be able to acquire the lock within that time period
+ confOverlay = getConfOverlay(1, longPollingTimeoutMs);
+ query = "SELECT 1 FROM " + tableName;
+ tasks[1] = new FutureTask<Void>(
+ createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1,
+ OperationState.ERROR, false, null, null));
+ new Thread(tasks[1]).start();
+
+ // 3rd query's session has compile lock timeout of 100 secs, so it should
+ // be able to acquire the lock and finish successfully
+ confOverlay = getConfOverlay(100, longPollingTimeoutMs);
+ query = "SELECT 2 FROM " + tableName;
+ tasks[2] = new FutureTask<Void>(
+ createQueryCallable(query, confOverlay, longPollingTimeoutMs, 1,
+ OperationState.FINISHED, false, null, null));
+ new Thread(tasks[2]).start();
+
+ boolean foundExpectedException = false;
+ for (int i = 0; i < THREAD_COUNT; ++i) {
+ try {
+ tasks[i].get();
+ } catch (Throwable t) {
+ if (i == 1) { // only the 2nd query (1 sec timeout) is expected to fail
+ assertTrue(t.getMessage().contains(
+ ErrorMsg.COMPILE_LOCK_TIMED_OUT.getMsg()));
+ foundExpectedException = true;
+ } else {
+ throw new RuntimeException(t);
+ }
+ }
+ }
+ assertTrue(foundExpectedException);
+
+ // Cleanup: drop the test table and close the setup session
+ client.executeStatement(sessionHandle, "DROP TABLE " + tableName,
+ getConfOverlay(0, longPollingTimeoutMs));
+ client.closeSession(sessionHandle);
+ }
+
+ private Map<String, String> getConfOverlay(long compileLockTimeoutSecs,
+ long longPollingTimeoutMs) {
+ Map<String, String> confOverlay = new HashMap<String, String>();
+ confOverlay.put( // parallel compilation is always turned off in this overlay
+ HiveConf.ConfVars.HIVE_SERVER2_PARALLEL_COMPILATION.varname, "false");
+ confOverlay.put(
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname,
+ longPollingTimeoutMs + "ms");
+ if (compileLockTimeoutSecs > 0) { // 0 or less: the compile lock timeout is left out of the overlay
+ confOverlay.put(
+ HiveConf.ConfVars.HIVE_SERVER2_COMPILE_LOCK_TIMEOUT.varname,
+ compileLockTimeoutSecs + "s");
+ }
+ return confOverlay;
+ }
+
private Callable<Void> createQueryCallable(final String queryStringFormat,
final Map<String, String> confOverlay, final long longPollingTimeout,
- final int queryCount, final CountDownLatch cdlIn, final CountDownLatch cdlOut) {
+ final int queryCount, final OperationState expectedOperationState,
+ final boolean syncThreadStart, final CountDownLatch cdlIn,
+ final CountDownLatch cdlOut) {
return new Callable<Void>() {
@Override
public Void call() throws Exception {
- syncThreadStart(cdlIn, cdlOut);
+ if (syncThreadStart) {
+ syncThreadStart(cdlIn, cdlOut);
+ }
+
SessionHandle sessionHandle = openSession(confOverlay);
OperationHandle[] hs = new OperationHandle[queryCount];
for (int i = 0; i < hs.length; ++i) {
@@ -349,7 +461,7 @@ public abstract class CLIServiceTest {
hs[i] = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
}
for (int i = hs.length - 1; i >= 0; --i) {
- waitForAsyncQuery(hs[i], OperationState.FINISHED, longPollingTimeout);
+ waitForAsyncQuery(hs[i], expectedOperationState, longPollingTimeout);
}
return null;
}
@@ -405,7 +517,6 @@ public abstract class CLIServiceTest {
return waitForAsyncQuery(h, expectedState, longPollingTimeout);
}
-
private OperationStatus waitForAsyncQuery(OperationHandle opHandle,
OperationState expectedState, long longPollingTimeout) throws HiveSQLException {
long testIterationTimeout = System.currentTimeMillis() + 100000;