You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2014/08/27 22:27:21 UTC
svn commit: r1620973 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/
metastore/src/java/org/apache/hadoop/hive/metastore/
service/src/java/org/apache/hive/service/cli/
service/src/java/org/apache/hive/service/cli/operation/ service/src/j...
Author: vgumashta
Date: Wed Aug 27 20:27:21 2014
New Revision: 1620973
URL: http://svn.apache.org/r1620973
Log:
HIVE-7353: HiveServer2 using embedded MetaStore leaks JDOPersistanceManager (Vaibhav Gumashta reviewed by Szehon Ho, Lefty Leverenz, Navis Ryu)
Added:
hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java
hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Aug 27 20:27:21 2014
@@ -54,7 +54,6 @@ import org.apache.hive.common.HiveCompat
* Hive Configuration.
*/
public class HiveConf extends Configuration {
-
protected String hiveJar;
protected Properties origProp;
protected String auxJars;
@@ -1476,8 +1475,11 @@ public class HiveConf extends Configurat
"Minimum number of worker threads when in HTTP mode."),
HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS("hive.server2.thrift.http.max.worker.threads", 500,
"Maximum number of worker threads when in HTTP mode."),
- HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000,
+ HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000,
"Maximum idle time in milliseconds for a connection on the server when in HTTP mode."),
+ HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME("hive.server2.thrift.http.worker.keepalive.time", 60,
+ "Keepalive time (in seconds) for an idle http worker thread. When number of workers > min workers, " +
+ "excess threads are killed after this time interval."),
// binary transport settings
HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000,
@@ -1500,7 +1502,9 @@ public class HiveConf extends Configurat
"Minimum number of Thrift worker threads"),
HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 500,
"Maximum number of Thrift worker threads"),
-
+ HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME("hive.server2.thrift.worker.keepalive.time", 60,
+ "Keepalive time (in seconds) for an idle worker thread. When number of workers > min workers, " +
+ "excess threads are killed after this time interval."),
// Configuration for async thread pool in SessionManager
HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100,
"Number of threads in the async thread pool for HiveServer2"),
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Wed Aug 27 20:27:21 2014
@@ -250,7 +250,7 @@ public class HiveMetaStore extends Thrif
private static String currentUrl;
private Warehouse wh; // hdfs warehouse
- private final ThreadLocal<RawStore> threadLocalMS =
+ private static final ThreadLocal<RawStore> threadLocalMS =
new ThreadLocal<RawStore>() {
@Override
protected synchronized RawStore initialValue() {
@@ -265,6 +265,14 @@ public class HiveMetaStore extends Thrif
}
};
+ public static RawStore getRawStore() {
+ return threadLocalMS.get(); // ThreadLocal.get(): runs initialValue() if this thread has no entry yet
+ }
+
+ public static void removeRawStore() {
+ threadLocalMS.remove(); // clears only the calling thread's entry; does not call RawStore#shutdown()
+ }
+
// Thread local configuration is needed as many threads could make changes
// to the conf using the connection hook
private final ThreadLocal<Configuration> threadLocalConf =
@@ -384,6 +392,7 @@ public class HiveMetaStore extends Thrif
}
}
+ @Override
public void init() throws MetaException {
rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
initListeners = MetaStoreUtils.getMetaStoreListeners(
Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Wed Aug 27 20:27:21 2014
@@ -252,6 +252,8 @@ public class ObjectStore implements RawS
expressionProxy = createExpressionProxy(hiveConf);
directSql = new MetaStoreDirectSql(pm);
}
+ LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
+ " created in the thread with id: " + Thread.currentThread().getId());
}
/**
@@ -343,6 +345,8 @@ public class ObjectStore implements RawS
@Override
public void shutdown() {
if (pm != null) {
+ LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
+ " will be shutdown");
pm.close();
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Wed Aug 27 20:27:21 2014
@@ -66,7 +66,6 @@ public class CLIService extends Composit
private HiveConf hiveConf;
private SessionManager sessionManager;
- private IMetaStoreClient metastoreClient;
private UserGroupInformation serviceUGI;
private UserGroupInformation httpUGI;
@@ -128,21 +127,23 @@ public class CLIService extends Composit
} catch (IOException eIO) {
throw new ServiceException("Error setting stage directories", eIO);
}
-
+ // Initialize and test a connection to the metastore
+ IMetaStoreClient metastoreClient = null;
try {
- // Initialize and test a connection to the metastore
metastoreClient = new HiveMetaStoreClient(hiveConf);
metastoreClient.getDatabases("default");
} catch (Exception e) {
throw new ServiceException("Unable to connect to MetaStore!", e);
}
+ finally {
+ if (metastoreClient != null) {
+ metastoreClient.close();
+ }
+ }
}
@Override
public synchronized void stop() {
- if (metastoreClient != null) {
- metastoreClient.close();
- }
super.stop();
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Wed Aug 27 20:27:21 2014
@@ -60,6 +60,7 @@ import org.apache.hive.service.cli.RowSe
import org.apache.hive.service.cli.RowSetFactory;
import org.apache.hive.service.cli.TableSchema;
import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.server.ThreadWithGarbageCleanup;
/**
* SQLOperation.
@@ -171,27 +172,23 @@ public class SQLOperation extends Execut
if (!shouldRunAsync()) {
runInternal(opConfig);
} else {
+ // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
final SessionState parentSessionState = SessionState.get();
- // current Hive object needs to be set in aysnc thread in case of remote metastore.
- // The metastore client in Hive is associated with right user
- final Hive sessionHive = getCurrentHive();
- // current UGI will get used by metastore when metsatore is in embedded mode
- // so this needs to get passed to the new async thread
+ // ThreadLocal Hive object needs to be set in background thread.
+ // The metastore client in Hive is associated with right user.
+ final Hive parentHive = getSessionHive();
+ // Current UGI will get used by metastore when metsatore is in embedded mode
+ // So this needs to get passed to the new background thread
final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
-
// Runnable impl to call runInternal asynchronously,
// from a different thread
Runnable backgroundOperation = new Runnable() {
-
@Override
public void run() {
PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
@Override
public Object run() throws HiveSQLException {
-
- // Storing the current Hive object necessary when doAs is enabled
- // User information is part of the metastore client member in Hive
- Hive.set(sessionHive);
+ Hive.set(parentHive);
SessionState.setCurrentSessionState(parentSessionState);
try {
runInternal(opConfig);
@@ -202,12 +199,25 @@ public class SQLOperation extends Execut
return null;
}
};
+
try {
ShimLoader.getHadoopShims().doAs(currentUGI, doAsAction);
} catch (Exception e) {
setOperationException(new HiveSQLException(e));
LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
}
+ finally {
+ /**
+ * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+ * when this thread is garbage collected later.
+ * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+ */
+ if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+ ThreadWithGarbageCleanup currentThread =
+ (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+ currentThread.cacheThreadLocalRawStore();
+ }
+ }
}
};
try {
@@ -223,6 +233,12 @@ public class SQLOperation extends Execut
}
}
+ /**
+ * Returns the current UGI on the stack
+ * @param opConfig
+ * @return UserGroupInformation
+ * @throws HiveSQLException
+ */
private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException {
try {
return ShimLoader.getHadoopShims().getUGIForConf(opConfig);
@@ -231,11 +247,16 @@ public class SQLOperation extends Execut
}
}
- private Hive getCurrentHive() throws HiveSQLException {
+ /**
+ * Returns the current thread's ThreadLocal Hive object, as obtained from Hive.get()
+ * @return Hive
+ * @throws HiveSQLException wrapping the HiveException if Hive.get() fails
+ */
+ private Hive getSessionHive() throws HiveSQLException {
try {
return Hive.get();
} catch (HiveException e) {
- throw new HiveSQLException("Failed to get current Hive object", e);
+ throw new HiveSQLException("Failed to get ThreadLocal Hive object", e);
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Wed Aug 27 20:27:21 2014
@@ -62,6 +62,7 @@ import org.apache.hive.service.cli.opera
import org.apache.hive.service.cli.operation.MetadataOperation;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadWithGarbageCleanup;
/**
* HiveSession
@@ -95,14 +96,19 @@ public class HiveSessionImpl implements
this.hiveConf = new HiveConf(serverhiveConf);
this.ipAddress = ipAddress;
- // set an explicit session name to control the download directory name
+ // Set an explicit session name to control the download directory name
hiveConf.set(ConfVars.HIVESESSIONID.varname,
sessionHandle.getHandleIdentifier().toString());
- // use thrift transportable formatter
+ // Use thrift transportable formatter
hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER,
FetchFormatter.ThriftFormatter.class.getName());
hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue());
+ /**
+ * Create a new SessionState object that will be associated with this HiveServer2 session.
+ * When the server executes multiple queries in the same session,
+ * this SessionState object is reused across multiple queries.
+ */
sessionState = new SessionState(hiveConf, username);
sessionState.setUserIpAddress(ipAddress);
sessionState.setIsHiveServerQuery(true);
@@ -111,11 +117,9 @@ public class HiveSessionImpl implements
@Override
public void initialize(Map<String, String> sessionConfMap) throws Exception {
- //process global init file: .hiverc
+ // Process global init file: .hiverc
processGlobalInitFile();
- SessionState.setCurrentSessionState(sessionState);
-
- //set conf properties specified by user from client side
+ // Set conf properties specified by user from client side
if (sessionConfMap != null) {
configureSession(sessionConfMap);
}
@@ -169,6 +173,7 @@ public class HiveSessionImpl implements
}
private void configureSession(Map<String, String> sessionConfMap) throws Exception {
+ SessionState.setCurrentSessionState(sessionState);
for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
String key = entry.getKey();
if (key.startsWith("set:")) {
@@ -211,14 +216,26 @@ public class HiveSessionImpl implements
}
protected synchronized void acquire() throws HiveSQLException {
- // need to make sure that the this connections session state is
- // stored in the thread local for sessions.
+ // Need to make sure that this HiveServer2 session's SessionState is
+ // stored in the thread local for the handler thread.
SessionState.setCurrentSessionState(sessionState);
}
+ /**
+ * 1. We'll remove the ThreadLocal SessionState as this thread might now serve
+ * other requests.
+ * 2. If this (handler) thread is a ThreadWithGarbageCleanup, we'll cache its ThreadLocal
+ * RawStore object for an orderly cleanup when the thread is garbage collected later.
+ * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+ */
protected synchronized void release() {
assert sessionState != null;
SessionState.detachSession();
+ if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+ ThreadWithGarbageCleanup currentThread =
+ (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+ currentThread.cacheThreadLocalRawStore();
+ }
}
@Override
@@ -468,7 +485,7 @@ public class HiveSessionImpl implements
try {
acquire();
/**
- * For metadata operations like getTables(), getColumns() etc,
+ * For metadata operations like getTables(), getColumns() etc,
* the session allocates a private metastore handler which should be
* closed at the end of the session
*/
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Wed Aug 27 20:27:21 2014
@@ -38,6 +38,7 @@ import org.apache.hive.service.cli.HiveS
import org.apache.hive.service.cli.SessionHandle;
import org.apache.hive.service.cli.operation.OperationManager;
import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
/**
* SessionManager.
@@ -64,22 +65,27 @@ public class SessionManager extends Comp
} catch (HiveException e) {
throw new RuntimeException("Error applying authorization policy on hive configuration", e);
}
-
this.hiveConf = hiveConf;
+ createBackgroundOperationPool();
+ addService(operationManager);
+ super.init(hiveConf);
+ }
+
+ private void createBackgroundOperationPool() {
int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
- LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize);
+ LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize);
int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
- LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize);
+ LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize);
int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
- LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime);
+ LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime);
// Create a thread pool with #backgroundPoolSize threads
// Threads terminate when they are idle for more than the keepAliveTime
- // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+ // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+ String threadPoolName = "HiveServer2-Background-Pool"; // name prefix for this pool's threads
backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
- keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize));
+ keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName)); // threads can cache their RawStore for cleanup on GC
backgroundOperationPool.allowCoreThreadTimeOut(true);
- addService(operationManager);
- super.init(hiveConf);
}
private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException {
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Wed Aug 27 20:27:21 2014
@@ -19,12 +19,17 @@
package org.apache.hive.service.cli.thrift;
import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
import org.apache.thrift.server.TThreadPoolServer;
@@ -65,6 +70,11 @@ public class ThriftBinaryCLIService exte
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+ workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME);
+ String threadPoolName = "HiveServer2-Handler-Pool";
+ ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+ workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
TServerSocket serverSocket = null;
if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
@@ -84,8 +94,7 @@ public class ThriftBinaryCLIService exte
.processorFactory(processorFactory)
.transportFactory(transportFactory)
.protocolFactory(new TBinaryProtocol.Factory())
- .minWorkerThreads(minWorkerThreads)
- .maxWorkerThreads(maxWorkerThreads);
+ .executorService(executorService);
server = new TThreadPoolServer(sargs);
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Wed Aug 27 20:27:21 2014
@@ -70,6 +70,7 @@ public abstract class ThriftCLIService e
protected int minWorkerThreads;
protected int maxWorkerThreads;
+ protected int workerKeepAliveTime;
protected static HiveAuthFactory hiveAuthFactory;
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Wed Aug 27 20:27:21 2014
@@ -18,6 +18,11 @@
package org.apache.hive.service.cli.thrift;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.shims.ShimLoader;
@@ -26,6 +31,7 @@ import org.apache.hadoop.util.Shell;
import org.apache.hive.service.auth.HiveAuthFactory;
import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes;
import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
import org.apache.thrift.TProcessor;
import org.apache.thrift.TProcessorFactory;
import org.apache.thrift.protocol.TBinaryProtocol;
@@ -36,7 +42,7 @@ import org.eclipse.jetty.server.ssl.SslS
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
public class ThriftHttpCLIService extends ThriftCLIService {
@@ -63,13 +69,17 @@ public class ThriftHttpCLIService extend
minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
+ workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME);
String httpPath = getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
httpServer = new org.eclipse.jetty.server.Server();
- QueuedThreadPool threadPool = new QueuedThreadPool();
- threadPool.setMinThreads(minWorkerThreads);
- threadPool.setMaxThreads(maxWorkerThreads);
+ String threadPoolName = "HiveServer2-HttpHandler-Pool";
+ ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+ workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+ new ThreadFactoryWithGarbageCleanup(threadPoolName));
+
+ ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
httpServer.setThreadPool(threadPool);
SelectChannelConnector connector = new SelectChannelConnector();;
Added: hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java?rev=1620973&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java Wed Aug 27 20:27:21 2014
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hive.service.server;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.hive.metastore.RawStore;
+
+/**
+ * A ThreadFactory for constructing new HiveServer2 threads that lets you plug
+ * in custom cleanup code to be called before each of its threads is GC-ed.
+ * Currently cleans up the following:
+ * 1. ThreadLocal RawStore object:
+ * In case of an embedded metastore, HiveServer2 threads (foreground & background)
+ * end up caching a ThreadLocal RawStore object. The ThreadLocal RawStore object has
+ * an instance of PersistenceManagerFactory & PersistenceManager.
+ * The PersistenceManagerFactory keeps a cache of PersistenceManager objects,
+ * which are only removed when PersistenceManager#close method is called.
+ * HiveServer2 uses ExecutorService for managing thread pools for foreground & background threads.
+ * ExecutorService unfortunately does not provide any hooks to be called
+ * when a thread from the pool is terminated.
+ * As a solution, this factory creates {@link ThreadWithGarbageCleanup} threads, which record
+ * their RawStore in a shared map keyed by thread id and shut it down in their finalizer.
+ */
+public class ThreadFactoryWithGarbageCleanup implements ThreadFactory {
+  // Written by pool threads, read/removed by finalizer threads: must be thread-safe.
+  private static final Map<Long, RawStore> threadRawStoreMap =
+      new java.util.concurrent.ConcurrentHashMap<Long, RawStore>();
+  private final String namePrefix;
+
+  public ThreadFactoryWithGarbageCleanup(String threadPoolName) {
+    namePrefix = threadPoolName;
+  }
+
+  @Override
+  public Thread newThread(Runnable runnable) {
+    Thread newThread = new ThreadWithGarbageCleanup(runnable);
+    newThread.setName(namePrefix + ": Thread-" + newThread.getId());
+    return newThread;
+  }
+
+  public static Map<Long, RawStore> getThreadRawStoreMap() {
+    return threadRawStoreMap;
+  }
+}
Added: hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java?rev=1620973&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java Wed Aug 27 20:27:21 2014
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hive.service.server;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.RawStore;
+
+/**
+ * A HiveServer2 server thread, created by {@link ThreadFactoryWithGarbageCleanup}, that caches
+ * its ThreadLocal RawStore and shuts it down in {@link #finalize()} once the thread is GC-ed.
+ */
+public class ThreadWithGarbageCleanup extends Thread {
+  private static final Log LOG = LogFactory.getLog(ThreadWithGarbageCleanup.class);
+
+  // Shared by all such threads: each thread adds its own entry, finalizers remove them.
+  Map<Long, RawStore> threadRawStoreMap =
+      ThreadFactoryWithGarbageCleanup.getThreadRawStoreMap();
+
+  public ThreadWithGarbageCleanup(Runnable runnable) {
+    super(runnable);
+  }
+
+  /**
+   * Add any Thread specific garbage cleanup code here. Currently shuts down this thread's
+   * cached RawStore, if any; super.finalize() runs even if that cleanup throws.
+   */
+  @Override
+  public void finalize() throws Throwable {
+    try {
+      cleanRawStore();
+    } finally {
+      super.finalize();
+    }
+  }
+
+  private void cleanRawStore() {
+    // Remove before shutdown() so the map entry is dropped even if shutdown() throws.
+    RawStore threadLocalRawStore = threadRawStoreMap.remove(this.getId());
+    if (threadLocalRawStore != null) {
+      LOG.debug("RawStore: " + threadLocalRawStore + ", for the thread: " +
+          this.getName() + " will be closed now.");
+      threadLocalRawStore.shutdown();
+    }
+  }
+
+  /** Cache the ThreadLocal RawStore object. Called from the corresponding thread. */
+  public void cacheThreadLocalRawStore() {
+    Long threadId = this.getId();
+    RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore();
+    if (threadLocalRawStore != null && !threadRawStoreMap.containsKey(threadId)) {
+      LOG.debug("Adding RawStore: " + threadLocalRawStore + ", for the thread: " +
+          this.getName() + " to threadRawStoreMap for future cleanup.");
+      threadRawStoreMap.put(threadId, threadLocalRawStore);
+    }
+  }
+}