You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/06 10:12:20 UTC

hbase git commit: HBASE-17255 Backport HBASE-17181 to branch-1.2

Repository: hbase
Updated Branches:
  refs/heads/branch-1.2 c9f388bec -> 9a5d1b689


HBASE-17255 Backport HBASE-17181 to branch-1.2

Signed-off-by: zhangduo <zh...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a5d1b68
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a5d1b68
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a5d1b68

Branch: refs/heads/branch-1.2
Commit: 9a5d1b689f3080e8303dd44b349fc0cdb286daea
Parents: c9f388b
Author: eyjian <ey...@live.com>
Authored: Mon Dec 5 15:20:20 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Dec 6 18:11:02 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/thrift2/ThriftServer.java      | 41 ++++++++++++++++++++
 1 file changed, 41 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/9a5d1b68/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
index 429475e..c3aa6c3 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
@@ -75,6 +75,7 @@ import org.apache.thrift.server.THsHaServer;
 import org.apache.thrift.server.TNonblockingServer;
 import org.apache.thrift.server.TServer;
 import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
 import org.apache.thrift.transport.TFramedTransport;
 import org.apache.thrift.transport.TNonblockingServerSocket;
 import org.apache.thrift.transport.TNonblockingServerTransport;
@@ -141,6 +142,7 @@ public class ThriftServer {
     options.addOption("f", "framed", false, "Use framed transport");
     options.addOption("c", "compact", false, "Use the compact protocol");
     options.addOption("w", "workers", true, "How many worker threads to use.");
+    options.addOption("s", "selectors", true, "How many selector threads to use.");
     options.addOption("h", "help", false, "Print help information");
     options.addOption(null, "infoport", true, "Port for web UI");
     options.addOption("t", READ_TIMEOUT_OPTION, true,
@@ -151,6 +153,7 @@ public class ThriftServer {
     servers.addOption(
         new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
     servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
+    servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport."));
     servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
     options.addOptionGroup(servers);
     return options;
@@ -270,6 +273,30 @@ public class ThriftServer {
     return new THsHaServer(serverArgs);
   }
 
+  // Builds a TThreadedSelectorServer bound to inetSocketAddress. The worker and selector
+  // thread counts are applied to the server args only when they are positive.
+  private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
+      TProcessor processor, TTransportFactory transportFactory, int workerThreads,
+      int selectorThreads, InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
+      throws TTransportException {
+    TNonblockingServerTransport transport = new TNonblockingServerSocket(inetSocketAddress);
+    log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
+    TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport);
+    // Non-positive counts are skipped, so the corresponding setter is never invoked for them.
+    if (workerThreads > 0) {
+      args.workerThreads(workerThreads);
+    }
+    if (selectorThreads > 0) {
+      args.selectorThreads(selectorThreads);
+    }
+    // Hand the server an executor built by createExecutor from the worker count and metrics.
+    args.executorService(createExecutor(workerThreads, metrics));
+    args.processor(processor);
+    args.transportFactory(transportFactory);
+    args.protocolFactory(protocolFactory);
+    return new TThreadedSelectorServer(args);
+  }
+
   private static ExecutorService createExecutor(
       int workerThreads, ThriftMetrics metrics) {
     CallQueue callQueue = new CallQueue(
@@ -336,6 +363,7 @@ public class ThriftServer {
     Configuration conf = HBaseConfiguration.create();
     CommandLine cmd = parseArguments(conf, options, args);
     int workerThreads = 0;
+    int selectorThreads = 0;
 
     /**
      * This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
@@ -417,6 +445,7 @@ public class ThriftServer {
 
     boolean nonblocking = cmd.hasOption("nonblocking");
     boolean hsha = cmd.hasOption("hsha");
+    boolean selector = cmd.hasOption("selector");
 
     ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
 
@@ -425,6 +454,8 @@ public class ThriftServer {
       implType = "nonblocking";
     } else if (hsha) {
       implType = "hsha";
+    } else if (selector) {
+      implType = "selector";
     }
 
     conf.set("hbase.regionserver.thrift.server.type", implType);
@@ -468,6 +499,9 @@ public class ThriftServer {
     if (cmd.hasOption("w")) {
       workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
     }
+    if (cmd.hasOption("s")) {
+      selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
+    }
 
     // check for user-defined info server port setting, if so override the conf
     try {
@@ -504,6 +538,13 @@ public class ThriftServer {
           workerThreads,
           inetSocketAddress,
           metrics);
+    } else if (selector) {
+          server = getTThreadedSelectorServer(protocolFactory,
+          processor,
+          transportFactory,
+          workerThreads, selectorThreads,
+          inetSocketAddress,
+          metrics);
     } else {
       server = getTThreadPoolServer(protocolFactory,
           processor,