You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2019/04/02 21:24:31 UTC

[accumulo] branch master updated: Switch to thrift server with multiple selector threads. (#1059)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/master by this push:
     new ff0478b  Switch to thrift server with multiple selector threads. (#1059)
ff0478b is described below

commit ff0478b171b7da8d91d3342f1d972f24dd6e1a62
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Apr 2 17:24:27 2019 -0400

    Switch to thrift server with multiple selector threads. (#1059)
    
    Before this change Accumulo used the THsHaServer thrift server. This
    server has a single select thread that reads and writes frames to
    sockets.  The select thread puts the frames on a thread pool for
    processing.  On a machine with a lot of cores, this single thread can
    become a bottleneck.  This commit switches to use the thrift
    TThreadedSelectorServer server which supports multiple select threads.
    
    To demonstrate the difference this change can make, the
    RandomCachedLookupsPT performance test in accumulo-testing was run with
    and without this change.  The test was run on a machine with 16 cores.
    The test loads a table into cache and then does random lookups with
    varying numbers of threads.  It measures the average lookup time for
    each lookup. Below is a table of results from running this test.
    
    | Threads | Single Selector | Multi Selector | Speedup |
    |---------|-----------------|----------------|---------|
    |      1  | 0.14 ms         | 0.14 ms        | 1.00    |
    |      4  | 0.18 ms         | 0.19 ms        | 0.95    |
    |      8  | 0.24 ms         | 0.24 ms        | 1.00    |
    |     16  | 0.44 ms         | 0.44 ms        | 1.00    |
    |     32  | 0.85 ms         | 0.81 ms        | 1.05    |
    |     64  | 1.74 ms         | 1.40 ms        | 1.24    |
    |    128  | 3.51 ms         | 2.44 ms        | 1.44    |
    |    256  | 7.21 ms         | 4.52 ms        | 1.60    |
    
    This table shows that with 256 client threads doing lookups that each
    lookup took 7.21ms on average with a single selector thread.  With this
    change and four selector threads, each lookup took 4.52ms on average
    with 256 client threads.  During both tests there were 256 threads to
    process client requests on the tserver.  The selector threads hand frames
    off to these 256 threads.
    
    Watching top+jstack while running these tests shows the single selector
    thread will eventually reach 100% on a core.  With 4 selector threads,
    each used 36% CPU when 256 client threads were doing lookups.
---
 .../server/rpc/CustomThreadedSelectorServer.java   | 68 ++++++++++++++++++++++
 .../apache/accumulo/server/rpc/TServerUtils.java   | 53 +++++++++++++++--
 .../accumulo/server/rpc/ThriftServerType.java      | 10 +++-
 .../accumulo/server/rpc/ThriftServerTypeTest.java  |  2 +-
 4 files changed, 124 insertions(+), 9 deletions(-)

diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomThreadedSelectorServer.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomThreadedSelectorServer.java
new file mode 100644
index 0000000..c5a898d
--- /dev/null
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/CustomThreadedSelectorServer.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.accumulo.server.rpc;
+
+import java.lang.reflect.Field;
+import java.net.Socket;
+
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingSocket;
+import org.apache.thrift.transport.TNonblockingTransport;
+
+public class CustomThreadedSelectorServer extends TThreadedSelectorServer {
+
+  private final Field fbTansportField;
+
+  public CustomThreadedSelectorServer(Args args) {
+    super(args);
+
+    try {
+      fbTansportField = FrameBuffer.class.getDeclaredField("trans_");
+      fbTansportField.setAccessible(true);
+    } catch (SecurityException | NoSuchFieldException e) {
+      throw new RuntimeException("Failed to access required field in Thrift code.", e);
+    }
+  }
+
+  private TNonblockingTransport getTransport(FrameBuffer frameBuffer) {
+    try {
+      return (TNonblockingTransport) fbTansportField.get(frameBuffer);
+    } catch (IllegalAccessException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  protected Runnable getRunnable(FrameBuffer frameBuffer) {
+    return () -> {
+
+      TNonblockingTransport transport = getTransport(frameBuffer);
+
+      if (transport instanceof TNonblockingSocket) {
+        // This block of code makes the client address available to the server side code that
+        // executes a RPC. It is made available for informational purposes.
+        TNonblockingSocket tsock = (TNonblockingSocket) transport;
+        Socket sock = tsock.getSocketChannel().socket();
+        TServerUtils.clientAddress
+            .set(sock.getInetAddress().getHostAddress() + ":" + sock.getPort());
+      }
+
+      frameBuffer.invoke();
+    };
+  }
+
+}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index 6d75dfe..d112bb5 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -51,6 +51,7 @@ import org.apache.thrift.TProcessorFactory;
 import org.apache.thrift.protocol.TProtocolFactory;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TSSLTransportFactory;
 import org.apache.thrift.transport.TSaslServerTransport;
@@ -186,7 +187,43 @@ public class TServerUtils {
   }
 
   /**
-   * Create a NonBlockingServer with a custom thread pool that can dynamically resize itself.
+   * Create a non blocking server with multiple select threads and a custom thread pool that can
+   * dynamically resize itself.
+   */
+  public static ServerAddress createThreadedSelectorServer(HostAndPort address,
+      TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
+      final int numThreads, final int numSTThreads, long timeBetweenThreadChecks,
+      long maxMessageSize) throws TTransportException {
+
+    final TNonblockingServerSocket transport = new TNonblockingServerSocket(
+        new InetSocketAddress(address.getHost(), address.getPort()));
+
+    TThreadedSelectorServer.Args options = new TThreadedSelectorServer.Args(transport);
+
+    options.selectorThreads = Math.max(2, Runtime.getRuntime().availableProcessors() / 4);
+    log.info("selectorThreads : " + options.selectorThreads);
+    options.protocolFactory(protocolFactory);
+    options.transportFactory(ThriftUtil.transportFactory(maxMessageSize));
+    options.maxReadBufferBytes = maxMessageSize;
+    options.stopTimeoutVal(5);
+
+    // Create our own very special thread pool.
+    ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads, numSTThreads,
+        timeBetweenThreadChecks);
+
+    options.executorService(pool);
+    options.processorFactory(new TProcessorFactory(processor));
+
+    if (address.getPort() == 0) {
+      address = HostAndPort.fromParts(address.getHost(), transport.getPort());
+    }
+
+    return new ServerAddress(new CustomThreadedSelectorServer(options), address);
+  }
+
+  /**
+   * Create a NonBlockingServer with a single select threads and a custom thread pool that can
+   * dynamically resize itself.
    */
   public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor,
       TProtocolFactory protocolFactory, final String serverName, final int numThreads,
@@ -570,12 +607,18 @@ public class TServerUtils {
             serverAddress = createBlockingServer(address, processor, protocolFactory,
                 maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
             break;
-          case CUSTOM_HS_HA: // Intentional passthrough -- Our custom wrapper around HsHa is the
-                             // default
-          default:
-            log.debug("Instantiating default, unsecure custom half-async Thrift server");
+          case CUSTOM_HS_HA:
+            log.debug("Instantiating unsecure custom half-async Thrift server");
             serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName,
                 numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+            break;
+          case THREADED_SELECTOR: // Intentional passthrough -- Our custom wrapper around threaded
+                                  // selector is the default
+          default:
+            log.debug("Instantiating default, unsecure Threaded selector Thrift server");
+            serverAddress = createThreadedSelectorServer(address, processor, protocolFactory,
+                serverName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+            break;
         }
         break;
       } catch (TTransportException e) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
index 64c944e..04f64e9 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/ThriftServerType.java
@@ -28,7 +28,11 @@ import org.apache.commons.lang.StringUtils;
  * benchmarks against "unsecure" Accumulo use the same type of Thrift server.
  */
 public enum ThriftServerType {
-  CUSTOM_HS_HA("custom_hs_ha"), THREADPOOL("threadpool"), SSL("ssl"), SASL("sasl");
+  CUSTOM_HS_HA("custom_hs_ha"),
+  THREADPOOL("threadpool"),
+  SSL("ssl"),
+  SASL("sasl"),
+  THREADED_SELECTOR("threaded_selector");
 
   private final String name;
 
@@ -39,7 +43,7 @@ public enum ThriftServerType {
   public static ThriftServerType get(String name) {
     // Our custom HsHa server is the default (if none is provided)
     if (StringUtils.isBlank(name)) {
-      return CUSTOM_HS_HA;
+      return THREADED_SELECTOR;
     }
     return ThriftServerType.valueOf(name.trim().toUpperCase());
   }
@@ -50,6 +54,6 @@ public enum ThriftServerType {
   }
 
   public static ThriftServerType getDefault() {
-    return CUSTOM_HS_HA;
+    return THREADED_SELECTOR;
   }
 }
diff --git a/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
index 708f70e..01bc513 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/rpc/ThriftServerTypeTest.java
@@ -25,7 +25,7 @@ public class ThriftServerTypeTest {
 
   @Test
   public void testDefaultServer() {
-    assertEquals(ThriftServerType.CUSTOM_HS_HA,
+    assertEquals(ThriftServerType.THREADED_SELECTOR,
         ThriftServerType.get(Property.GENERAL_RPC_SERVER_TYPE.getDefaultValue()));
   }