You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/01/22 19:25:22 UTC
accumulo git commit: ACCUMULO-3492 Consolidate duplicate and
unnecessary code in TServerUtils
Repository: accumulo
Updated Branches:
refs/heads/master 80805545e -> a0dbe56c7
ACCUMULO-3492 Consolidate duplicate and unnecessary code in TServerUtils
Uses the self-resizing thread pool for all tservers and removes unused
method arguments
Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a0dbe56c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a0dbe56c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a0dbe56c
Branch: refs/heads/master
Commit: a0dbe56c75a339e396ea80da7d2092945272ad78
Parents: 8080554
Author: Josh Elser <el...@apache.org>
Authored: Thu Jan 22 13:23:51 2015 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jan 22 13:23:51 2015 -0500
----------------------------------------------------------------------
.../accumulo/server/rpc/TServerUtils.java | 101 ++++++++++++-------
1 file changed, 67 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/accumulo/blob/a0dbe56c/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 6a1adda..cd92e5c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -185,9 +185,37 @@ public class TServerUtils {
options.stopTimeoutVal(5);
// Create our own very special thread pool.
- final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+
+ options.executorService(pool);
+ options.processorFactory(new TProcessorFactory(processor));
+
+ if (address.getPort() == 0) {
+ address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
+ }
+
+ return new ServerAddress(new CustomNonBlockingServer(options), address);
+ }
+
+ /**
+ * Creates a {@link SimpleThreadPool} which uses {@link SimpleTimer} to inspect the core pool size and number of active threads of the
+ * {@link ThreadPoolExecutor} and increase or decrease the core pool size based on activity (excessive or lack thereof).
+ *
+ * @param serverName
+ * A name to describe the thrift server this executor will service
+ * @param executorThreads
+ * The maximum number of threads for the executor
+ * @param simpleTimerThreads
+ * The numbers of threads used to get the {@link SimpleTimer} instance
+ * @param timeBetweenThreadChecks
+ * The amount of time, in millis, between attempts to resize the executor thread pool
+ * @return A {@link ThreadPoolExecutor} which will resize itself automatically
+ */
+ public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, int simpleTimerThreads,
+ long timeBetweenThreadChecks) {
+ final ThreadPoolExecutor pool = new SimpleThreadPool(executorThreads, "ClientPool");
// periodically adjust the number of threads we need by checking how busy our threads are
- SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
+ SimpleTimer.getInstance(simpleTimerThreads).schedule(new Runnable() {
@Override
public void run() {
// there is a minor race condition between sampling the current state of the thread pool and adjusting it
@@ -199,7 +227,7 @@ public class TServerUtils {
pool.setCorePoolSize(larger);
} else {
if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
- int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
+ int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
if (smaller != pool.getCorePoolSize()) {
log.info("Decreasing server thread pool size on {} to {}", serverName, smaller);
pool.setCorePoolSize(smaller);
@@ -208,15 +236,7 @@ public class TServerUtils {
}
}
}, timeBetweenThreadChecks, timeBetweenThreadChecks);
-
- options.executorService(pool);
- options.processorFactory(new TProcessorFactory(processor));
-
- if (address.getPort() == 0) {
- address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
- }
-
- return new ServerAddress(new CustomNonBlockingServer(options), address);
+ return pool;
}
/**
@@ -230,10 +250,12 @@ public class TServerUtils {
* Maximum size of a Thrift message allowed
* @return A configured TThreadPoolServer and its bound address information
*/
- public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize) throws TTransportException {
+ public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize, String serverName, int numThreads,
+ int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
TServerSocket transport = new TServerSocket(address.getPort());
- TThreadPoolServer server = createThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize));
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+ TThreadPoolServer server = createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize), pool);
if (address.getPort() == 0) {
address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
@@ -244,20 +266,22 @@ public class TServerUtils {
}
/**
- * Create a TThreadPoolServer with the given transport and processo with the default transport factory.r
+ * Create a {@link TThreadPoolServer} with the provided transport, processor and transport factory.
*
* @param transport
- * TServerTransport for the server
+ * Server transport
* @param processor
- * TProcessor for the server
- * @return A configured TThreadPoolServer
+ * Processor implementation
+ * @param transportFactory
+ * Transport factory
+ * @return A configured {@link TThreadPoolServer}
*/
- public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
- return createThreadPoolServer(transport, processor, ThriftUtil.transportFactory());
+ public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) {
+ return createTThreadPoolServer(transport, processor, transportFactory, null);
}
/**
- * Create a TServer with the provided server transport, processor and transport factory.
+ * Create a {@link TThreadPoolServer} with the provided server transport, processor and transport factory.
*
* @param transport
* TServerTransport for the server
@@ -266,11 +290,15 @@ public class TServerUtils {
* @param transportFactory
* TTransportFactory for the server
*/
- public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) {
+ public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory,
+ ExecutorService service) {
TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
options.protocolFactory(ThriftUtil.protocolFactory());
options.transportFactory(transportFactory);
options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
+ if (null != service) {
+ options.executorService(service);
+ }
return new TThreadPoolServer(options);
}
@@ -331,7 +359,8 @@ public class TServerUtils {
* SSL parameters
* @return A ServerAddress with the bound-socket information and the Thrift server
*/
- public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
+ public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams,
+ String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks)
throws TTransportException {
TServerSocket transport;
try {
@@ -339,15 +368,18 @@ public class TServerUtils {
} catch (UnknownHostException e) {
throw new TTransportException(e);
}
+
if (address.getPort() == 0) {
address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
}
- return new ServerAddress(createThreadPoolServer(transport, processor), address);
+
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+
+ return new ServerAddress(createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(), pool), address);
}
public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SaslConnectionParams params,
- final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize)
- throws TTransportException {
+ final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks) throws TTransportException {
// We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the TThreadPoolServer does,
// but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it open()'s which will fail
// when the server does an accept() to (presumably) wake up the eventing system.
@@ -377,9 +409,6 @@ public class TServerUtils {
saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
new SaslRpcServer.SaslGssCallbackHandler());
- // Updates the clientAddress threadlocal so we know who the client's address
- final ClientInfoProcessorFactory clientInfoFactory = new ClientInfoProcessorFactory(clientAddress, processor);
-
// Make sure the TTransportFactory is performing a UGI.doAs
TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
@@ -387,8 +416,11 @@ public class TServerUtils {
address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
}
- return new ServerAddress(new TThreadPoolServer(new TThreadPoolServer.Args(transport).transportFactory(ugiTransportFactory)
- .processorFactory(clientInfoFactory).protocolFactory(ThriftUtil.protocolFactory())), address);
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+
+ final TThreadPoolServer server = createTThreadPoolServer(transport, processor, ugiTransportFactory, pool);
+
+ return new ServerAddress(server, address);
}
public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor,
@@ -420,16 +452,17 @@ public class TServerUtils {
switch (serverType) {
case SSL:
log.debug("Instantiating SSL Thrift server");
- serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams);
+ serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams, serverName, numThreads, numSTThreads,
+ timeBetweenThreadChecks);
break;
case SASL:
log.debug("Instantiating SASL Thrift server");
serverAddress = createSaslThreadPoolServer(address, processor, serverSocketTimeout, saslParams, serverName, threadName, numThreads, numSTThreads,
- timeBetweenThreadChecks, maxMessageSize);
+ timeBetweenThreadChecks);
break;
case THREADPOOL:
log.debug("Instantiating unsecure TThreadPool Thrift server");
- serverAddress = createBlockingServer(address, processor, maxMessageSize);
+ serverAddress = createBlockingServer(address, processor, maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
break;
case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default
default: