You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by vg...@apache.org on 2014/08/27 22:27:21 UTC

svn commit: r1620973 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ metastore/src/java/org/apache/hadoop/hive/metastore/ service/src/java/org/apache/hive/service/cli/ service/src/java/org/apache/hive/service/cli/operation/ service/src/j...

Author: vgumashta
Date: Wed Aug 27 20:27:21 2014
New Revision: 1620973

URL: http://svn.apache.org/r1620973
Log:
HIVE-7353: HiveServer2 using embedded MetaStore leaks JDOPersistanceManager (Vaibhav Gumashta reviewed by Szehon Ho, Lefty Leverenz, Navis Ryu)

Added:
    hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java
    hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java
Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Wed Aug 27 20:27:21 2014
@@ -54,7 +54,6 @@ import org.apache.hive.common.HiveCompat
  * Hive Configuration.
  */
 public class HiveConf extends Configuration {
-
   protected String hiveJar;
   protected Properties origProp;
   protected String auxJars;
@@ -1476,8 +1475,11 @@ public class HiveConf extends Configurat
         "Minimum number of worker threads when in HTTP mode."),
     HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS("hive.server2.thrift.http.max.worker.threads", 500,
         "Maximum number of worker threads when in HTTP mode."),
-    HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000, 
+    HIVE_SERVER2_THRIFT_HTTP_MAX_IDLE_TIME("hive.server2.thrift.http.max.idle.time", 1800000,
         "Maximum idle time in milliseconds for a connection on the server when in HTTP mode."),
+    HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME("hive.server2.thrift.http.worker.keepalive.time", 60,
+        "Keepalive time (in seconds) for an idle http worker thread. When number of workers > min workers, " +
+        "excess threads are killed after this time interval."),
 
     // binary transport settings
     HIVE_SERVER2_THRIFT_PORT("hive.server2.thrift.port", 10000,
@@ -1500,7 +1502,9 @@ public class HiveConf extends Configurat
         "Minimum number of Thrift worker threads"),
     HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS("hive.server2.thrift.max.worker.threads", 500,
         "Maximum number of Thrift worker threads"),
-
+    HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME("hive.server2.thrift.worker.keepalive.time", 60,
+        "Keepalive time (in seconds) for an idle worker thread. When number of workers > min workers, " +
+        "excess threads are killed after this time interval."),
     // Configuration for async thread pool in SessionManager
     HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100,
         "Number of threads in the async thread pool for HiveServer2"),

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java Wed Aug 27 20:27:21 2014
@@ -250,7 +250,7 @@ public class HiveMetaStore extends Thrif
     private static String currentUrl;
 
     private Warehouse wh; // hdfs warehouse
-    private final ThreadLocal<RawStore> threadLocalMS =
+    private static final ThreadLocal<RawStore> threadLocalMS =
         new ThreadLocal<RawStore>() {
           @Override
           protected synchronized RawStore initialValue() {
@@ -265,6 +265,14 @@ public class HiveMetaStore extends Thrif
       }
     };
 
+    /** Returns the calling thread's RawStore from threadLocalMS (initialValue() if none set). */
+    public static RawStore getRawStore() {
+      return threadLocalMS.get();
+    }
+    /** Clears the calling thread's threadLocalMS entry; the RawStore is not shut down here. */
+    public static void removeRawStore() {
+      threadLocalMS.remove();
+    }
     // Thread local configuration is needed as many threads could make changes
     // to the conf using the connection hook
     private final ThreadLocal<Configuration> threadLocalConf =
@@ -384,6 +392,7 @@ public class HiveMetaStore extends Thrif
       }
     }
 
+    @Override
     public void init() throws MetaException {
       rawStoreClassName = hiveConf.getVar(HiveConf.ConfVars.METASTORE_RAW_STORE_IMPL);
       initListeners = MetaStoreUtils.getMetaStoreListeners(

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/ObjectStore.java Wed Aug 27 20:27:21 2014
@@ -252,6 +252,8 @@ public class ObjectStore implements RawS
       expressionProxy = createExpressionProxy(hiveConf);
       directSql = new MetaStoreDirectSql(pm);
     }
+    LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
+        " created in the thread with id: " + Thread.currentThread().getId());
   }
 
   /**
@@ -343,6 +345,8 @@ public class ObjectStore implements RawS
   @Override
   public void shutdown() {
     if (pm != null) {
+      LOG.debug("RawStore: " + this + ", with PersistenceManager: " + pm +
+          " will be shutdown");
       pm.close();
     }
   }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/CLIService.java Wed Aug 27 20:27:21 2014
@@ -66,7 +66,6 @@ public class CLIService extends Composit
 
   private HiveConf hiveConf;
   private SessionManager sessionManager;
-  private IMetaStoreClient metastoreClient;
   private UserGroupInformation serviceUGI;
   private UserGroupInformation httpUGI;
 
@@ -128,21 +127,23 @@ public class CLIService extends Composit
     } catch (IOException eIO) {
       throw new ServiceException("Error setting stage directories", eIO);
     }
-
+    // Initialize and test a connection to the metastore
+    IMetaStoreClient metastoreClient = null;
     try {
-      // Initialize and test a connection to the metastore
       metastoreClient = new HiveMetaStoreClient(hiveConf);
       metastoreClient.getDatabases("default");
     } catch (Exception e) {
       throw new ServiceException("Unable to connect to MetaStore!", e);
     }
+    finally {
+      if (metastoreClient != null) {
+        metastoreClient.close();
+      }
+    }
   }
 
   @Override
   public synchronized void stop() {
-    if (metastoreClient != null) {
-      metastoreClient.close();
-    }
     super.stop();
   }
 

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Wed Aug 27 20:27:21 2014
@@ -60,6 +60,7 @@ import org.apache.hive.service.cli.RowSe
 import org.apache.hive.service.cli.RowSetFactory;
 import org.apache.hive.service.cli.TableSchema;
 import org.apache.hive.service.cli.session.HiveSession;
+import org.apache.hive.service.server.ThreadWithGarbageCleanup;
 
 /**
  * SQLOperation.
@@ -171,27 +172,23 @@ public class SQLOperation extends Execut
     if (!shouldRunAsync()) {
       runInternal(opConfig);
     } else {
+      // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
       final SessionState parentSessionState = SessionState.get();
-      // current Hive object needs to be set in aysnc thread in case of remote metastore.
-      // The metastore client in Hive is associated with right user
-      final Hive sessionHive = getCurrentHive();
-      // current UGI will get used by metastore when metsatore is in embedded mode
-      // so this needs to get passed to the new async thread
+      // ThreadLocal Hive object needs to be set in background thread.
+      // The metastore client in Hive is associated with right user.
+      final Hive parentHive = getSessionHive();
+      // Current UGI will get used by metastore when metsatore is in embedded mode
+      // So this needs to get passed to the new background thread
       final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
-
       // Runnable impl to call runInternal asynchronously,
       // from a different thread
       Runnable backgroundOperation = new Runnable() {
-
         @Override
         public void run() {
           PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() {
             @Override
             public Object run() throws HiveSQLException {
-
-              // Storing the current Hive object necessary when doAs is enabled
-              // User information is part of the metastore client member in Hive
-              Hive.set(sessionHive);
+              Hive.set(parentHive);
               SessionState.setCurrentSessionState(parentSessionState);
               try {
                 runInternal(opConfig);
@@ -202,12 +199,25 @@ public class SQLOperation extends Execut
               return null;
             }
           };
+
           try {
             ShimLoader.getHadoopShims().doAs(currentUGI, doAsAction);
           } catch (Exception e) {
             setOperationException(new HiveSQLException(e));
             LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
           }
+          finally {
+            /**
+             * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
+             * when this thread is garbage collected later.
+             * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+             */
+            if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+              ThreadWithGarbageCleanup currentThread =
+                  (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+              currentThread.cacheThreadLocalRawStore();
+            }
+          }
         }
       };
       try {
@@ -223,6 +233,12 @@ public class SQLOperation extends Execut
     }
   }
 
+  /**
+   * Returns the current UGI on the stack
+   * @param opConfig
+   * @return UserGroupInformation
+   * @throws HiveSQLException
+   */
   private UserGroupInformation getCurrentUGI(HiveConf opConfig) throws HiveSQLException {
     try {
       return ShimLoader.getHadoopShims().getUGIForConf(opConfig);
@@ -231,11 +247,16 @@ public class SQLOperation extends Execut
     }
   }
 
-  private Hive getCurrentHive() throws HiveSQLException {
+  /**
+   * Returns the ThreadLocal Hive for the current thread
+   * @return Hive
+   * @throws HiveSQLException
+   */
+  private Hive getSessionHive() throws HiveSQLException {
     try {
       return Hive.get();
     } catch (HiveException e) {
-      throw new HiveSQLException("Failed to get current Hive object", e);
+      throw new HiveSQLException("Failed to get ThreadLocal Hive object", e);
     }
   }
 

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/HiveSessionImpl.java Wed Aug 27 20:27:21 2014
@@ -62,6 +62,7 @@ import org.apache.hive.service.cli.opera
 import org.apache.hive.service.cli.operation.MetadataOperation;
 import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadWithGarbageCleanup;
 
 /**
  * HiveSession
@@ -95,14 +96,19 @@ public class HiveSessionImpl implements 
     this.hiveConf = new HiveConf(serverhiveConf);
     this.ipAddress = ipAddress;
 
-    // set an explicit session name to control the download directory name
+    // Set an explicit session name to control the download directory name
     hiveConf.set(ConfVars.HIVESESSIONID.varname,
         sessionHandle.getHandleIdentifier().toString());
-    // use thrift transportable formatter
+    // Use thrift transportable formatter
     hiveConf.set(ListSinkOperator.OUTPUT_FORMATTER,
         FetchFormatter.ThriftFormatter.class.getName());
     hiveConf.setInt(ListSinkOperator.OUTPUT_PROTOCOL, protocol.getValue());
 
+    /**
+     * Create a new SessionState object that will be associated with this HiveServer2 session.
+     * When the server executes multiple queries in the same session,
+     * this SessionState object is reused across multiple queries.
+     */
     sessionState = new SessionState(hiveConf, username);
     sessionState.setUserIpAddress(ipAddress);
     sessionState.setIsHiveServerQuery(true);
@@ -111,11 +117,9 @@ public class HiveSessionImpl implements 
 
   @Override
   public void initialize(Map<String, String> sessionConfMap) throws Exception {
-    //process global init file: .hiverc
+    // Process global init file: .hiverc
     processGlobalInitFile();
-    SessionState.setCurrentSessionState(sessionState);
-
-    //set conf properties specified by user from client side
+    // Set conf properties specified by user from client side
     if (sessionConfMap != null) {
       configureSession(sessionConfMap);
     }
@@ -169,6 +173,7 @@ public class HiveSessionImpl implements 
   }
 
   private void configureSession(Map<String, String> sessionConfMap) throws Exception {
+    SessionState.setCurrentSessionState(sessionState);
     for (Map.Entry<String, String> entry : sessionConfMap.entrySet()) {
       String key = entry.getKey();
       if (key.startsWith("set:")) {
@@ -211,14 +216,26 @@ public class HiveSessionImpl implements 
   }
 
   protected synchronized void acquire() throws HiveSQLException {
-    // need to make sure that the this connections session state is
-    // stored in the thread local for sessions.
+    // Make sure that this HiveServer2 session's SessionState is stored in the
+    // ThreadLocal of the calling thread.
     SessionState.setCurrentSessionState(sessionState);
   }
 
+  /**
+   * 1. We'll remove the ThreadLocal SessionState as this thread might now serve
+   * other requests.
+   * 2. If the calling thread is a ThreadWithGarbageCleanup, we'll cache its ThreadLocal
+   * RawStore object for an orderly cleanup when that thread is garbage collected later.
+   * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
+   */
   protected synchronized void release() {
     assert sessionState != null;
     SessionState.detachSession();
+    if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
+      ThreadWithGarbageCleanup currentThread =
+          (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
+      currentThread.cacheThreadLocalRawStore();
+    }
   }
 
   @Override
@@ -468,7 +485,7 @@ public class HiveSessionImpl implements 
     try {
       acquire();
       /**
-       *  For metadata operations like getTables(), getColumns() etc,
+       * For metadata operations like getTables(), getColumns() etc,
        * the session allocates a private metastore handler which should be
        * closed at the end of the session
        */

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Wed Aug 27 20:27:21 2014
@@ -38,6 +38,7 @@ import org.apache.hive.service.cli.HiveS
 import org.apache.hive.service.cli.SessionHandle;
 import org.apache.hive.service.cli.operation.OperationManager;
 import org.apache.hive.service.cli.thrift.TProtocolVersion;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
 
 /**
  * SessionManager.
@@ -64,22 +65,27 @@ public class SessionManager extends Comp
     } catch (HiveException e) {
       throw new RuntimeException("Error applying authorization policy on hive configuration", e);
     }
-
     this.hiveConf = hiveConf;
+    createBackgroundOperationPool();
+    addService(operationManager);
+    super.init(hiveConf);
+  }
+
+  private void createBackgroundOperationPool() {
     int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
-    LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize);
+    LOG.info("HiveServer2: Background operation thread pool size: " + backgroundPoolSize);
     int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
-    LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize);
+    LOG.info("HiveServer2: Background operation thread wait queue size: " + backgroundPoolQueueSize);
     int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
-    LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime);
+    LOG.info("HiveServer2: Background operation thread keepalive time: " + keepAliveTime);
     // Create a thread pool with #backgroundPoolSize threads
     // Threads terminate when they are idle for more than the keepAliveTime
-    // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+    // A bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+    String threadPoolName = "HiveServer2-Background-Pool";
     backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
-        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize));
+        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize),
+        new ThreadFactoryWithGarbageCleanup(threadPoolName));
     backgroundOperationPool.allowCoreThreadTimeOut(true);
-    addService(operationManager);
-    super.init(hiveConf);
   }
 
   private void applyAuthorizationConfigPolicy(HiveConf newHiveConf) throws HiveException {

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java Wed Aug 27 20:27:21 2014
@@ -19,12 +19,17 @@
 package org.apache.hive.service.cli.thrift;
 
 import java.net.InetSocketAddress;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TBinaryProtocol;
 import org.apache.thrift.server.TThreadPoolServer;
@@ -65,6 +70,11 @@ public class ThriftBinaryCLIService exte
 
       minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MIN_WORKER_THREADS);
       maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_MAX_WORKER_THREADS);
+      workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_WORKER_KEEPALIVE_TIME);
+      String threadPoolName = "HiveServer2-Handler-Pool";
+      ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+          workerKeepAliveTime, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
+          new ThreadFactoryWithGarbageCleanup(threadPoolName));
 
       TServerSocket serverSocket = null;
       if (!hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_USE_SSL)) {
@@ -84,8 +94,7 @@ public class ThriftBinaryCLIService exte
       .processorFactory(processorFactory)
       .transportFactory(transportFactory)
       .protocolFactory(new TBinaryProtocol.Factory())
-      .minWorkerThreads(minWorkerThreads)
-      .maxWorkerThreads(maxWorkerThreads);
+      .executorService(executorService);
 
       server = new TThreadPoolServer(sargs);
 

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java Wed Aug 27 20:27:21 2014
@@ -70,6 +70,7 @@ public abstract class ThriftCLIService e
 
   protected int minWorkerThreads;
   protected int maxWorkerThreads;
+  protected int workerKeepAliveTime;
 
   protected static HiveAuthFactory hiveAuthFactory;
 

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java?rev=1620973&r1=1620972&r2=1620973&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/thrift/ThriftHttpCLIService.java Wed Aug 27 20:27:21 2014
@@ -18,6 +18,11 @@
 
 package org.apache.hive.service.cli.thrift;
 
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -26,6 +31,7 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hive.service.auth.HiveAuthFactory;
 import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes;
 import org.apache.hive.service.cli.CLIService;
+import org.apache.hive.service.server.ThreadFactoryWithGarbageCleanup;
 import org.apache.thrift.TProcessor;
 import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TBinaryProtocol;
@@ -36,7 +42,7 @@ import org.eclipse.jetty.server.ssl.SslS
 import org.eclipse.jetty.servlet.ServletContextHandler;
 import org.eclipse.jetty.servlet.ServletHolder;
 import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
+import org.eclipse.jetty.util.thread.ExecutorThreadPool;
 
 
 public class ThriftHttpCLIService extends ThriftCLIService {
@@ -63,13 +69,17 @@ public class ThriftHttpCLIService extend
 
       minWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MIN_WORKER_THREADS);
       maxWorkerThreads = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_MAX_WORKER_THREADS);
+      workerKeepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_THRIFT_HTTP_WORKER_KEEPALIVE_TIME);
 
       String httpPath =  getHttpPath(hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_THRIFT_HTTP_PATH));
 
       httpServer = new org.eclipse.jetty.server.Server();
-      QueuedThreadPool threadPool = new QueuedThreadPool();
-      threadPool.setMinThreads(minWorkerThreads);
-      threadPool.setMaxThreads(maxWorkerThreads);
+      String threadPoolName = "HiveServer2-HttpHandler-Pool";
+      ExecutorService executorService = new ThreadPoolExecutor(minWorkerThreads, maxWorkerThreads,
+          workerKeepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(),
+          new ThreadFactoryWithGarbageCleanup(threadPoolName));
+
+      ExecutorThreadPool threadPool = new ExecutorThreadPool(executorService);
       httpServer.setThreadPool(threadPool);
 
       SelectChannelConnector connector = new SelectChannelConnector();;

Added: hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java?rev=1620973&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/server/ThreadFactoryWithGarbageCleanup.java Wed Aug 27 20:27:21 2014
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hive.service.server;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadFactory;
+
+import org.apache.hadoop.hive.metastore.RawStore;
+
+/**
+ * A ThreadFactory for constructing new HiveServer2 threads that lets you plug
+ * in custom cleanup code to be called before a thread is GC-ed.
+ * Currently cleans up the following:
+ * 1. ThreadLocal RawStore object:
+ * In case of an embedded metastore, HiveServer2 threads (foreground & background)
+ * end up caching a ThreadLocal RawStore object. The ThreadLocal RawStore object has
+ * an instance of PersistenceManagerFactory & PersistenceManager.
+ * The PersistenceManagerFactory keeps a cache of PersistenceManager objects,
+ * which are only removed when PersistenceManager#close method is called.
+ * HiveServer2 uses ExecutorService for managing thread pools for foreground & background threads.
+ * ExecutorService does not provide a hook that is called when a pool thread terminates.
+ * As a solution, this factory creates {@link ThreadWithGarbageCleanup} threads, which cache
+ * their RawStore in a shared map keyed by thread id and shut it down from their finalizer.
+ */
+public class ThreadFactoryWithGarbageCleanup implements ThreadFactory {
+
+  /**
+   * Maps thread id to the RawStore cached by that thread. Entries are added by pool threads
+   * and removed from finalizers, which the JVM invokes on an unspecified thread, so the map
+   * must be safe for concurrent access (a plain HashMap could be corrupted here).
+   */
+  private static final Map<Long, RawStore> threadRawStoreMap =
+      new ConcurrentHashMap<Long, RawStore>();
+
+  private final String namePrefix;
+
+  /**
+   * @param threadPoolName prefix for the names of the threads created by this factory
+   */
+  public ThreadFactoryWithGarbageCleanup(String threadPoolName) {
+    namePrefix = threadPoolName;
+  }
+
+  /**
+   * Creates a {@link ThreadWithGarbageCleanup} running the given task, named
+   * {@code threadPoolName + ": Thread-" + threadId}.
+   */
+  @Override
+  public Thread newThread(Runnable runnable) {
+    Thread newThread = new ThreadWithGarbageCleanup(runnable);
+    newThread.setName(namePrefix + ": Thread-" + newThread.getId());
+    return newThread;
+  }
+
+  /**
+   * Returns the process-wide, thread-safe map of thread id to cached RawStore.
+   */
+  public static Map<Long, RawStore> getThreadRawStoreMap() {
+    return threadRawStoreMap;
+  }
+}

Added: hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java?rev=1620973&view=auto
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java (added)
+++ hive/trunk/service/src/java/org/apache/hive/service/server/ThreadWithGarbageCleanup.java Wed Aug 27 20:27:21 2014
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.hive.service.server;
+
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hive.metastore.HiveMetaStore;
+import org.apache.hadoop.hive.metastore.RawStore;
+
+/**
+ * A HiveServer2 thread used to construct new server threads.
+ * In particular, this thread ensures an orderly cleanup of the RawStore it has cached
+ * (see {@link #cacheThreadLocalRawStore()}) once it has been terminated by its
+ * ExecutorService and is finalized by the garbage collector.
+ */
+public class ThreadWithGarbageCleanup extends Thread {
+  private static final Log LOG = LogFactory.getLog(ThreadWithGarbageCleanup.class);
+
+  /**
+   * Process-wide map of thread id to cached RawStore, shared by all threads created by
+   * {@link ThreadFactoryWithGarbageCleanup}.
+   */
+  Map<Long, RawStore> threadRawStoreMap =
+      ThreadFactoryWithGarbageCleanup.getThreadRawStoreMap();
+
+  public ThreadWithGarbageCleanup(Runnable runnable) {
+    super(runnable);
+  }
+
+  /**
+   * Add any Thread specific garbage cleanup code here.
+   * Currently, it shuts down the RawStore object cached for this thread, if any.
+   * Object#finalize() is chained in a finally block so it runs even if the cleanup throws.
+   */
+  @Override
+  public void finalize() throws Throwable {
+    try {
+      cleanRawStore();
+    } finally {
+      super.finalize();
+    }
+  }
+
+  /**
+   * Removes this thread's entry from the shared map and shuts the RawStore down if one was
+   * cached. The entry is removed first so that it (and the RawStore it references) is not
+   * left in the map if shutdown() throws.
+   */
+  private void cleanRawStore() {
+    Long threadId = this.getId();
+    RawStore threadLocalRawStore = threadRawStoreMap.remove(threadId);
+    if (threadLocalRawStore != null) {
+      LOG.debug("RawStore: " + threadLocalRawStore + ", for the thread: " +
+          this.getName()  +  " will be closed now.");
+      threadLocalRawStore.shutdown();
+    }
+  }
+
+  /**
+   * Cache the calling thread's ThreadLocal RawStore (as returned by
+   * HiveMetaStore.HMSHandler#getRawStore()) under this thread's id, unless one is already
+   * cached. Must be called from this thread itself, since the RawStore is read from the
+   * caller's ThreadLocal.
+   */
+  public void cacheThreadLocalRawStore() {
+    Long threadId = this.getId();
+    RawStore threadLocalRawStore = HiveMetaStore.HMSHandler.getRawStore();
+    if (threadLocalRawStore != null && !threadRawStoreMap.containsKey(threadId)) {
+      LOG.debug("Adding RawStore: " + threadLocalRawStore + ", for the thread: " +
+          this.getName() + " to threadRawStoreMap for future cleanup.");
+      threadRawStoreMap.put(threadId, threadLocalRawStore);
+    }
+  }
+}