You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2016/07/14 21:13:51 UTC
hive git commit: HIVE-11402 : HS2 - add an option to disallow
parallel query execution within a single Session (Sergey Shelukhin,
reviewed by Aihua Xu, Thejas Nair)
Repository: hive
Updated Branches:
refs/heads/master 3522f3f4c -> 8f183945a
HIVE-11402 : HS2 - add an option to disallow parallel query execution within a single Session (Sergey Shelukhin, reviewed by Aihua Xu, Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8f183945
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8f183945
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8f183945
Branch: refs/heads/master
Commit: 8f183945ab0e4f9a87eb3812eeab84ec12842751
Parents: 3522f3f
Author: Sergey Shelukhin <se...@apache.org>
Authored: Thu Jul 14 14:13:39 2016 -0700
Committer: Thejas Nair <th...@hortonworks.com>
Committed: Thu Jul 14 14:13:39 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hive/conf/HiveConf.java | 2 +
.../org/apache/hive/jdbc/TestJdbcDriver2.java | 31 ++++
.../cli/session/TestHiveSessionImpl.java | 4 +-
.../operation/ExecuteStatementOperation.java | 2 +-
.../hive/service/cli/operation/Operation.java | 20 ++-
.../service/cli/operation/SQLOperation.java | 145 +++++++++++--------
.../hive/service/cli/session/HiveSession.java | 3 +
.../service/cli/session/HiveSessionImpl.java | 137 ++++++++++++------
.../cli/session/HiveSessionImplwithUGI.java | 3 +-
9 files changed, 228 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index bae1825..42f7d88 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2154,6 +2154,8 @@ public class HiveConf extends Configuration {
new TimeValidator(TimeUnit.SECONDS),
"Number of seconds a request will wait to acquire the compile lock before giving up. " +
"Setting it to 0s disables the timeout."),
+ HIVE_SERVER2_PARALLEL_OPS_IN_SESSION("hive.server2.parallel.ops.in.session", true,
+ "Whether to allow several parallel operations (such as SQL statements) in one session."),
// HiveServer2 WebUI
HIVE_SERVER2_WEBUI_BIND_HOST("hive.server2.webui.host", "0.0.0.0", "The host address the HiveServer2 WebUI will listen on"),
http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
index b0fa98f..58f0d43 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcDriver2.java
@@ -135,6 +135,7 @@ public class TestJdbcDriver2 {
System.setProperty(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LEVEL.varname, "verbose");
System.setProperty(ConfVars.HIVEMAPREDMODE.varname, "nonstrict");
System.setProperty(ConfVars.HIVE_AUTHORIZATION_MANAGER.varname, "org.apache.hadoop.hive.ql.security.authorization.DefaultHiveAuthorizationProvider");
+ System.setProperty(ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION.varname, "false");
Statement stmt1 = con1.createStatement();
assertNotNull("Statement is null", stmt1);
@@ -328,6 +329,23 @@ public class TestJdbcDriver2 {
}
@Test
+ public void testSerializedExecution() throws Exception {
+ // Test running parallel queries (with parallel queries disabled).
+ // Should be serialized in the order of execution.
+ HiveStatement stmt1 = (HiveStatement) con.createStatement();
+ HiveStatement stmt2 = (HiveStatement) con.createStatement();
+ stmt1.execute("create temporary function sleepMsUDF as '" + SleepMsUDF.class.getName() + "'");
+ stmt1.execute("create table test_ser_1(i int)");
+ stmt1.executeAsync("insert into test_ser_1 select sleepMsUDF(under_col, 500) from "
+ + tableName + " limit 1");
+ boolean isResultSet = stmt2.executeAsync("select * from test_ser_1");
+ assertTrue(isResultSet);
+ ResultSet rs = stmt2.getResultSet();
+ assertTrue(rs.next());
+ assertFalse(rs.next());
+ }
+
+ @Test
public void testParentReferences() throws Exception {
/* Test parent references from Statement */
Statement s = this.con.createStatement();
@@ -2534,6 +2552,19 @@ public void testParseUrlHttpMode() throws SQLException, JdbcUriParseException,
}
}
+
+ // A udf which sleeps for some number of ms to simulate a long running query
+ public static class SleepMsUDF extends UDF {
+ public Integer evaluate(final Integer value, final Integer ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ // No-op
+ }
+ return value;
+ }
+ }
+
/**
* Loads data from a table containing non-ascii value column
* Runs a query and compares the return value
http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
index d58a913..c7fa5da 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/service/cli/session/TestHiveSessionImpl.java
@@ -51,11 +51,11 @@ public class TestHiveSessionImpl {
HiveSessionImpl session = new HiveSessionImpl(protocol, username, password,
serverhiveConf, ipAddress) {
@Override
- protected synchronized void acquire(boolean userAccess) {
+ protected synchronized void acquire(boolean userAccess, boolean isOperation) {
}
@Override
- protected synchronized void release(boolean userAccess) {
+ protected synchronized void release(boolean userAccess, boolean isOperation) {
}
};
http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
index ff46ed8..2dd90b6 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/ExecuteStatementOperation.java
@@ -31,7 +31,7 @@ public abstract class ExecuteStatementOperation extends Operation {
public ExecuteStatementOperation(HiveSession parentSession, String statement,
Map<String, String> confOverlay, boolean runInBackground) {
- super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT, runInBackground);
+ super(parentSession, confOverlay, OperationType.EXECUTE_STATEMENT);
this.statement = statement;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/operation/Operation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/Operation.java b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
index 021c1fe..90fe76d 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/Operation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/Operation.java
@@ -67,7 +67,6 @@ public abstract class Operation {
public static final long DEFAULT_FETCH_MAX_ROWS = 100;
protected boolean hasResultSet;
protected volatile HiveSQLException operationException;
- protected final boolean runAsync;
protected volatile Future<?> backgroundHandle;
protected OperationLog operationLog;
protected boolean isOperationLogEnabled;
@@ -85,23 +84,29 @@ public abstract class Operation {
protected static final EnumSet<FetchOrientation> DEFAULT_FETCH_ORIENTATION_SET =
EnumSet.of(FetchOrientation.FETCH_NEXT,FetchOrientation.FETCH_FIRST);
+
protected Operation(HiveSession parentSession, OperationType opType) {
- this(parentSession, null, opType, false);
- }
+ this(parentSession, null, opType);
+ }
+
+ protected Operation(HiveSession parentSession, Map<String, String> confOverlay,
+ OperationType opType) {
+ this(parentSession, confOverlay, opType, false);
+ }
- protected Operation(HiveSession parentSession, Map<String, String> confOverlay, OperationType opType, boolean runInBackground) {
+ protected Operation(HiveSession parentSession,
+ Map<String, String> confOverlay, OperationType opType, boolean isAsyncQueryState) {
this.parentSession = parentSession;
if (confOverlay != null) {
this.confOverlay = confOverlay;
}
- this.runAsync = runInBackground;
this.opHandle = new OperationHandle(opType, parentSession.getProtocolVersion());
beginTime = System.currentTimeMillis();
lastAccessTime = beginTime;
operationTimeout = HiveConf.getTimeVar(parentSession.getHiveConf(),
HiveConf.ConfVars.HIVE_SERVER2_IDLE_OPERATION_TIMEOUT, TimeUnit.MILLISECONDS);
setMetrics(state);
- queryState = new QueryState(parentSession.getHiveConf(), confOverlay, runAsync);
+ queryState = new QueryState(parentSession.getHiveConf(), confOverlay, isAsyncQueryState);
}
public Future<?> getBackgroundHandle() {
@@ -113,10 +118,9 @@ public abstract class Operation {
}
public boolean shouldRunAsync() {
- return runAsync;
+ return false; // Most operations cannot run asynchronously.
}
-
public HiveSession getParentSession() {
return parentSession;
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
index e453354..6f2daf3 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
@@ -89,7 +89,6 @@ import org.codehaus.jackson.map.ObjectMapper;
*/
@SuppressWarnings("deprecation")
public class SQLOperation extends ExecuteStatementOperation {
-
private Driver driver = null;
private CommandProcessorResponse response;
private TableSchema resultSchema = null;
@@ -101,6 +100,7 @@ public class SQLOperation extends ExecuteStatementOperation {
private SQLOperationDisplay sqlOpDisplay;
private long queryTimeout;
private ScheduledExecutorService timeoutExecutor;
+ private final boolean runAsync;
/**
* A map to track query count running by each user
@@ -112,6 +112,7 @@ public class SQLOperation extends ExecuteStatementOperation {
boolean runInBackground, long queryTimeout) {
// TODO: call setRemoteUser in ExecuteStatementOperation or higher.
super(parentSession, statement, confOverlay, runInBackground);
+ this.runAsync = runInBackground;
this.queryTimeout = queryTimeout;
long timeout = HiveConf.getTimeVar(queryState.getConf(),
HiveConf.ConfVars.HIVE_QUERY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
@@ -127,6 +128,11 @@ public class SQLOperation extends ExecuteStatementOperation {
}
}
+ @Override
+ public boolean shouldRunAsync() {
+ return runAsync;
+ }
+
private void setupSessionIO(SessionState sessionState) {
try {
sessionState.in = null; // hive server's session input stream is not used
@@ -278,70 +284,16 @@ public class SQLOperation extends ExecuteStatementOperation {
if (!runAsync) {
runQuery();
} else {
- // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
- final SessionState parentSessionState = SessionState.get();
- // ThreadLocal Hive object needs to be set in background thread.
- // The metastore client in Hive is associated with right user.
- final Hive parentHive = parentSession.getSessionHive();
- final PerfLogger parentPerfLogger = SessionState.getPerfLogger();
- // Current UGI will get used by metastore when metsatore is in embedded mode
- // So this needs to get passed to the new background thread
- final UserGroupInformation currentUGI = getCurrentUGI();
- // Runnable impl to call runInternal asynchronously,
- // from a different thread
- Runnable backgroundOperation = new Runnable() {
- @Override
- public void run() {
- PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
- @Override
- public Object run() throws HiveSQLException {
- Hive.set(parentHive);
- // TODO: can this result in cross-thread reuse of session state?
- SessionState.setCurrentSessionState(parentSessionState);
- PerfLogger.setPerfLogger(parentPerfLogger);
- // Set current OperationLog in this async thread for keeping on saving query log.
- registerCurrentOperationLog();
- registerLoggingContext();
- try {
- if (asyncPrepare) {
- prepare(queryState);
- }
- runQuery();
- } catch (HiveSQLException e) {
- setOperationException(e);
- LOG.error("Error running hive query: ", e);
- } finally {
- unregisterLoggingContext();
- unregisterOperationLog();
- }
- return null;
- }
- };
+ // We'll pass ThreadLocals in the background thread from the foreground (handler) thread.
+ // 1) ThreadLocal Hive object needs to be set in background thread
+ // 2) The metastore client in Hive is associated with right user.
+ // 3) Current UGI will get used by metastore when metastore is in embedded mode
+ Runnable work = new BackgroundWork(getCurrentUGI(), parentSession.getSessionHive(),
+ SessionState.getPerfLogger(), SessionState.get(), asyncPrepare);
- try {
- currentUGI.doAs(doAsAction);
- } catch (Exception e) {
- setOperationException(new HiveSQLException(e));
- LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
- }
- finally {
- /**
- * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
- * when this thread is garbage collected later.
- * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
- */
- if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
- ThreadWithGarbageCleanup currentThread =
- (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
- currentThread.cacheThreadLocalRawStore();
- }
- }
- }
- };
try {
// This submit blocks if no background threads are available to run this operation
- Future<?> backgroundHandle =
- getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
+ Future<?> backgroundHandle = getParentSession().submitBackgroundOperation(work);
setBackgroundHandle(backgroundHandle);
} catch (RejectedExecutionException rejected) {
setState(OperationState.ERROR);
@@ -351,6 +303,74 @@ public class SQLOperation extends ExecuteStatementOperation {
}
}
+
+ private final class BackgroundWork implements Runnable {
+ private final UserGroupInformation currentUGI;
+ private final Hive parentHive;
+ private final PerfLogger parentPerfLogger;
+ private final SessionState parentSessionState;
+ private final boolean asyncPrepare;
+
+ private BackgroundWork(UserGroupInformation currentUGI,
+ Hive parentHive, PerfLogger parentPerfLogger,
+ SessionState parentSessionState, boolean asyncPrepare) {
+ this.currentUGI = currentUGI;
+ this.parentHive = parentHive;
+ this.parentPerfLogger = parentPerfLogger;
+ this.parentSessionState = parentSessionState;
+ this.asyncPrepare = asyncPrepare;
+ }
+
+ @Override
+ public void run() {
+ PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
+ @Override
+ public Object run() throws HiveSQLException {
+ Hive.set(parentHive);
+ // TODO: can this result in cross-thread reuse of session state?
+ SessionState.setCurrentSessionState(parentSessionState);
+ PerfLogger.setPerfLogger(parentPerfLogger);
+ // Set current OperationLog in this async thread for keeping on saving query log.
+ registerCurrentOperationLog();
+ registerLoggingContext();
+ try {
+ if (asyncPrepare) {
+ prepare(queryState);
+ }
+ runQuery();
+ } catch (HiveSQLException e) {
+ setOperationException(e);
+ LOG.error("Error running hive query: ", e);
+ } finally {
+ unregisterLoggingContext();
+ unregisterOperationLog();
+ }
+ return null;
+ }
+ };
+
+ try {
+ currentUGI.doAs(doAsAction);
+ } catch (Exception e) {
+ setOperationException(new HiveSQLException(e));
+ LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
+ }
+ finally {
+ /**
+ * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+ * when this thread is garbage collected later.
+ * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+ */
+ if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+ ThreadWithGarbageCleanup currentThread =
+ (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+ currentThread.cacheThreadLocalRawStore();
+ }
+ }
+ }
+ }
+
+
/**
* Returns the current UGI on the stack
* @param opConfig
@@ -669,4 +689,5 @@ public class SQLOperation extends ExecuteStatementOperation {
public String getExecutionEngine() {
return queryState.getConf().getVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
}
+
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
index 78ff388..e5d865b 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSession.java
@@ -20,6 +20,7 @@ package org.apache.hive.service.cli.session;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Future;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -211,4 +212,6 @@ public interface HiveSession extends HiveSessionBase {
void closeExpiredOperations();
long getNoOperationTime();
+
+ Future<?> submitBackgroundOperation(Runnable work);
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 7341635..eca02d9 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -24,11 +24,13 @@ import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.ArrayList;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Future;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.Semaphore;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
@@ -38,8 +40,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.ql.QueryPlan;
-import org.apache.hadoop.hive.ql.exec.ListSinkOperator;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.history.HiveHistory;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -114,16 +114,19 @@ public class HiveSessionImpl implements HiveSession {
private volatile long lastAccessTime;
private volatile long lastIdleTime;
private volatile int pendingCount = 0;
+ private final Semaphore operationLock;
- public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol, String username, String password,
- HiveConf serverhiveConf, String ipAddress) {
+ public HiveSessionImpl(SessionHandle sessionHandle, TProtocolVersion protocol,
+ String username, String password, HiveConf serverConf, String ipAddress) {
this.username = username;
this.password = password;
creationTime = System.currentTimeMillis();
this.sessionHandle = sessionHandle != null ? sessionHandle : new SessionHandle(protocol);
- this.sessionConf = new HiveConf(serverhiveConf);
+ this.sessionConf = new HiveConf(serverConf);
this.ipAddress = ipAddress;
+ this.operationLock = serverConf.getBoolVar(
+ ConfVars.HIVE_SERVER2_PARALLEL_OPS_IN_SESSION) ? null : new Semaphore(1);
try {
// In non-impersonation mode, map scheduler queue to current user
// if fair scheduler is configured.
@@ -326,7 +329,27 @@ public class HiveSessionImpl implements HiveSession {
this.operationManager = operationManager;
}
- protected synchronized void acquire(boolean userAccess) {
+ protected void acquire(boolean userAccess, boolean isOperation) {
+ if (isOperation && operationLock != null) {
+ try {
+ operationLock.acquire();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ boolean success = false;
+ try {
+ acquireAfterOpLock(userAccess);
+ success = true;
+ } finally {
+ if (!success && isOperation && operationLock != null) {
+ operationLock.release();
+ }
+ }
+ }
+
+ private synchronized void acquireAfterOpLock(boolean userAccess) {
// Need to make sure that the this HiveServer2's session's SessionState is
// stored in the thread local for the handler thread.
SessionState.setCurrentSessionState(sessionState);
@@ -348,7 +371,17 @@ public class HiveSessionImpl implements HiveSession {
* when this thread is garbage collected later.
* @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
*/
- protected synchronized void release(boolean userAccess) {
+ protected void release(boolean userAccess, boolean isOperation) {
+ try {
+ releaseBeforeOpLock(userAccess);
+ } finally {
+ if (isOperation && operationLock != null) {
+ operationLock.release();
+ }
+ }
+ }
+
+ private synchronized void releaseBeforeOpLock(boolean userAccess) {
if (sessionState != null) {
// can be null in-case of junit tests. skip reset.
// reset thread name at release time.
@@ -405,7 +438,7 @@ public class HiveSessionImpl implements HiveSession {
@Override
public GetInfoValue getInfo(GetInfoType getInfoType)
throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
try {
switch (getInfoType) {
case CLI_SERVER_NAME:
@@ -425,7 +458,7 @@ public class HiveSessionImpl implements HiveSession {
throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
}
} finally {
- release(true);
+ release(true, true);
}
}
@@ -453,12 +486,11 @@ public class HiveSessionImpl implements HiveSession {
private OperationHandle executeStatementInternal(String statement,
Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
OperationManager operationManager = getOperationManager();
- ExecuteStatementOperation operation =
- operationManager.newExecuteStatementOperation(getSession(), statement, confOverlay,
- runAsync, queryTimeout);
+ ExecuteStatementOperation operation = operationManager.newExecuteStatementOperation(
+ getSession(), statement, confOverlay, runAsync, queryTimeout);
OperationHandle opHandle = operation.getHandle();
try {
operation.run();
@@ -471,14 +503,29 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ if (operation.getBackgroundHandle() == null) {
+ release(true, true); // Not async, or wasn't submitted for some reason (failure, etc.)
+ } else {
+ releaseBeforeOpLock(true); // Release, but keep the lock (if present).
+ }
}
}
@Override
+ public Future<?> submitBackgroundOperation(Runnable work) {
+ return getSessionManager().submitBackgroundOperation(
+ operationLock == null ? work : new FutureTask<Void>(work, null) {
+ protected void done() {
+ // We assume this always comes from a user operation that took the lock.
+ operationLock.release();
+ };
+ });
+ }
+
+ @Override
public OperationHandle getTypeInfo()
throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
OperationManager operationManager = getOperationManager();
GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
@@ -491,14 +538,14 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ release(true, true);
}
}
@Override
public OperationHandle getCatalogs()
throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
OperationManager operationManager = getOperationManager();
GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
@@ -511,14 +558,14 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ release(true, true);
}
}
@Override
public OperationHandle getSchemas(String catalogName, String schemaName)
throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
OperationManager operationManager = getOperationManager();
GetSchemasOperation operation =
@@ -532,7 +579,7 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ release(true, true);
}
}
@@ -540,7 +587,7 @@ public class HiveSessionImpl implements HiveSession {
public OperationHandle getTables(String catalogName, String schemaName, String tableName,
List<String> tableTypes)
throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
OperationManager operationManager = getOperationManager();
MetadataOperation operation =
@@ -554,14 +601,14 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ release(true, true);
}
}
@Override
public OperationHandle getTableTypes()
throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
OperationManager operationManager = getOperationManager();
GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
@@ -574,14 +621,14 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ release(true, true);
}
}
@Override
public OperationHandle getColumns(String catalogName, String schemaName,
String tableName, String columnName) throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
String addedJars = Utilities.getResourceFiles(sessionConf, SessionState.ResourceType.JAR);
if (StringUtils.isNotBlank(addedJars)) {
IMetaStoreClient metastoreClient = getSession().getMetaStoreClient();
@@ -599,7 +646,7 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ release(true, true);
}
}
@@ -612,7 +659,7 @@ public class HiveSessionImpl implements HiveSession {
@Override
public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
OperationManager operationManager = getOperationManager();
GetFunctionsOperation operation = operationManager
@@ -626,14 +673,14 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ release(true, true);
}
}
@Override
public void close() throws HiveSQLException {
try {
- acquire(true);
+ acquire(true, false);
// Iterate through the opHandles and close their operations
List<OperationHandle> ops = null;
synchronized (opHandleSet) {
@@ -675,7 +722,7 @@ public class HiveSessionImpl implements HiveSession {
}
sessionHive = null;
}
- release(true);
+ release(true, false);
}
}
@@ -736,7 +783,7 @@ public class HiveSessionImpl implements HiveSession {
}
private void closeTimedOutOperations(List<Operation> operations) {
- acquire(false);
+ acquire(false, false);
try {
for (Operation operation : operations) {
synchronized (opHandleSet) {
@@ -749,54 +796,54 @@ public class HiveSessionImpl implements HiveSession {
}
}
} finally {
- release(false);
+ release(false, false);
}
}
@Override
public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
- acquire(true);
+ acquire(true, false);
try {
sessionManager.getOperationManager().cancelOperation(opHandle);
} finally {
- release(true);
+ release(true, false);
}
}
@Override
public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
- acquire(true);
+ acquire(true, false);
try {
operationManager.closeOperation(opHandle);
synchronized (opHandleSet) {
opHandleSet.remove(opHandle);
}
} finally {
- release(true);
+ release(true, false);
}
}
@Override
public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
try {
return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
} finally {
- release(true);
+ release(true, true);
}
}
@Override
public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
long maxRows, FetchType fetchType) throws HiveSQLException {
- acquire(true);
+ acquire(true, false);
try {
if (fetchType == FetchType.QUERY_OUTPUT) {
return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
}
return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows, sessionConf);
} finally {
- release(true);
+ release(true, false);
}
}
@@ -850,7 +897,7 @@ public class HiveSessionImpl implements HiveSession {
@Override
public OperationHandle getPrimaryKeys(String catalog, String schema,
String table) throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
OperationManager operationManager = getOperationManager();
GetPrimaryKeysOperation operation = operationManager
@@ -864,7 +911,7 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ release(true, true);
}
}
@@ -872,7 +919,7 @@ public class HiveSessionImpl implements HiveSession {
public OperationHandle getCrossReference(String primaryCatalog,
String primarySchema, String primaryTable, String foreignCatalog,
String foreignSchema, String foreignTable) throws HiveSQLException {
- acquire(true);
+ acquire(true, true);
OperationManager operationManager = getOperationManager();
GetCrossReferenceOperation operation = operationManager
@@ -888,7 +935,7 @@ public class HiveSessionImpl implements HiveSession {
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release(true);
+ release(true, true);
}
}
}
http://git-wip-us.apache.org/repos/asf/hive/blob/8f183945/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
index f7b3412..afed9e2 100644
--- a/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
+++ b/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
@@ -89,9 +89,10 @@ public class HiveSessionImplwithUGI extends HiveSessionImpl {
@Override
public void close() throws HiveSQLException {
try {
- acquire(true);
+ acquire(true, false);
cancelDelegationToken();
} finally {
+ release(true, false);
try {
super.close();
} finally {