You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by th...@apache.org on 2013/11/09 08:23:45 UTC
svn commit: r1540267 - in /hive/trunk:
common/src/java/org/apache/hadoop/hive/conf/ conf/
service/src/java/org/apache/hive/service/cli/operation/
service/src/java/org/apache/hive/service/cli/session/
Author: thejas
Date: Sat Nov 9 07:23:45 2013
New Revision: 1540267
URL: http://svn.apache.org/r1540267
Log:
HIVE-5229 : Better thread management for HiveServer2 async threads (Vaibhav Gumashta via Thejas Nair)
Modified:
hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
hive/trunk/conf/hive-default.xml.template
hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
Modified: hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
URL: http://svn.apache.org/viewvc/hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java?rev=1540267&r1=1540266&r2=1540267&view=diff
==============================================================================
--- hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java (original)
+++ hive/trunk/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java Sat Nov 9 07:23:45 2013
@@ -759,9 +759,15 @@ public class HiveConf extends Configurat
// Configuration for async thread pool in SessionManager
// Number of async threads
- HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 50),
+ HIVE_SERVER2_ASYNC_EXEC_THREADS("hive.server2.async.exec.threads", 100),
// Number of seconds HiveServer2 shutdown will wait for async threads to terminate
HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT("hive.server2.async.exec.shutdown.timeout", 10L),
+ // Size of the wait queue for async thread pool in HiveServer2.
+ // After hitting this limit, the async thread pool will reject new requests.
+ HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE("hive.server2.async.exec.wait.queue.size", 100),
+ // Number of seconds that an idle HiveServer2 async thread (from the thread pool)
+ // will wait for a new task to arrive before terminating
+ HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME("hive.server2.async.exec.keepalive.time", 10),
// HiveServer2 auth configuration
Modified: hive/trunk/conf/hive-default.xml.template
URL: http://svn.apache.org/viewvc/hive/trunk/conf/hive-default.xml.template?rev=1540267&r1=1540266&r2=1540267&view=diff
==============================================================================
--- hive/trunk/conf/hive-default.xml.template (original)
+++ hive/trunk/conf/hive-default.xml.template Sat Nov 9 07:23:45 2013
@@ -1897,18 +1897,32 @@
<property>
<name>hive.server2.async.exec.threads</name>
- <value>50</value>
+ <value>100</value>
<description>Number of threads in the async thread pool for HiveServer2</description>
</property>
<property>
<name>hive.server2.async.exec.shutdown.timeout</name>
<value>10</value>
- <description>Time (in seconds) for which HiveServer2 shutdown will wait for async
+ <description>Time (in seconds) for which HiveServer2 shutdown will wait for async
threads to terminate</description>
</property>
<property>
+ <name>hive.server2.async.exec.keepalive.time</name>
+ <value>10</value>
+ <description>Time (in seconds) that an idle HiveServer2 async thread (from the thread pool) will wait
+ for a new task to arrive before terminating</description>
+</property>
+
+<property>
+ <name>hive.server2.async.exec.wait.queue.size</name>
+ <value>100</value>
+ <description>Size of the wait queue for async thread pool in HiveServer2.
+ After hitting this limit, the async thread pool will reject new requests.</description>
+</property>
+
+<property>
<name>hive.server2.thrift.port</name>
<value>10000</value>
<description>Port number of HiveServer2 Thrift interface.
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java?rev=1540267&r1=1540266&r2=1540267&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/operation/SQLOperation.java Sat Nov 9 07:23:45 2013
@@ -165,7 +165,8 @@ public class SQLOperation extends Execut
getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
} catch (RejectedExecutionException rejected) {
setState(OperationState.ERROR);
- throw new HiveSQLException(rejected);
+ throw new HiveSQLException("All the asynchronous threads are currently busy, " +
+ "please retry the operation", rejected);
}
}
}
Modified: hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java?rev=1540267&r1=1540266&r2=1540267&view=diff
==============================================================================
--- hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java (original)
+++ hive/trunk/service/src/java/org/apache/hive/service/cli/session/SessionManager.java Sat Nov 9 07:23:45 2013
@@ -21,16 +21,16 @@ package org.apache.hive.service.cli.sess
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.ql.hooks.HookUtils;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.hive.ql.hooks.HookUtils;
import org.apache.hive.service.CompositeService;
import org.apache.hive.service.cli.HiveSQLException;
import org.apache.hive.service.cli.SessionHandle;
@@ -46,7 +46,7 @@ public class SessionManager extends Comp
private final Map<SessionHandle, HiveSession> handleToSession = new HashMap<SessionHandle, HiveSession>();
private OperationManager operationManager = new OperationManager();
private static final Object sessionMapLock = new Object();
- private ExecutorService backgroundOperationPool;
+ private ThreadPoolExecutor backgroundOperationPool;
public SessionManager() {
super("SessionManager");
@@ -57,8 +57,17 @@ public class SessionManager extends Comp
this.hiveConf = hiveConf;
operationManager = new OperationManager();
int backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS);
- LOG.info("HiveServer2: Async execution pool size" + backgroundPoolSize);
- backgroundOperationPool = Executors.newFixedThreadPool(backgroundPoolSize);
+ LOG.info("HiveServer2: Async execution thread pool size: " + backgroundPoolSize);
+ int backgroundPoolQueueSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_WAIT_QUEUE_SIZE);
+ LOG.info("HiveServer2: Async execution wait queue size: " + backgroundPoolQueueSize);
+ int keepAliveTime = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_KEEPALIVE_TIME);
+ LOG.info("HiveServer2: Async execution thread keepalive time: " + keepAliveTime);
+ // Create a thread pool with #backgroundPoolSize threads
+ // Threads terminate when they are idle for more than the keepAliveTime
+ // An bounded blocking queue is used to queue incoming operations, if #operations > backgroundPoolSize
+ backgroundOperationPool = new ThreadPoolExecutor(backgroundPoolSize, backgroundPoolSize,
+ keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(backgroundPoolQueueSize));
+ backgroundOperationPool.allowCoreThreadTimeOut(true);
addService(operationManager);
super.init(hiveConf);
}
@@ -66,26 +75,23 @@ public class SessionManager extends Comp
@Override
public synchronized void start() {
super.start();
- // TODO
}
@Override
public synchronized void stop() {
- // TODO
super.stop();
if (backgroundOperationPool != null) {
backgroundOperationPool.shutdown();
- long timeout = hiveConf.getLongVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
+ int timeout = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT);
try {
backgroundOperationPool.awaitTermination(timeout, TimeUnit.SECONDS);
- } catch (InterruptedException exc) {
+ } catch (InterruptedException e) {
LOG.warn("HIVE_SERVER2_ASYNC_EXEC_SHUTDOWN_TIMEOUT = " + timeout +
- " seconds has been exceeded. RUNNING background operations will be shut down", exc);
+ " seconds has been exceeded. RUNNING background operations will be shut down", e);
}
}
}
-
public SessionHandle openSession(String username, String password, Map<String, String> sessionConf)
throws HiveSQLException {
return openSession(username, password, sessionConf, false, null);
@@ -129,7 +135,6 @@ public class SessionManager extends Comp
session.close();
}
-
public HiveSession getSession(SessionHandle sessionHandle) throws HiveSQLException {
HiveSession session;
synchronized(sessionMapLock) {