You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/11/17 14:23:58 UTC
hbase git commit: HBASE-18967 Backport HBASE-17181 to branch-1.3 (Let
HBase thrift2 support TThreadedSelectorServer)
Repository: hbase
Updated Branches:
refs/heads/branch-1.3 b84e26973 -> 746447cf6
HBASE-18967 Backport HBASE-17181 to branch-1.3 (Let HBase thrift2 support TThreadedSelectorServer)
(cherry picked from commit 00b302435536974bc56e72deb406002c3b13669e)
Signed-off-by: Chia-Ping Tsai <ch...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/746447cf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/746447cf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/746447cf
Branch: refs/heads/branch-1.3
Commit: 746447cf6487da3f0c9ec72b3824e9c99e88430e
Parents: b84e269
Author: Peter Somogyi <ps...@cloudera.com>
Authored: Tue Oct 10 15:30:24 2017 -0700
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Fri Nov 17 22:16:17 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/thrift2/ThriftServer.java | 57 +++++++++++++++++---
1 file changed, 51 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/746447cf/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
index 37bf06d..4c7ca8a 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
@@ -48,10 +48,10 @@ import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.filter.ParseFilter;
import org.apache.hadoop.hbase.http.InfoServer;
import org.apache.hadoop.hbase.security.SaslUtil;
@@ -64,8 +64,8 @@ import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
import org.apache.hadoop.hbase.util.DNS;
import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.thrift.TException;
import org.apache.thrift.TProcessor;
@@ -77,6 +77,7 @@ import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
@@ -143,6 +144,7 @@ public class ThriftServer {
options.addOption("f", "framed", false, "Use framed transport");
options.addOption("c", "compact", false, "Use the compact protocol");
options.addOption("w", "workers", true, "How many worker threads to use.");
+ options.addOption("s", "selectors", true, "How many selector threads to use.");
options.addOption("q", "callQueueSize", true,
"Max size of request queue (unbounded by default)");
options.addOption("h", "help", false, "Print help information");
@@ -153,9 +155,14 @@ public class ThriftServer {
"only applies to TBoundedThreadPoolServer");
OptionGroup servers = new OptionGroup();
servers.addOption(
- new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
- servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
- servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
+ new Option("nonblocking", false,
+ "Use the TNonblockingServer. This implies the framed transport."));
+ servers.addOption(new Option("hsha", false,
+ "Use the THsHaServer. This implies the framed transport."));
+ servers.addOption(new Option("selector", false,
+ "Use the TThreadedSelectorServer. This implies the framed transport."));
+ servers.addOption(new Option("threadpool", false,
+ "Use the TThreadPoolServer. This is the default."));
options.addOptionGroup(servers);
return options;
}
@@ -275,6 +282,29 @@ public class ThriftServer {
return new THsHaServer(serverArgs);
}
+ private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory,
+ TProcessor processor, TTransportFactory transportFactory,
+ int workerThreads, int selectorThreads, int maxCallQueueSize,
+ InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
+ throws TTransportException {
+ TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
+ log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
+ TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
+ if (workerThreads > 0) {
+ serverArgs.workerThreads(workerThreads);
+ }
+ if (selectorThreads > 0) {
+ serverArgs.selectorThreads(selectorThreads);
+ }
+
+ ExecutorService executorService = createExecutor(workerThreads, maxCallQueueSize, metrics);
+ serverArgs.executorService(executorService);
+ serverArgs.processor(processor);
+ serverArgs.transportFactory(transportFactory);
+ serverArgs.protocolFactory(protocolFactory);
+ return new TThreadedSelectorServer(serverArgs);
+ }
+
private static ExecutorService createExecutor(
int workerThreads, int maxCallQueueSize, ThriftMetrics metrics) {
CallQueue callQueue;
@@ -352,6 +382,7 @@ public class ThriftServer {
Configuration conf = HBaseConfiguration.create();
CommandLine cmd = parseArguments(conf, options, args);
int workerThreads = 0;
+ int selectorThreads = 0;
int maxCallQueueSize = -1; // use unbounded queue by default
/**
@@ -434,6 +465,7 @@ public class ThriftServer {
boolean nonblocking = cmd.hasOption("nonblocking");
boolean hsha = cmd.hasOption("hsha");
+ boolean selector = cmd.hasOption("selector");
ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
@@ -442,6 +474,8 @@ public class ThriftServer {
implType = "nonblocking";
} else if (hsha) {
implType = "hsha";
+ } else if (selector) {
+ implType = "selector";
}
conf.set("hbase.regionserver.thrift.server.type", implType);
@@ -485,7 +519,9 @@ public class ThriftServer {
if (cmd.hasOption("w")) {
workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
}
-
+ if (cmd.hasOption("s")) {
+ selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
+ }
if (cmd.hasOption("q")) {
maxCallQueueSize = Integer.parseInt(cmd.getOptionValue("q"));
}
@@ -526,6 +562,15 @@ public class ThriftServer {
maxCallQueueSize,
inetSocketAddress,
metrics);
+ } else if (selector) {
+ server = getTThreadedSelectorServer(protocolFactory,
+ processor,
+ transportFactory,
+ workerThreads,
+ selectorThreads,
+ maxCallQueueSize,
+ inetSocketAddress,
+ metrics);
} else {
server = getTThreadPoolServer(protocolFactory,
processor,