You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/11/17 14:23:58 UTC

hbase git commit: HBASE-18967 Backport HBASE-17181 to branch-1.3 (Let HBase thrift2 support TThreadedSelectorServer)

Repository: hbase
Updated Branches:
  refs/heads/branch-1.3 b84e26973 -> 746447cf6


HBASE-18967 Backport HBASE-17181 to branch-1.3 (Let HBase thrift2 support TThreadedSelectorServer)

(cherry picked from commit 00b302435536974bc56e72deb406002c3b13669e)

Signed-off-by: Chia-Ping Tsai <ch...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/746447cf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/746447cf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/746447cf

Branch: refs/heads/branch-1.3
Commit: 746447cf6487da3f0c9ec72b3824e9c99e88430e
Parents: b84e269
Author: Peter Somogyi <ps...@cloudera.com>
Authored: Tue Oct 10 15:30:24 2017 -0700
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Fri Nov 17 22:16:17 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/thrift2/ThriftServer.java      | 57 +++++++++++++++++---
 1 file changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/746447cf/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
index 37bf06d..4c7ca8a 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
@@ -48,10 +48,10 @@ import org.apache.commons.cli.ParseException;
 import org.apache.commons.cli.PosixParser;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.filter.ParseFilter;
 import org.apache.hadoop.hbase.http.InfoServer;
 import org.apache.hadoop.hbase.security.SaslUtil;
@@ -64,8 +64,8 @@ import org.apache.hadoop.hbase.thrift.ThriftMetrics;
 import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
 import org.apache.hadoop.hbase.util.DNS;
 import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.thrift.TException;
 import org.apache.thrift.TProcessor;
@@ -77,6 +77,7 @@ import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingServerTransport;
@@ -143,6 +144,7 @@ public class ThriftServer {
     options.addOption("f", "framed", false, "Use framed transport");
     options.addOption("c", "compact", false, "Use the compact protocol");
     options.addOption("w", "workers", true, "How many worker threads to use.");
+    options.addOption("s", "selectors", true, "How many selector threads to use.");
     options.addOption("q", "callQueueSize", true,
       "Max size of request queue (unbounded by default)");
     options.addOption("h", "help", false, "Print help information");
@@ -153,9 +155,14 @@ public class ThriftServer {
       "only applies to TBoundedThreadPoolServer");
     OptionGroup servers = new OptionGroup();
     servers.addOption(
-        new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
-    servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
-    servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
+        new Option("nonblocking", false,
+            "Use the TNonblockingServer. This implies the framed transport."));
+    servers.addOption(new Option("hsha", false,
+        "Use the THsHaServer. This implies the framed transport."));
+    servers.addOption(new Option("selector", false,
+        "Use the TThreadedSelectorServer. This implies the framed transport."));
+    servers.addOption(new Option("threadpool", false,
+        "Use the TThreadPoolServer. This is the default."));
     options.addOptionGroup(servers);
     return options;
   }
@@ -275,6 +282,29 @@ public class ThriftServer {
     return new THsHaServer(serverArgs);
   }
 
+  private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
+      TProcessor processor, TTransportFactory transportFactory,
+      int workerThreads, int selectorThreads, int maxCallQueueSize,
+      InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
+      throws TTransportException { // builds a TThreadedSelectorServer; serve() is not called here
+    TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
+    log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
+    TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
+    if (workerThreads > 0) { // non-positive: workerThreads is not set on the Args
+      serverArgs.workerThreads(workerThreads);
+    }
+    if (selectorThreads > 0) { // non-positive: selectorThreads is not set on the Args
+      serverArgs.selectorThreads(selectorThreads);
+    }
+
+    ExecutorService executorService = createExecutor(workerThreads, maxCallQueueSize, metrics);
+    serverArgs.executorService(executorService); // executor is passed explicitly to the Args
+    serverArgs.processor(processor);
+    serverArgs.transportFactory(transportFactory);
+    serverArgs.protocolFactory(protocolFactory);
+    return new TThreadedSelectorServer(serverArgs);
+  }
+
   private static ExecutorService createExecutor(
       int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) {
     CallQueue callQueue;
@@ -352,6 +382,7 @@ public class ThriftServer {
     Configuration conf = HBaseConfiguration.create();
     CommandLine cmd = parseArguments(conf, options, args);
     int workerThreads = 0;
+    int selectorThreads = 0;
     int maxCallQueueSize = -1; // use unbounded queue by default
 
     /**
@@ -434,6 +465,7 @@ public class ThriftServer {
 
     boolean nonblocking = cmd.hasOption("nonblocking");
     boolean hsha = cmd.hasOption("hsha");
+    boolean selector = cmd.hasOption("selector");
 
     ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
 
@@ -442,6 +474,8 @@ public class ThriftServer {
       implType = "nonblocking";
     } else if (hsha) {
       implType = "hsha";
+    } else if (selector) {
+      implType = "selector";
     }
 
     conf.set("hbase.regionserver.thrift.server.type", implType);
@@ -485,7 +519,9 @@ public class ThriftServer {
     if (cmd.hasOption("w")) {
       workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
     }
-
+    if (cmd.hasOption("s")) {
+      selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
+    }
     if (cmd.hasOption("q")) {
       maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q"));
     }
@@ -526,6 +562,15 @@ public class ThriftServer {
           maxCallQueueSize,
           inetSocketAddress,
           metrics);
+    } else if (selector) {
+      server = getTThreadedSelectorServer(protocolFactory,
+          processor,
+          transportFactory,
+          workerThreads,
+          selectorThreads,
+          maxCallQueueSize,
+          inetSocketAddress,
+          metrics);
     } else {
       server = getTThreadPoolServer(protocolFactory,
           processor,