You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2016/12/06 10:12:20 UTC
hbase git commit: HBASE-17255 Backport HBASE-17181 to branch-1.2
Repository: hbase
Updated Branches:
refs/heads/branch-1.2 c9f388bec -> 9a5d1b689
HBASE-17255 Backport HBASE-17181 to branch-1.2
Signed-off-by: zhangduo <zh...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/9a5d1b68
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/9a5d1b68
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/9a5d1b68
Branch: refs/heads/branch-1.2
Commit: 9a5d1b689f3080e8303dd44b349fc0cdb286daea
Parents: c9f388b
Author: eyjian <ey...@live.com>
Authored: Mon Dec 5 15:20:20 2016 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Dec 6 18:11:02 2016 +0800
----------------------------------------------------------------------
.../hadoop/hbase/thrift2/ThriftServer.java | 41 ++++++++++++++++++++
1 file changed, 41 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/9a5d1b68/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
index 429475e..c3aa6c3 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftServer.java
@@ -75,6 +75,7 @@ import org.apache.thrift.server.THsHaServer;
import org.apache.thrift.server.TNonblockingServer;
import org.apache.thrift.server.TServer;
import org.apache.thrift.server.TThreadPoolServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TNonblockingServerSocket;
import org.apache.thrift.transport.TNonblockingServerTransport;
@@ -141,6 +142,7 @@ public class ThriftServer {
options.addOption("f", "framed", false, "Use framed transport");
options.addOption("c", "compact", false, "Use the compact protocol");
options.addOption("w", "workers", true, "How many worker threads to use.");
+ options.addOption("s", "selectors", true, "How many selector threads to use.");
options.addOption("h", "help", false, "Print help information");
options.addOption(null, "infoport", true, "Port for web UI");
options.addOption("t", READ_TIMEOUT_OPTION, true,
@@ -151,6 +153,7 @@ public class ThriftServer {
servers.addOption(
new Option("nonblocking", false, "Use the TNonblockingServer. This implies the framed transport."));
servers.addOption(new Option("hsha", false, "Use the THsHaServer. This implies the framed transport."));
+ servers.addOption(new Option("selector", false, "Use the TThreadedSelectorServer. This implies the framed transport."));
servers.addOption(new Option("threadpool", false, "Use the TThreadPoolServer. This is the default."));
options.addOptionGroup(servers);
return options;
@@ -270,6 +273,30 @@ public class ThriftServer {
return new THsHaServer(serverArgs);
}
+ private static TServer getTThreadedSelectorServer(TProtocolFactory protocolFactory, // builds and returns a TThreadedSelectorServer; it does not call serve() itself
+ TProcessor processor, TTransportFactory transportFactory,
+ int workerThreads, int selectorThreads, // a non-positive count skips the matching serverArgs setter below
+ InetSocketAddress inetSocketAddress, ThriftMetrics metrics)
+ throws TTransportException {
+ TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress); // server socket created on the requested address
+ log.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
+ TThreadedSelectorServer.Args serverArgs = new TThreadedSelectorServer.Args(serverTransport);
+ if (workerThreads > 0) { // only set the worker count when a positive value was given
+ serverArgs.workerThreads(workerThreads);
+ }
+ if (selectorThreads > 0) { // only set the selector count when a positive value was given
+ serverArgs.selectorThreads(selectorThreads);
+ }
+
+ ExecutorService executorService = createExecutor( // NOTE(review): workerThreads is passed unguarded here, so it may be 0 - confirm createExecutor accepts that
+ workerThreads, metrics);
+ serverArgs.executorService(executorService); // an explicit executor is supplied in addition to any workerThreads value set above
+ serverArgs.processor(processor);
+ serverArgs.transportFactory(transportFactory);
+ serverArgs.protocolFactory(protocolFactory);
+ return new TThreadedSelectorServer(serverArgs);
+ }
+
private static ExecutorService createExecutor(
int workerThreads, ThriftMetrics metrics) {
CallQueue callQueue = new CallQueue(
@@ -336,6 +363,7 @@ public class ThriftServer {
Configuration conf = HBaseConfiguration.create();
CommandLine cmd = parseArguments(conf, options, args);
int workerThreads = 0;
+ int selectorThreads = 0;
/**
* This is to please both bin/hbase and bin/hbase-daemon. hbase-daemon provides "start" and "stop" arguments hbase
@@ -417,6 +445,7 @@ public class ThriftServer {
boolean nonblocking = cmd.hasOption("nonblocking");
boolean hsha = cmd.hasOption("hsha");
+ boolean selector = cmd.hasOption("selector");
ThriftMetrics metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.TWO);
@@ -425,6 +454,8 @@ public class ThriftServer {
implType = "nonblocking";
} else if (hsha) {
implType = "hsha";
+ } else if (selector) {
+ implType = "selector";
}
conf.set("hbase.regionserver.thrift.server.type", implType);
@@ -468,6 +499,9 @@ public class ThriftServer {
if (cmd.hasOption("w")) {
workerThreads = Integer.parseInt(cmd.getOptionValue("w"));
}
+ if (cmd.hasOption("s")) {
+ selectorThreads = Integer.parseInt(cmd.getOptionValue("s"));
+ }
// check for user-defined info server port setting, if so override the conf
try {
@@ -504,6 +538,13 @@ public class ThriftServer {
workerThreads,
inetSocketAddress,
metrics);
+ } else if (selector) {
+ server = getTThreadedSelectorServer(protocolFactory,
+ processor,
+ transportFactory,
+ workerThreads, selectorThreads,
+ inetSocketAddress,
+ metrics);
} else {
server = getTThreadPoolServer(protocolFactory,
processor,