You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2013/11/09 08:23:45 UTC

svn commit: r1540267 - in /hive/trunk: common/src/java/org/apache/hadoop/hive/conf/ conf/ service/src/java/org/apache/hive/service/cli/operation/ service/src/java/org/apache/hive/service/cli/session/

Author: thejas
Date: Sat Nov  9 07:23:45 2013
New Revision: 1540267

URL: http://svn.apache.org/r1540267
Log:
HIVE-5229 : Better thread management for HiveServer2 async threads (Vaibhav Gumashta via Thejas Nair)

Modified:
    hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
    hive/trunk/conf/hive-default.xml.template
    hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
    hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java

Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1540267&r1=1540266&r2=1540267&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Nov  9 07:23:45 2013
@@ -759,9 +759,15 @@ public class HiveConf extends Configurat
 
     // Configuration for async thread pool in SessionManager
     // Number of async threads
-    HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 50),
+    HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100),
     // Number of seconds HiveServer2 shutdown will wait for async threads to terminate
     HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10L),
+    // Size of the wait queue for async thread pool in HiveServer2.
+    // After hitting this limit, the async thread pool will reject new requests.
+    HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE("hive.server2.async.exec.wait.queue.size", 100),
+    // Number of seconds that an idle HiveServer2 async thread (from the thread pool)
+    // will wait for a new task to arrive before terminating
+    HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10),
 
 
     // HiveServer2 auth configuration

Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1540267&r1=1540266&r2=1540267&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Sat Nov  9 07:23:45 2013
@@ -1897,18 +1897,32 @@
 
 <property>
   <name>hive.server2.async.exec.threads</name>
-  <value>50</value>
+  <value>100</value>
   <description>Number of threads in the async thread pool for HiveServer2</description>
 </property>
 
 <property>
   <name>hive.server2.async.exec.shutdown.timeout</name>
   <value>10</value>
-  <description>Time (in seconds) for which HiveServer2 shutdown will wait for async 
+  <description>Time (in seconds) for which HiveServer2 shutdown will wait for async
   threads to terminate</description>
 </property>
 
 <property>
+  <name>hive.server2.async.exec.keepalive.time</name>
+  <value>10</value>
+  <description>Time (in seconds) that an idle HiveServer2 async thread (from the thread pool) will wait
+  for a new task to arrive before terminating</description>
+</property>
+
+<property>
+  <name>hive.server2.async.exec.wait.queue.size</name>
+  <value>100</value>
+  <description>Size of the wait queue for async thread pool in HiveServer2.
+  After hitting this limit, the async thread pool will reject new requests.</description>
+</property>
+
+<property>
   <name>hive.server2.thrift.port</name>
   <value>10000</value>
   <description>Port number of HiveServer2 Thrift interface.

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1540267&r1=1540266&r2=1540267&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Sat Nov  9 07:23:45 2013
@@ -165,7 +165,8 @@ public class SQLOperation extends Execut
           getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
       } catch (RejectedExecutionException rejected) {
         setState(OperationState.ERROR);
-        throw new HiveSQLException(rejected);
+        throw new HiveSQLException("All the asynchronous threads are currently busy, " +
+            "please retry the operation", rejected);
       }
     }
   }

Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1540267&r1=1540266&r2=1540267&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Sat Nov  9 07:23:45 2013
@@ -21,16 +21,16 @@ package org.apache.hive.service.cli.sess
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
 import org.apache.hive.service.CompositeService;
 import org.apache.hive.service.cli.HiveSQLException;
 import org.apache.hive.service.cli.SessionHandle;
@@ -46,7 +46,7 @@ public class SessionManager extends Comp
   private final Map<SessionHandle, HiveSession> handleToSession = new HashMap<SessionHandle, HiveSession>();
   private OperationManager operationManager = new OperationManager();
   private static final Object sessionMapLock = new Object();
-  private ExecutorService backgroundOperationPool;
+  private ThreadPoolExecutor backgroundOperationPool;
 
   public SessionManager() {
     super("SessionManager");
@@ -57,8 +57,17 @@ public class SessionManager extends Comp
     this.hiveConf = hiveConf;
     operationManager = new OperationManager();
     int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
-    LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize);
-    backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize);
+    LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize);
+    int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
+    LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize);
+    int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
+    LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime);
+    // Create a thread pool with #backgroundPoolSize threads
+    // Threads terminate when they are idle for more than the keepAliveTime
+    // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+    backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
+        keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize));
+    backgroundOperationPool.allowCoreThreadTimeOut(true);
     addService(operationManager);
     super.init(hiveConf);
   }
@@ -66,26 +75,23 @@ public class SessionManager extends Comp
   @Override
   public synchronized void start() {
     super.start();
-    // TODO
   }
 
   @Override
   public synchronized void stop() {
-    // TODO
     super.stop();
     if (backgroundOperationPool != null) {
       backgroundOperationPool.shutdown();
-      long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+      int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
       try {
         backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
-      } catch (InterruptedException exc) {
+      } catch (InterruptedException e) {
         LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
-        		" seconds has been exceeded. RUNNING background operations will be shut down", exc);
+            " seconds has been exceeded. RUNNING background operations will be shut down", e);
       }
     }
   }
 
-
   public SessionHandle openSession(String username, String password, Map<String, String> sessionConf)
           throws HiveSQLException {
      return openSession(username, password, sessionConf, false, null);
@@ -129,7 +135,6 @@ public class SessionManager extends Comp
     session.close();
   }
 
-
   public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException {
     HiveSession session;
     synchronized(sessionMapLock) {