You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by gu...@apache.org on 2014/09/02 21:57:07 UTC
svn commit: r1622108 [27/27] - in /hive/branches/tez: ./
accumulo-handler/src/java/org/apache/hadoop/hive/accumulo/predicate/
beeline/src/java/org/apache/hive/beeline/
beeline/src/test/org/apache/hive/beeline/ bin/ bin/ext/ checkstyle/
common/src/java/...
Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Tue Sep 2 19:56:56 2014
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.common.cli.HiveFileProcessor;
@@ -44,14 +45,7 @@ import org.apache.hadoop.hive.ql.process
import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
import org.apache.hive.service.cli.operation.ExecuteStatementOperation;
import org.apache.hive.service.cli.operation.GetCatalogsOperation;
import org.apache.hive.service.cli.operation.GetColumnsOperation;
@@ -60,8 +54,10 @@ import org.apache.hive.service.cli.opera
import org.apache.hive.service.cli.operation.GetTableTypesOperation;
import org.apache.hive.service.cli.operation.GetTypeInfoOperation;
import org.apache.hive.service.cli.operation.MetadataOperation;
+import org.apache.hive.service.cli.operation.Operation;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadWithGarbageCleanup;
/**
* HiveSession
@@ -86,6 +82,10 @@ public class HiveSessionImpl implements
private OperationManager operationManager;
private IMetaStoreClient metastoreClient = null;
private final Set<OperationHandle> opHandleSet = new HashSet<OperationHandle>();
+ private boolean isOperationLogEnabled;
+ private File sessionLogDir;
+
+ private volatile long lastAccessTime;
public HiveSessionImpl(TProtocolVersion protocol, String username, String password,
HiveConf serverhiveConf, String ipAddress) {
@@ -95,27 +95,32 @@ public class HiveSessionImpl implements
this.hiveConf = new HiveConf(serverhiveConf);
this.ipAddress = ipAddress;
- // set an explicit session name to control the download directory name
+ // Set an explicit session name to control the download directory name
hiveConf.set(ConfVars.HIVESESSIONID.varname,
sessionHandle.getHandleIdentifier().toString());
- // use thrift transportable formatter
+ // Use thrift transportable formatter
hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER,
FetchFormatter.ThriftFormatter.class.getName());
hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue());
+ /**
+ * Create a new SessionState object that will be associated with this HiveServer2 session.
+ * When the server executes multiple queries in the same session,
+ * this SessionState object is reused across multiple queries.
+ */
sessionState = new SessionState(hiveConf, username);
sessionState.setUserIpAddress(ipAddress);
sessionState.setIsHiveServerQuery(true);
+
+ lastAccessTime = System.currentTimeMillis();
SessionState.start(sessionState);
}
@Override
public void initialize(Map<String, String> sessionConfMap) throws Exception {
- //process global init file: .hiverc
+ // Process global init file: .hiverc
processGlobalInitFile();
- SessionState.setCurrentSessionState(sessionState);
-
- //set conf properties specified by user from client side
+ // Set conf properties specified by user from client side
if (sessionConfMap != null) {
configureSession(sessionConfMap);
}
@@ -169,6 +174,7 @@ public class HiveSessionImpl implements
}
private void configureSession(Map<String, String> sessionConfMap) throws Exception {
+ SessionState.setCurrentSessionState(sessionState);
for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
String key = entry.getKey();
if (key.startsWith("set:")) {
@@ -182,6 +188,34 @@ public class HiveSessionImpl implements
}
@Override
+ public void setOperationLogSessionDir(File operationLogRootDir) {
+ sessionLogDir = new File(operationLogRootDir, sessionHandle.getHandleIdentifier().toString());
+ isOperationLogEnabled = true;
+
+ if (!sessionLogDir.exists()) {
+ if (!sessionLogDir.mkdir()) {
+ LOG.warn("Unable to create operation log session directory: " +
+ sessionLogDir.getAbsolutePath());
+ isOperationLogEnabled = false;
+ }
+ }
+
+ if (isOperationLogEnabled) {
+ LOG.info("Operation log session directory is created: " + sessionLogDir.getAbsolutePath());
+ }
+ }
+
+ @Override
+ public boolean isOperationLogEnabled() {
+ return isOperationLogEnabled;
+ }
+
+ @Override
+ public File getOperationLogSessionDir() {
+ return sessionLogDir;
+ }
+
+ @Override
public TProtocolVersion getProtocolVersion() {
return sessionHandle.getProtocolVersion();
}
@@ -210,15 +244,32 @@ public class HiveSessionImpl implements
SessionState.start(sessionState);
}
- protected synchronized void acquire() throws HiveSQLException {
- // need to make sure that the this connections session state is
- // stored in the thread local for sessions.
+ protected synchronized void acquire(boolean userAccess) {
+ // Need to make sure that this HiveServer2 session's session state is
+ // stored in the thread local for the handler thread.
SessionState.setCurrentSessionState(sessionState);
+ if (userAccess) {
+ lastAccessTime = System.currentTimeMillis();
+ }
}
- protected synchronized void release() {
- assert sessionState != null;
+ /**
+ * 1. We'll remove the ThreadLocal SessionState as this thread might now serve
+ * other requests.
+ * 2. We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+ * when this thread is garbage collected later.
+ * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+ */
+ protected synchronized void release(boolean userAccess) {
SessionState.detachSession();
+ if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+ ThreadWithGarbageCleanup currentThread =
+ (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+ currentThread.cacheThreadLocalRawStore();
+ }
+ if (userAccess) {
+ lastAccessTime = System.currentTimeMillis();
+ }
}
@Override
@@ -257,7 +308,7 @@ public class HiveSessionImpl implements
@Override
public GetInfoValue getInfo(GetInfoType getInfoType)
throws HiveSQLException {
- acquire();
+ acquire(true);
try {
switch (getInfoType) {
case CLI_SERVER_NAME:
@@ -277,7 +328,7 @@ public class HiveSessionImpl implements
throw new HiveSQLException("Unrecognized GetInfoType value: " + getInfoType.toString());
}
} finally {
- release();
+ release(true);
}
}
@@ -296,7 +347,7 @@ public class HiveSessionImpl implements
private OperationHandle executeStatementInternal(String statement, Map<String, String> confOverlay,
boolean runAsync)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
ExecuteStatementOperation operation = operationManager
@@ -314,14 +365,14 @@ public class HiveSessionImpl implements
}
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getTypeInfo()
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetTypeInfoOperation operation = operationManager.newGetTypeInfoOperation(getSession());
@@ -334,14 +385,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getCatalogs()
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetCatalogsOperation operation = operationManager.newGetCatalogsOperation(getSession());
@@ -354,14 +405,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getSchemas(String catalogName, String schemaName)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetSchemasOperation operation =
@@ -375,7 +426,7 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@@ -383,7 +434,7 @@ public class HiveSessionImpl implements
public OperationHandle getTables(String catalogName, String schemaName, String tableName,
List<String> tableTypes)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
MetadataOperation operation =
@@ -397,14 +448,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getTableTypes()
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetTableTypesOperation operation = operationManager.newGetTableTypesOperation(getSession());
@@ -417,14 +468,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getColumns(String catalogName, String schemaName,
String tableName, String columnName) throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetColumnsOperation operation = operationManager.newGetColumnsOperation(getSession(),
@@ -438,14 +489,14 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public OperationHandle getFunctions(String catalogName, String schemaName, String functionName)
throws HiveSQLException {
- acquire();
+ acquire(true);
OperationManager operationManager = getOperationManager();
GetFunctionsOperation operation = operationManager
@@ -459,16 +510,16 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
throw e;
} finally {
- release();
+ release(true);
}
}
@Override
public void close() throws HiveSQLException {
try {
- acquire();
+ acquire(true);
/**
- * For metadata operations like getTables(), getColumns() etc,
+ * For metadata operations like getTables(), getColumns() etc,
* the session allocates a private metastore handler which should be
* closed at the end of the session
*/
@@ -480,6 +531,9 @@ public class HiveSessionImpl implements
operationManager.closeOperation(opHandle);
}
opHandleSet.clear();
+ // Cleanup session log directory.
+ cleanupSessionLogDir();
+
HiveHistory hiveHist = sessionState.getHiveHistory();
if (null != hiveHist) {
hiveHist.closeStream();
@@ -488,7 +542,17 @@ public class HiveSessionImpl implements
} catch (IOException ioe) {
throw new HiveSQLException("Failure to close", ioe);
} finally {
- release();
+ release(true);
+ }
+ }
+
+ private void cleanupSessionLogDir() {
+ if (isOperationLogEnabled) {
+ try {
+ FileUtils.forceDelete(sessionLogDir);
+ } catch (Exception e) {
+ LOG.error("Failed to cleanup session log dir: " + sessionHandle, e);
+ }
}
}
@@ -508,55 +572,79 @@ public class HiveSessionImpl implements
}
@Override
+ public long getLastAccessTime() {
+ return lastAccessTime;
+ }
+
+ @Override
+ public void closeExpiredOperations() {
+ OperationHandle[] handles = opHandleSet.toArray(new OperationHandle[opHandleSet.size()]);
+ if (handles.length > 0) {
+ List<Operation> operations = operationManager.removeExpiredOperations(handles);
+ if (!operations.isEmpty()) {
+ closeTimedOutOperations(operations);
+ }
+ }
+ }
+
+ private void closeTimedOutOperations(List<Operation> operations) {
+ acquire(false);
+ try {
+ for (Operation operation : operations) {
+ opHandleSet.remove(operation.getHandle());
+ try {
+ operation.close();
+ } catch (Exception e) {
+ LOG.warn("Exception is thrown closing timed-out operation " + operation.getHandle(), e);
+ }
+ }
+ } finally {
+ release(false);
+ }
+ }
+
+ @Override
public void cancelOperation(OperationHandle opHandle) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
sessionManager.getOperationManager().cancelOperation(opHandle);
} finally {
- release();
+ release(true);
}
}
@Override
public void closeOperation(OperationHandle opHandle) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
operationManager.closeOperation(opHandle);
opHandleSet.remove(opHandle);
} finally {
- release();
+ release(true);
}
}
@Override
public TableSchema getResultSetMetadata(OperationHandle opHandle) throws HiveSQLException {
- acquire();
+ acquire(true);
try {
return sessionManager.getOperationManager().getOperationResultSetSchema(opHandle);
} finally {
- release();
+ release(true);
}
}
@Override
- public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
- throws HiveSQLException {
- acquire();
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation,
+ long maxRows, FetchType fetchType) throws HiveSQLException {
+ acquire(true);
try {
- return sessionManager.getOperationManager()
- .getOperationNextRowSet(opHandle, orientation, maxRows);
- } finally {
- release();
- }
- }
-
- @Override
- public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
- acquire();
- try {
- return sessionManager.getOperationManager().getOperationNextRowSet(opHandle);
+ if (fetchType == FetchType.QUERY_OUTPUT) {
+ return operationManager.getOperationNextRowSet(opHandle, orientation, maxRows);
+ }
+ return operationManager.getOperationLogRowSet(opHandle, orientation, maxRows);
} finally {
- release();
+ release(true);
}
}
Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/HiveSessionImplwithUGI.java Tue Sep 2 19:56:56 2014
@@ -19,7 +19,6 @@
package org.apache.hive.service.cli.session;
import java.io.IOException;
-import java.util.Map;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.metadata.Hive;
@@ -48,6 +47,14 @@ public class HiveSessionImplwithUGI exte
super(protocol, username, password, hiveConf, ipAddress);
setSessionUGI(username);
setDelegationToken(delegationToken);
+
+ // create a new metastore connection for this particular user session
+ Hive.set(null);
+ try {
+ sessionHive = Hive.get(getHiveConf());
+ } catch (HiveException e) {
+ throw new HiveSQLException("Failed to setup metastore connection", e);
+ }
}
// setup appropriate UGI for the session
@@ -75,8 +82,8 @@ public class HiveSessionImplwithUGI exte
}
@Override
- protected synchronized void acquire() throws HiveSQLException {
- super.acquire();
+ protected synchronized void acquire(boolean userAccess) {
+ super.acquire(userAccess);
// if we have a metastore connection with impersonation, then set it first
if (sessionHive != null) {
Hive.set(sessionHive);
@@ -90,11 +97,11 @@ public class HiveSessionImplwithUGI exte
@Override
public void close() throws HiveSQLException {
try {
- acquire();
+ acquire(true);
ShimLoader.getHadoopShims().closeAllForUGI(sessionUgi);
cancelDelegationToken();
} finally {
- release();
+ release(true);
super.close();
}
}
@@ -115,13 +122,6 @@ public class HiveSessionImplwithUGI exte
} catch (IOException e) {
throw new HiveSQLException("Couldn't setup delegation token in the ugi", e);
}
- // create a new metastore connection using the delegation token
- Hive.set(null);
- try {
- sessionHive = Hive.get(getHiveConf());
- } catch (HiveException e) {
- throw new HiveSQLException("Failed to setup metastore connection", e);
- }
}
}
Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Tue Sep 2 19:56:56 2014
@@ -18,6 +18,10 @@
package org.apache.hive.service.cli.session;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -26,6 +30,7 @@ import java.util.concurrent.LinkedBlocki
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -38,6 +43,7 @@ import org.apache.hive.service.cli.HiveS
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
/**
* SessionManager.
@@ -52,6 +58,13 @@ public class SessionManager extends Comp
new ConcurrentHashMap<SessionHandle, HiveSession>();
private final OperationManager operationManager = new OperationManager();
private ThreadPoolExecutor backgroundOperationPool;
+ private boolean isOperationLogEnabled;
+ private File operationLogRootDir;
+
+ private long checkInterval;
+ private long sessionTimeout;
+
+ private volatile boolean shutdown;
public SessionManager() {
super("SessionManager");
@@ -64,24 +77,41 @@ public class SessionManager extends Comp
} catch (HiveException e) {
throw new RuntimeException("Error applying authorization policy on hive configuration", e);
}
-
this.hiveConf = hiveConf;
- int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
- LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize);
- int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
- LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize);
- int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
- LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime);
- // Create a thread pool with #backgroundPoolSize threads
- // Threads terminate when they are idle for more than the keepAliveTime
- // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
- backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
- keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize));
- backgroundOperationPool.allowCoreThreadTimeOut(true);
+ // Create the operation log root directory if operation logging is enabled
+ if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
+ initOperationLogRootDir();
+ }
+ createBackgroundOperationPool();
addService(operationManager);
super.init(hiveConf);
}
+ private void createBackgroundOperationPool() {
+ int poolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
+ LOG.info("HiveServer2: Background operation thread pool size: " + poolSize);
+ int poolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
+ LOG.info("HiveServer2: Background operation thread wait queue size: " + poolQueueSize);
+ long keepAliveTime = HiveConf.getTimeVar(
+ hiveConf, ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ LOG.info(
+ "HiveServer2: Background operation thread keepalive time: " + keepAliveTime + " seconds");
+
+ // Create a thread pool with #poolSize threads
+ // Threads terminate when they are idle for more than the keepAliveTime
+ // A bounded blocking queue is used to queue incoming operations, if #operations > poolSize
+ String threadPoolName = "HiveServer2-Background-Pool";
+ backgroundOperationPool = new ThreadPoolExecutor(poolSize, poolSize,
+ keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(poolQueueSize),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
+ backgroundOperationPool.allowCoreThreadTimeOut(true);
+
+ checkInterval = HiveConf.getTimeVar(
+ hiveConf, ConfVars.HIVE_SERVER2_SESSION_CHECK_INTERVAL, TimeUnit.MILLISECONDS);
+ sessionTimeout = HiveConf.getTimeVar(
+ hiveConf, ConfVars.HIVE_SERVER2_IDLE_SESSION_TIMEOUT, TimeUnit.MILLISECONDS);
+ }
+
private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException {
// authorization setup using SessionState should be revisited eventually, as
// authorization and authentication are not session specific settings
@@ -91,23 +121,106 @@ public class SessionManager extends Comp
ss.applyAuthorizationPolicy();
}
+ private void initOperationLogRootDir() {
+ operationLogRootDir = new File(
+ hiveConf.getVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION));
+ isOperationLogEnabled = true;
+
+ if (operationLogRootDir.exists() && !operationLogRootDir.isDirectory()) {
+ LOG.warn("The operation log root directory exists, but it is not a directory: " +
+ operationLogRootDir.getAbsolutePath());
+ isOperationLogEnabled = false;
+ }
+
+ if (!operationLogRootDir.exists()) {
+ if (!operationLogRootDir.mkdirs()) {
+ LOG.warn("Unable to create operation log root directory: " +
+ operationLogRootDir.getAbsolutePath());
+ isOperationLogEnabled = false;
+ }
+ }
+
+ if (isOperationLogEnabled) {
+ LOG.info("Operation log root directory is created: " + operationLogRootDir.getAbsolutePath());
+ try {
+ FileUtils.forceDeleteOnExit(operationLogRootDir);
+ } catch (IOException e) {
+ LOG.warn("Failed to schedule cleanup HS2 operation logging root dir: " +
+ operationLogRootDir.getAbsolutePath(), e);
+ }
+ }
+ }
+
@Override
public synchronized void start() {
super.start();
+ if (checkInterval > 0) {
+ startTimeoutChecker();
+ }
+ }
+
+ private void startTimeoutChecker() {
+ final long interval = Math.max(checkInterval, 3000l); // minimum 3 seconds
+ Runnable timeoutChecker = new Runnable() {
+ @Override
+ public void run() {
+ for (sleepInterval(interval); !shutdown; sleepInterval(interval)) {
+ long current = System.currentTimeMillis();
+ for (HiveSession session : new ArrayList<HiveSession>(handleToSession.values())) {
+ if (sessionTimeout > 0 && session.getLastAccessTime() + sessionTimeout <= current) {
+ SessionHandle handle = session.getSessionHandle();
+ LOG.warn("Session " + handle + " is Timed-out (last access : " +
+ new Date(session.getLastAccessTime()) + ") and will be closed");
+ try {
+ closeSession(handle);
+ } catch (HiveSQLException e) {
+ LOG.warn("Exception is thrown closing session " + handle, e);
+ }
+ } else {
+ session.closeExpiredOperations();
+ }
+ }
+ }
+ }
+
+ private void sleepInterval(long interval) {
+ try {
+ Thread.sleep(interval);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ };
+ backgroundOperationPool.execute(timeoutChecker);
}
@Override
public synchronized void stop() {
super.stop();
+ shutdown = true;
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown();
- int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+ long timeout = hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT, TimeUnit.SECONDS);
try {
backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
} catch (InterruptedException e) {
LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
" seconds has been exceeded. RUNNING background operations will be shut down", e);
}
+ backgroundOperationPool = null;
+ }
+ cleanupLoggingRootDir();
+ }
+
+ private void cleanupLoggingRootDir() {
+ if (isOperationLogEnabled) {
+ try {
+ FileUtils.forceDelete(operationLogRootDir);
+ } catch (Exception e) {
+ LOG.warn("Failed to cleanup root dir of HS2 logging: " + operationLogRootDir
+ .getAbsolutePath(), e);
+ }
}
}
@@ -132,6 +245,9 @@ public class SessionManager extends Comp
session.setOperationManager(operationManager);
try {
session.initialize(sessionConf);
+ if (isOperationLogEnabled) {
+ session.setOperationLogSessionDir(operationLogRootDir);
+ }
session.open();
} catch (Exception e) {
throw new HiveSQLException("Failed to open new session", e);
Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Tue Sep 2 19:56:56 2014
@@ -19,12 +19,17 @@
package org.apache.hive.service.cli.thrift;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
@@ -65,6 +70,12 @@ public class ThriftBinaryCLIService exte
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+ workerKeepAliveTime = hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
+ String threadPoolName = "HiveServer2-Handler-Pool";
+ ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+ workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
TServerSocket serverSocket = null;
if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
@@ -84,8 +95,7 @@ public class ThriftBinaryCLIService exte
.processorFactory(processorFactory)
.transportFactory(transportFactory)
.protocolFactory(new TBinaryProtocol.Factory())
- .minWorkerThreads(minWorkerThreads)
- .maxWorkerThreads(maxWorkerThreads);
+ .executorService(executorService);
server = new TThreadPoolServer(sargs);
Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Tue Sep 2 19:56:56 2014
@@ -29,20 +29,10 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
-import org.apache.hadoop.hive.ql.session.SessionState;
import org.apache.hive.service.AbstractService;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.TSetIpAddressProcessor;
-import org.apache.hive.service.cli.CLIService;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
import org.apache.hive.service.cli.session.SessionManager;
import org.apache.thrift.TException;
import org.apache.thrift.server.TServer;
@@ -71,6 +61,7 @@ public abstract class ThriftCLIService e
protected int minWorkerThreads;
protected int maxWorkerThreads;
+ protected long workerKeepAliveTime;
protected static HiveAuthFactory hiveAuthFactory;
@@ -242,7 +233,9 @@ public abstract class ThriftCLIService e
if (userName == null) {
userName = req.getUsername();
}
- return getProxyUser(userName, req.getConfiguration(), getIpAddress());
+ String effectiveClientUser = getProxyUser(userName, req.getConfiguration(), getIpAddress());
+ LOG.debug("Client's username: " + effectiveClientUser);
+ return effectiveClientUser;
}
/**
@@ -532,7 +525,8 @@ public abstract class ThriftCLIService e
RowSet rowSet = cliService.fetchResults(
new OperationHandle(req.getOperationHandle()),
FetchOrientation.getFetchOrientation(req.getOrientation()),
- req.getMaxRows());
+ req.getMaxRows(),
+ FetchType.getFetchType(req.getFetchType()));
resp.setResults(rowSet.toTRowSet());
resp.setHasMoreRows(false);
resp.setStatus(OK_STATUS);
Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java Tue Sep 2 19:56:56 2014
@@ -22,18 +22,7 @@ import java.util.List;
import java.util.Map;
import org.apache.hive.service.auth.HiveAuthFactory;
-import org.apache.hive.service.cli.CLIServiceClient;
-import org.apache.hive.service.cli.FetchOrientation;
-import org.apache.hive.service.cli.GetInfoType;
-import org.apache.hive.service.cli.GetInfoValue;
-import org.apache.hive.service.cli.HiveSQLException;
-import org.apache.hive.service.cli.OperationHandle;
-import org.apache.hive.service.cli.OperationState;
-import org.apache.hive.service.cli.OperationStatus;
-import org.apache.hive.service.cli.RowSet;
-import org.apache.hive.service.cli.RowSetFactory;
-import org.apache.hive.service.cli.SessionHandle;
-import org.apache.hive.service.cli.TableSchema;
+import org.apache.hive.service.cli.*;
import org.apache.thrift.TException;
/**
@@ -377,17 +366,15 @@ public class ThriftCLIServiceClient exte
}
}
- /* (non-Javadoc)
- * @see org.apache.hive.service.cli.ICLIService#fetchResults(org.apache.hive.service.cli.OperationHandle, org.apache.hive.service.cli.FetchOrientation, long)
- */
@Override
- public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows)
- throws HiveSQLException {
+ public RowSet fetchResults(OperationHandle opHandle, FetchOrientation orientation, long maxRows,
+ FetchType fetchType) throws HiveSQLException {
try {
TFetchResultsReq req = new TFetchResultsReq();
req.setOperationHandle(opHandle.toTOperationHandle());
req.setOrientation(orientation.toTFetchOrientation());
req.setMaxRows(maxRows);
+ req.setFetchType(fetchType.toTFetchType());
TFetchResultsResp resp = cliService.FetchResults(req);
checkStatus(resp.getStatus());
return RowSetFactory.create(resp.getResults(), opHandle.getProtocolVersion());
@@ -404,7 +391,7 @@ public class ThriftCLIServiceClient exte
@Override
public RowSet fetchResults(OperationHandle opHandle) throws HiveSQLException {
// TODO: set the correct default fetch size
- return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000);
+ return fetchResults(opHandle, FetchOrientation.FETCH_NEXT, 10000, FetchType.QUERY_OUTPUT);
}
@Override
Modified: hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/branches/tez/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Tue Sep 2 19:56:56 2014
@@ -18,6 +18,11 @@
package org.apache.hive.service.cli.thrift;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -26,6 +31,7 @@ import org.apache.hadoop.util.Shell;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes;
import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -36,7 +42,7 @@ import org.eclipse.jetty.server.ssl.SslS
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
public class ThriftHttpCLIService extends ThriftCLIService {
@@ -63,13 +69,18 @@ public class ThriftHttpCLIService extend
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
+ workerKeepAliveTime = hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME, TimeUnit.SECONDS);
String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
httpServer = new org.eclipse.jetty.server.Server();
- QueuedThreadPool threadPool = new QueuedThreadPool();
- threadPool.setMinThreads(minWorkerThreads);
- threadPool.setMaxThreads(maxWorkerThreads);
+ String threadPoolName = "HiveServer2-HttpHandler-Pool";
+ ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+ workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
+
+ ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
httpServer.setThreadPool(threadPool);
SelectChannelConnector connector = new SelectChannelConnector();;
@@ -100,7 +111,8 @@ public class ThriftHttpCLIService extend
// Linux:yes, Windows:no
connector.setReuseAddress(!Shell.WINDOWS);
- int maxIdleTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME);
+ int maxIdleTime = (int) hiveConf.getTimeVar(
+ ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME, TimeUnit.MILLISECONDS);
connector.setMaxIdleTime(maxIdleTime);
httpServer.addConnector(connector);
Modified: hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java?rev=1622108&r1=1622107&r2=1622108&view=diff
==============================================================================
--- hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java (original)
+++ hive/branches/tez/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java Tue Sep 2 19:56:56 2014
@@ -26,9 +26,9 @@ import static org.junit.Assert.fail;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -202,7 +202,8 @@ public abstract class CLIServiceTest {
* to give a compile time error.
* (compilation is done synchronous as of now)
*/
- longPollingTimeout = new HiveConf().getLongVar(ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT);
+ longPollingTimeout = HiveConf.getTimeVar(new HiveConf(),
+ HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT, TimeUnit.MILLISECONDS);
queryString = "SELECT NON_EXISTING_COLUMN FROM " + tableName;
try {
runQueryAsync(sessionHandle, queryString, confOverlay, OperationState.ERROR, longPollingTimeout);
@@ -295,7 +296,7 @@ public abstract class CLIServiceTest {
long longPollingTimeDelta;
OperationStatus opStatus = null;
OperationState state = null;
- confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, String.valueOf(longPollingTimeout));
+ confOverlay.put(HiveConf.ConfVars.HIVE_SERVER2_LONG_POLLING_TIMEOUT.varname, longPollingTimeout + "ms");
OperationHandle opHandle = client.executeStatementAsync(sessionHandle, queryString, confOverlay);
int count = 0;
while (true) {