You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by el...@apache.org on 2015/01/22 19:25:22 UTC

accumulo git commit: ACCUMULO-3492 Consolidate duplicate and unnecessary code in TServerUtils

Repository: accumulo
Updated Branches:
  refs/heads/master 80805545e -> a0dbe56c7


ACCUMULO-3492 Consolidate duplicate and unnecessary code in TServerUtils

Uses the self-resizing thread pool for all Thrift server types (blocking,
SSL and SASL, in addition to the non-blocking server) and removes unused
method arguments.


Project: http://git-wip-us.apache.org/repos/asf/accumulo/repo
Commit: http://git-wip-us.apache.org/repos/asf/accumulo/commit/a0dbe56c
Tree: http://git-wip-us.apache.org/repos/asf/accumulo/tree/a0dbe56c
Diff: http://git-wip-us.apache.org/repos/asf/accumulo/diff/a0dbe56c

Branch: refs/heads/master
Commit: a0dbe56c75a339e396ea80da7d2092945272ad78
Parents: 8080554
Author: Josh Elser <el...@apache.org>
Authored: Thu Jan 22 13:23:51 2015 -0500
Committer: Josh Elser <el...@apache.org>
Committed: Thu Jan 22 13:23:51 2015 -0500

----------------------------------------------------------------------
 .../accumulo/server/rpc/TServerUtils.java       | 101 ++++++++++++-------
 1 file changed, 67 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/accumulo/blob/a0dbe56c/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
----------------------------------------------------------------------
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 6a1adda..cd92e5c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -185,9 +185,37 @@ public class TServerUtils {
     options.stopTimeoutVal(5);
 
     // Create our own very special thread pool.
-    final ThreadPoolExecutor pool = new SimpleThreadPool(numThreads, "ClientPool");
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+
+    options.executorService(pool);
+    options.processorFactory(new TProcessorFactory(processor));
+
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
+    }
+
+    return new ServerAddress(new CustomNonBlockingServer(options), address);
+  }
+
+  /**
+   * Creates a {@link SimpleThreadPool} which uses {@link SimpleTimer} to inspect the core pool size and number of active threads of the
+   * {@link ThreadPoolExecutor} and increase or decrease the core pool size based on activity (excessive or lack thereof).
+   *
+   * @param serverName
+   *          A name to describe the thrift server this executor will service
+   * @param executorThreads
+   *          The maximum number of threads for the executor
+   * @param simpleTimerThreads
+   *          The number of threads used to get the {@link SimpleTimer} instance
+   * @param timeBetweenThreadChecks
+   *          The amount of time, in millis, between attempts to resize the executor thread pool
+   * @return A {@link ThreadPoolExecutor} which will resize itself automatically
+   */
+  public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName, final int executorThreads, int simpleTimerThreads,
+      long timeBetweenThreadChecks) {
+    final ThreadPoolExecutor pool = new SimpleThreadPool(executorThreads, "ClientPool");
     // periodically adjust the number of threads we need by checking how busy our threads are
-    SimpleTimer.getInstance(numSTThreads).schedule(new Runnable() {
+    SimpleTimer.getInstance(simpleTimerThreads).schedule(new Runnable() {
       @Override
       public void run() {
         // there is a minor race condition between sampling the current state of the thread pool and adjusting it
@@ -199,7 +227,7 @@ public class TServerUtils {
           pool.setCorePoolSize(larger);
         } else {
           if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
-            int smaller = Math.max(numThreads, pool.getCorePoolSize() - 1);
+            int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
             if (smaller != pool.getCorePoolSize()) {
               log.info("Decreasing server thread pool size on {} to {}", serverName, smaller);
               pool.setCorePoolSize(smaller);
@@ -208,15 +236,7 @@ public class TServerUtils {
         }
       }
     }, timeBetweenThreadChecks, timeBetweenThreadChecks);
-
-    options.executorService(pool);
-    options.processorFactory(new TProcessorFactory(processor));
-
-    if (address.getPort() == 0) {
-      address = HostAndPort.fromParts(address.getHostText(), transport.getPort());
-    }
-
-    return new ServerAddress(new CustomNonBlockingServer(options), address);
+    return pool;
   }
 
   /**
@@ -230,10 +250,12 @@ public class TServerUtils {
    *          Maximum size of a Thrift message allowed
    * @return A configured TThreadPoolServer and its bound address information
    */
-  public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize) throws TTransportException {
+  public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor, long maxMessageSize, String serverName, int numThreads,
+      int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
 
     TServerSocket transport = new TServerSocket(address.getPort());
-    TThreadPoolServer server = createThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize));
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+    TThreadPoolServer server = createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(maxMessageSize), pool);
 
     if (address.getPort() == 0) {
       address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
@@ -244,20 +266,22 @@ public class TServerUtils {
   }
 
   /**
-   * Create a TThreadPoolServer with the given transport and processo with the default transport factory.r
+   * Create a {@link TThreadPoolServer} with the provided transport, processor and transport factory.
    *
    * @param transport
-   *          TServerTransport for the server
+   *          Server transport
    * @param processor
-   *          TProcessor for the server
-   * @return A configured TThreadPoolServer
+   *          Processor implementation
+   * @param transportFactory
+   *          Transport factory
+   * @return A configured {@link TThreadPoolServer}
    */
-  public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor) {
-    return createThreadPoolServer(transport, processor, ThriftUtil.transportFactory());
+  public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) {
+    return createTThreadPoolServer(transport, processor, transportFactory, null);
   }
 
   /**
-   * Create a TServer with the provided server transport, processor and transport factory.
+   * Create a {@link TThreadPoolServer} with the provided server transport, processor and transport factory.
    *
    * @param transport
    *          TServerTransport for the server
@@ -266,11 +290,15 @@ public class TServerUtils {
    * @param transportFactory
    *          TTransportFactory for the server
    */
-  public static TThreadPoolServer createThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory) {
+  public static TThreadPoolServer createTThreadPoolServer(TServerTransport transport, TProcessor processor, TTransportFactory transportFactory,
+      ExecutorService service) {
     TThreadPoolServer.Args options = new TThreadPoolServer.Args(transport);
     options.protocolFactory(ThriftUtil.protocolFactory());
     options.transportFactory(transportFactory);
     options.processorFactory(new ClientInfoProcessorFactory(clientAddress, processor));
+    if (null != service) {
+      options.executorService(service);
+    }
     return new TThreadPoolServer(options);
   }
 
@@ -331,7 +359,8 @@ public class TServerUtils {
    *          SSL parameters
    * @return A ServerAddress with the bound-socket information and the Thrift server
    */
-  public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams)
+  public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SslConnectionParams sslParams,
+      String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks)
       throws TTransportException {
     TServerSocket transport;
     try {
@@ -339,15 +368,18 @@ public class TServerUtils {
     } catch (UnknownHostException e) {
       throw new TTransportException(e);
     }
+
     if (address.getPort() == 0) {
       address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
     }
-    return new ServerAddress(createThreadPoolServer(transport, processor), address);
+
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSimpleTimerThreads, timeBetweenThreadChecks);
+
+    return new ServerAddress(createTThreadPoolServer(transport, processor, ThriftUtil.transportFactory(), pool), address);
   }
 
   public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor, long socketTimeout, SaslConnectionParams params,
-      final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize)
-      throws TTransportException {
+      final String serverName, String threadName, final int numThreads, final int numSTThreads, long timeBetweenThreadChecks) throws TTransportException {
     // We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the TThreadPoolServer does,
     // but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it open()'s which will fail
     // when the server does an accept() to (presumably) wake up the eventing system.
@@ -377,9 +409,6 @@ public class TServerUtils {
     saslTransportFactory.addServerDefinition(ThriftUtil.GSSAPI, params.getKerberosServerPrimary(), hostname, params.getSaslProperties(),
         new SaslRpcServer.SaslGssCallbackHandler());
 
-    // Updates the clientAddress threadlocal so we know who the client's address
-    final ClientInfoProcessorFactory clientInfoFactory = new ClientInfoProcessorFactory(clientAddress, processor);
-
     // Make sure the TTransportFactory is performing a UGI.doAs
     TTransportFactory ugiTransportFactory = new UGIAssumingTransportFactory(saslTransportFactory, serverUser);
 
@@ -387,8 +416,11 @@ public class TServerUtils {
       address = HostAndPort.fromParts(address.getHostText(), transport.getServerSocket().getLocalPort());
     }
 
-    return new ServerAddress(new TThreadPoolServer(new TThreadPoolServer.Args(transport).transportFactory(ugiTransportFactory)
-        .processorFactory(clientInfoFactory).protocolFactory(ThriftUtil.protocolFactory())), address);
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+
+    final TThreadPoolServer server = createTThreadPoolServer(transport, processor, ugiTransportFactory, pool);
+
+    return new ServerAddress(server, address);
   }
 
   public static ServerAddress startTServer(AccumuloConfiguration conf, HostAndPort address, ThriftServerType serverType, TProcessor processor,
@@ -420,16 +452,17 @@ public class TServerUtils {
     switch (serverType) {
       case SSL:
         log.debug("Instantiating SSL Thrift server");
-        serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams);
+        serverAddress = createSslThreadPoolServer(address, processor, serverSocketTimeout, sslParams, serverName, numThreads, numSTThreads,
+            timeBetweenThreadChecks);
         break;
       case SASL:
         log.debug("Instantiating SASL Thrift server");
         serverAddress = createSaslThreadPoolServer(address, processor, serverSocketTimeout, saslParams, serverName, threadName, numThreads, numSTThreads,
-            timeBetweenThreadChecks, maxMessageSize);
+            timeBetweenThreadChecks);
         break;
       case THREADPOOL:
         log.debug("Instantiating unsecure TThreadPool Thrift server");
-        serverAddress = createBlockingServer(address, processor, maxMessageSize);
+        serverAddress = createBlockingServer(address, processor, maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
         break;
       case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the default
       default: