You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by al...@apache.org on 2019/01/02 08:14:05 UTC
[3/4] hbase git commit: HBASE-21652 Refactor ThriftServer making
thrift2 server inherited from thrift1 server
http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
index fc00327..6d11ac6 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
@@ -18,16 +18,132 @@
package org.apache.hadoop.hbase.thrift;
+import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
+import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
+import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
+import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.http.HttpServerUtil;
import org.apache.hadoop.hbase.http.InfoServer;
-import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityUtil;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.util.DNS;
+import org.apache.hadoop.hbase.util.JvmPauseMonitor;
+import org.apache.hadoop.hbase.util.Strings;
import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServlet;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
import org.apache.yetus.audience.InterfaceAudience;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
@@ -40,29 +156,36 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
* independent process.
*/
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class ThriftServer {
+public class ThriftServer extends Configured implements Tool {
private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
- private static final String MIN_WORKERS_OPTION = "minWorkers";
- private static final String MAX_WORKERS_OPTION = "workers";
- private static final String MAX_QUEUE_SIZE_OPTION = "queue";
- private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
- static final String BIND_OPTION = "bind";
- static final String COMPACT_OPTION = "compact";
- static final String FRAMED_OPTION = "framed";
- static final String PORT_OPTION = "port";
- static final String INFOPORT_OPTION = "infoport";
- private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
- private static final int DEFAULT_LISTEN_PORT = 9090;
- private Configuration conf;
- ThriftServerRunner serverRunner;
+ protected Configuration conf;
+
+ protected InfoServer infoServer;
+
+ protected TProcessor processor;
+
+ protected ThriftMetrics metrics;
+ protected HBaseServiceHandler hbaseServiceHandler;
+ protected UserGroupInformation serviceUGI;
+ protected boolean httpEnabled;
+
+ protected SaslUtil.QualityOfProtection qop;
+ protected String host;
+ protected int listenPort;
+
+
+ protected boolean securityEnabled;
+ protected boolean doAsEnabled;
+
+ protected JvmPauseMonitor pauseMonitor;
- private InfoServer infoServer;
+ protected volatile TServer tserver;
+ protected volatile Server httpServer;
- private static final String READ_TIMEOUT_OPTION = "readTimeout";
//
// Main program and support routines
@@ -72,7 +195,89 @@ public class ThriftServer {
this.conf = HBaseConfiguration.create(conf);
}
- private static void printUsageAndExit(Options options, int exitCode)
+ protected void setupParamters() throws IOException {
+ // login the server principal (if using secure Hadoop)
+ UserProvider userProvider = UserProvider.instantiate(conf);
+ securityEnabled = userProvider.isHadoopSecurityEnabled()
+ && userProvider.isHBaseSecurityEnabled();
+ if (securityEnabled) {
+ host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
+ conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
+ conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
+ userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
+ }
+ this.serviceUGI = userProvider.getCurrent().getUGI();
+
+ this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+ this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
+ this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
+ this.hbaseServiceHandler = createHandler(conf, userProvider);
+ this.hbaseServiceHandler.initMetrics(metrics);
+ this.processor = createProcessor();
+
+ httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
+ doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
+ if (doAsEnabled && !httpEnabled) {
+ LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
+ }
+
+ String strQop = conf.get(THRIFT_QOP_KEY);
+ if (strQop != null) {
+ this.qop = SaslUtil.getQop(strQop);
+ }
+ if (qop != null) {
+ if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
+ qop != SaslUtil.QualityOfProtection.INTEGRITY &&
+ qop != SaslUtil.QualityOfProtection.PRIVACY) {
+ throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
+ THRIFT_QOP_KEY,
+ SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
+ SaslUtil.QualityOfProtection.INTEGRITY.name(),
+ SaslUtil.QualityOfProtection.PRIVACY.name()));
+ }
+ checkHttpSecurity(qop, conf);
+ if (!securityEnabled) {
+ throw new IOException("Thrift server must run in secure mode to support authentication");
+ }
+ }
+ registerFilters(conf);
+ pauseMonitor.start();
+ }
+
+ protected void startInfoServer() throws IOException {
+ // Put up info server.
+ int port = conf.getInt(THRIFT_INFO_SERVER_PORT , THRIFT_INFO_SERVER_PORT_DEFAULT);
+
+ if (port >= 0) {
+ conf.setLong("startcode", System.currentTimeMillis());
+ String a = conf
+ .get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
+ infoServer = new InfoServer("thrift", a, port, false, conf);
+ infoServer.setAttribute("hbase.conf", conf);
+ infoServer.start();
+ }
+ }
+
+ protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
+ if (qop == SaslUtil.QualityOfProtection.PRIVACY &&
+ conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
+ !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
+ throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
+ THRIFT_SSL_ENABLED_KEY + " is false");
+ }
+ }
+
+ protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
+ throws IOException {
+ return new ThriftHBaseServiceHandler(conf, userProvider);
+ }
+
+ protected TProcessor createProcessor() {
+ return new Hbase.Processor<>(
+ HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
+ }
+
+ protected void printUsageAndExit(Options options, int exitCode)
throws ExitCodeException {
HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("Thrift", null, options,
@@ -85,32 +290,333 @@ public class ThriftServer {
}
/**
- * Start up or shuts down the Thrift server, depending on the arguments.
- * @param args the arguments to pass in when starting the Thrift server
+ * Setup a HTTP Server using Jetty to serve calls from THttpClient
+ *
+ * @throws IOException IOException
*/
- void doMain(final String[] args) throws Exception {
- processOptions(args);
- serverRunner = new ThriftServerRunner(conf);
+ protected void setupHTTPServer() throws IOException {
+ TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+ TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI,
+ conf, hbaseServiceHandler, securityEnabled, doAsEnabled);
+
+ // Set the default max thread number to 100 to limit
+ // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
+ // Jetty set the default max thread number to 250, if we don't set it.
+ //
+ // Our default min thread number 2 is the same as that used by Jetty.
+ int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY,
+ conf.getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
+ HTTP_MIN_THREADS_KEY_DEFAULT));
+ int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY,
+ conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+ HTTP_MAX_THREADS_KEY_DEFAULT));
+ QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
+ threadPool.setMinThreads(minThreads);
+ httpServer = new Server(threadPool);
+
+ // Context handler
+ ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
+ ServletContextHandler.SESSIONS);
+ ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
+ HttpServerUtil.constrainHttpMethods(ctxHandler,
+ conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD,
+ THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
+
+ // set up Jetty and run the embedded server
+ HttpConfiguration httpConfig = new HttpConfiguration();
+ httpConfig.setSecureScheme("https");
+ httpConfig.setSecurePort(listenPort);
+ httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
+ httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
+ httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
+ httpConfig.setSendServerVersion(false);
+ httpConfig.setSendDateHeader(false);
+
+ ServerConnector serverConnector;
+ if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
+ HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
+ httpsConfig.addCustomizer(new SecureRequestCustomizer());
+
+ SslContextFactory sslCtxFactory = new SslContextFactory();
+ String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
+ String password = HBaseConfiguration.getPassword(conf,
+ THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
+ String keyPassword = HBaseConfiguration.getPassword(conf,
+ THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
+ sslCtxFactory.setKeyStorePath(keystore);
+ sslCtxFactory.setKeyStorePassword(password);
+ sslCtxFactory.setKeyManagerPassword(keyPassword);
+
+ String[] excludeCiphers = conf.getStrings(
+ THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
+ if (excludeCiphers.length != 0) {
+ sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
+ }
+ String[] includeCiphers = conf.getStrings(
+ THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
+ if (includeCiphers.length != 0) {
+ sslCtxFactory.setIncludeCipherSuites(includeCiphers);
+ }
- // Put up info server.
- int port = conf.getInt("hbase.thrift.info.port", 9095);
+ // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
+ String[] excludeProtocols = conf.getStrings(
+ THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
+ if (excludeProtocols.length != 0) {
+ sslCtxFactory.setExcludeProtocols(excludeProtocols);
+ }
+ String[] includeProtocols = conf.getStrings(
+ THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
+ if (includeProtocols.length != 0) {
+ sslCtxFactory.setIncludeProtocols(includeProtocols);
+ }
- if (port >= 0) {
- conf.setLong("startcode", System.currentTimeMillis());
- String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
- infoServer = new InfoServer("thrift", a, port, false, conf);
- infoServer.setAttribute("hbase.conf", conf);
- infoServer.start();
+ serverConnector = new ServerConnector(httpServer,
+ new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
+ new HttpConnectionFactory(httpsConfig));
+ } else {
+ serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
}
+ serverConnector.setPort(listenPort);
+ serverConnector.setHost(getBindAddress(conf).getHostAddress());
+ httpServer.addConnector(serverConnector);
+ httpServer.setStopAtShutdown(true);
- serverRunner.run();
+ if (doAsEnabled) {
+ ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+ }
+ LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
}
/**
- * Parse the command line options to set parameters the conf.
+ * Setting up the thrift TServer
*/
- private void processOptions(final String[] args) throws Exception {
- Options options = new Options();
+ protected void setupServer() throws Exception {
+ // Construct correct ProtocolFactory
+ TProtocolFactory protocolFactory = getProtocolFactory();
+
+ ImplType implType = ImplType.getServerImpl(conf);
+ TProcessor processorToUse = processor;
+
+ // Construct correct TransportFactory
+ TTransportFactory transportFactory;
+ if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
+ if (qop != null) {
+ throw new RuntimeException("Thrift server authentication"
+ + " doesn't work with framed transport yet");
+ }
+ transportFactory = new TFramedTransport.Factory(
+ conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024);
+ LOG.debug("Using framed transport");
+ } else if (qop == null) {
+ transportFactory = new TTransportFactory();
+ } else {
+ // Extract the name from the principal
+ String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
+ if (thriftKerberosPrincipal == null) {
+ throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
+ }
+ String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
+ Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
+ TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
+ saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
+ new SaslRpcServer.SaslGssCallbackHandler() {
+ @Override
+ public void handle(Callback[] callbacks)
+ throws UnsupportedCallbackException {
+ AuthorizeCallback ac = null;
+ for (Callback callback : callbacks) {
+ if (callback instanceof AuthorizeCallback) {
+ ac = (AuthorizeCallback) callback;
+ } else {
+ throw new UnsupportedCallbackException(callback,
+ "Unrecognized SASL GSSAPI Callback");
+ }
+ }
+ if (ac != null) {
+ String authid = ac.getAuthenticationID();
+ String authzid = ac.getAuthorizationID();
+ if (!authid.equals(authzid)) {
+ ac.setAuthorized(false);
+ } else {
+ ac.setAuthorized(true);
+ String userName = SecurityUtil.getUserFromPrincipal(authzid);
+ LOG.info("Effective user: {}", userName);
+ ac.setAuthorizedID(userName);
+ }
+ }
+ }
+ });
+ transportFactory = saslFactory;
+
+ // Create a processor wrapper, to get the caller
+ processorToUse = (inProt, outProt) -> {
+ TSaslServerTransport saslServerTransport =
+ (TSaslServerTransport)inProt.getTransport();
+ SaslServer saslServer = saslServerTransport.getSaslServer();
+ String principal = saslServer.getAuthorizationID();
+ hbaseServiceHandler.setEffectiveUser(principal);
+ return processor.process(inProt, outProt);
+ };
+ }
+
+ if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
+ LOG.error("Server types {} don't support IP address binding at the moment. See " +
+ "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
+ Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
+ throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
+ }
+
+ InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
+ if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
+ implType == ImplType.THREADED_SELECTOR) {
+ TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
+ if (implType == ImplType.NONBLOCKING) {
+ tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
+ transportFactory, inetSocketAddress);
+ } else if (implType == ImplType.HS_HA) {
+ tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
+ inetSocketAddress);
+ } else { // THREADED_SELECTOR
+ tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
+ transportFactory, inetSocketAddress);
+ }
+ LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
+ Integer.toString(listenPort));
+ } else if (implType == ImplType.THREAD_POOL) {
+ this.tserver = getTThreadPoolServer(protocolFactory, processorToUse, transportFactory,
+ inetSocketAddress);
+ } else {
+ throw new AssertionError("Unsupported Thrift server implementation: " +
+ implType.simpleClassName());
+ }
+
+ // A sanity check that we instantiated the right type of server.
+ if (tserver.getClass() != implType.serverClass) {
+ throw new AssertionError("Expected to create Thrift server class " +
+ implType.serverClass.getName() + " but got " +
+ tserver.getClass().getName());
+ }
+ }
+
+ private TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
+ TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
+ InetSocketAddress inetSocketAddress) {
+ LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
+ TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
+ serverArgs.processor(processor);
+ serverArgs.transportFactory(transportFactory);
+ serverArgs.protocolFactory(protocolFactory);
+ return new TNonblockingServer(serverArgs);
+ }
+
+ private TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
+ TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
+ InetSocketAddress inetSocketAddress) {
+ LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
+ THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+ int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
+ TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
+ CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
+ int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+ serverArgs.getMaxWorkerThreads());
+ ExecutorService executorService = createExecutor(
+ callQueue, workerThread, workerThread);
+ serverArgs.executorService(executorService).processor(processor)
+ .transportFactory(transportFactory).protocolFactory(protocolFactory);
+ return new THsHaServer(serverArgs);
+ }
+
+ private TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
+ TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
+ InetSocketAddress inetSocketAddress) {
+ LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
+ TThreadedSelectorServer.Args serverArgs =
+ new HThreadedSelectorServerArgs(serverTransport, conf);
+ int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
+ TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
+ CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
+ int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+ serverArgs.getWorkerThreads());
+ int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
+ serverArgs.selectorThreads(selectorThreads);
+ ExecutorService executorService = createExecutor(
+ callQueue, workerThreads, workerThreads);
+ serverArgs.executorService(executorService).processor(processor)
+ .transportFactory(transportFactory).protocolFactory(protocolFactory);
+ return new TThreadedSelectorServer(serverArgs);
+ }
+
+ private TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
+ TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
+ LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
+ // Thrift's implementation uses '0' as a placeholder for 'use the default.'
+ int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
+ int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
+ THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
+ TServerTransport serverTransport = new TServerSocket(
+ new TServerSocket.ServerSocketTransportArgs().
+ bindAddr(inetSocketAddress).backlog(backlog).
+ clientTimeout(readTimeout));
+
+ TBoundedThreadPoolServer.Args serverArgs =
+ new TBoundedThreadPoolServer.Args(serverTransport, conf);
+ serverArgs.processor(processor).transportFactory(transportFactory)
+ .protocolFactory(protocolFactory);
+ return new TBoundedThreadPoolServer(serverArgs, metrics);
+ }
+
+ private TProtocolFactory getProtocolFactory() {
+ TProtocolFactory protocolFactory;
+
+ if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
+ LOG.debug("Using compact protocol");
+ protocolFactory = new TCompactProtocol.Factory();
+ } else {
+ LOG.debug("Using binary protocol");
+ protocolFactory = new TBinaryProtocol.Factory();
+ }
+
+ return protocolFactory;
+ }
+
+ ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
+ int minWorkers, int maxWorkers) {
+ ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+ tfb.setDaemon(true);
+ tfb.setNameFormat("thrift-worker-%d");
+ ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
+ Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
+ threadPool.allowCoreThreadTimeOut(true);
+ return threadPool;
+ }
+
+ private InetAddress getBindAddress(Configuration conf)
+ throws UnknownHostException {
+ String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
+ return InetAddress.getByName(bindAddressStr);
+ }
+
+
+ public static void registerFilters(Configuration conf) {
+ String[] filters = conf.getStrings(THRIFT_FILTERS);
+ Splitter splitter = Splitter.on(':');
+ if(filters != null) {
+ for(String filterClass: filters) {
+ List<String> filterPart = splitter.splitToList(filterClass);
+ if(filterPart.size() != 2) {
+ LOG.warn("Invalid filter specification " + filterClass + " - skipping");
+ } else {
+ ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
+ }
+ }
+ }
+ }
+
+ /**
+ * Add options to command lines
+ * @param options options
+ */
+ protected void addOptions(Options options) {
options.addOption("b", BIND_OPTION, true, "Address to bind " +
"the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
@@ -118,62 +624,56 @@ public class ThriftServer {
options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
options.addOption("h", "help", false, "Print help information");
+ options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
options.addOption("m", MIN_WORKERS_OPTION, true,
"The minimum number of worker threads for " +
- ImplType.THREAD_POOL.simpleClassName());
+ ImplType.THREAD_POOL.simpleClassName());
options.addOption("w", MAX_WORKERS_OPTION, true,
"The maximum number of worker threads for " +
- ImplType.THREAD_POOL.simpleClassName());
+ ImplType.THREAD_POOL.simpleClassName());
options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
"The maximum number of queued requests in " +
- ImplType.THREAD_POOL.simpleClassName());
+ ImplType.THREAD_POOL.simpleClassName());
options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
"The amount of time in secods to keep a thread alive when idle in " +
- ImplType.THREAD_POOL.simpleClassName());
+ ImplType.THREAD_POOL.simpleClassName());
options.addOption("t", READ_TIMEOUT_OPTION, true,
"Amount of time in milliseconds before a server thread will timeout " +
- "waiting for client to send data on a connected socket. Currently, " +
- "only applies to TBoundedThreadPoolServer");
+ "waiting for client to send data on a connected socket. Currently, " +
+ "only applies to TBoundedThreadPoolServer");
options.addOptionGroup(ImplType.createOptionGroup());
+ }
- CommandLineParser parser = new DefaultParser();
- CommandLine cmd = parser.parse(options, args);
-
- if (cmd.hasOption("help")) {
- printUsageAndExit(options, 1);
- }
-
+ protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
// Get port to bind to
try {
if (cmd.hasOption(PORT_OPTION)) {
int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
- conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
+ conf.setInt(PORT_CONF_KEY, listenPort);
}
} catch (NumberFormatException e) {
LOG.error("Could not parse the value provided for the port option", e);
printUsageAndExit(options, -1);
}
-
// check for user-defined info server port setting, if so override the conf
try {
if (cmd.hasOption(INFOPORT_OPTION)) {
String val = cmd.getOptionValue(INFOPORT_OPTION);
- conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
+ conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
LOG.debug("Web UI port set to " + val);
}
} catch (NumberFormatException e) {
LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION +
- " option", e);
+ " option", e);
printUsageAndExit(options, -1);
}
-
// Make optional changes to the configuration based on command-line options
optionToConf(cmd, MIN_WORKERS_OPTION,
conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
@@ -183,23 +683,42 @@ public class ThriftServer {
conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
- optionToConf(cmd, READ_TIMEOUT_OPTION, conf,
- ThriftServerRunner.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
+ optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
+ optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);
// Set general thrift server options
boolean compact = cmd.hasOption(COMPACT_OPTION) ||
- conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false);
- conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, compact);
+ conf.getBoolean(COMPACT_CONF_KEY, false);
+ conf.setBoolean(COMPACT_CONF_KEY, compact);
boolean framed = cmd.hasOption(FRAMED_OPTION) ||
- conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false);
- conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed);
- if (cmd.hasOption(BIND_OPTION)) {
- conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION));
- }
+ conf.getBoolean(FRAMED_CONF_KEY, false);
+ conf.setBoolean(FRAMED_CONF_KEY, framed);
+
+ optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY);
+
ImplType.setServerImpl(cmd, conf);
}
+ /**
+ * Parse the command line options to set parameters the conf.
+ */
+ private void processOptions(final String[] args) throws Exception {
+ if (args == null || args.length == 0) {
+ return;
+ }
+ Options options = new Options();
+ addOptions(options);
+
+ CommandLineParser parser = new DefaultParser();
+ CommandLine cmd = parser.parse(options, args);
+
+ if (cmd.hasOption("help")) {
+ printUsageAndExit(options, 1);
+ }
+ parseCommandLine(cmd, options);
+ }
+
public void stop() {
if (this.infoServer != null) {
LOG.info("Stopping infoServer");
@@ -209,10 +728,25 @@ public class ThriftServer {
LOG.error("Failed to stop infoServer", ex);
}
}
- serverRunner.shutdown();
+ if (pauseMonitor != null) {
+ pauseMonitor.stop();
+ }
+ if (tserver != null) {
+ tserver.stop();
+ tserver = null;
+ }
+ if (httpServer != null) {
+ try {
+ httpServer.stop();
+ httpServer = null;
+ } catch (Exception e) {
+ LOG.error("Problem encountered in shutting down HTTP server", e);
+ }
+ httpServer = null;
+ }
}
- private static void optionToConf(CommandLine cmd, String option,
+ protected static void optionToConf(CommandLine cmd, String option,
Configuration conf, String destConfKey) {
if (cmd.hasOption(option)) {
String value = cmd.getOptionValue(option);
@@ -221,16 +755,38 @@ public class ThriftServer {
}
}
+ /**
+ * Run without any command line arguments
+ * @return exit code
+ * @throws Exception exception
+ */
+ public int run() throws Exception {
+ return run(null);
+ }
+
+ @Override
+ public int run(String[] strings) throws Exception {
+ processOptions(strings);
+ setupParamters();
+ startInfoServer();
+ if (httpEnabled) {
+ setupHTTPServer();
+ httpServer.start();
+ httpServer.join();
+ } else {
+ setupServer();
+ tserver.serve();
+ }
+ return 0;
+ }
+
public static void main(String [] args) throws Exception {
LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
VersionInfo.logVersion();
- int exitCode = 0;
- try {
- new ThriftServer(HBaseConfiguration.create()).doMain(args);
- } catch (ExitCodeException ex) {
- exitCode = ex.getExitCode();
- }
+ final Configuration conf = HBaseConfiguration.create();
+ // for now, only time we return is on an argument error.
+ final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
- System.exit(exitCode);
+ System.exit(status);
}
}