You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by al...@apache.org on 2019/01/02 08:14:05 UTC

[3/4] hbase git commit: HBASE-21652 Refactor ThriftServer making thrift2 server inherited from thrift1 server

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
index fc00327..6d11ac6 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServer.java
@@ -18,16 +18,132 @@
 
 package org.apache.hadoop.hbase.thrift;
 
+import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_DEAFULT;
+import static org.apache.hadoop.hbase.thrift.Constants.BACKLOG_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.BIND_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.BIND_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.COMPACT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_BIND_ADDR;
+import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_HTTP_MAX_HEADER_SIZE;
+import static org.apache.hadoop.hbase.thrift.Constants.DEFAULT_LISTEN_PORT;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.FRAMED_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MAX_THREADS_KEY_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.HTTP_MIN_THREADS_KEY_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.INFOPORT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.KEEP_ALIVE_SEC_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_FRAME_SIZE_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_QUEUE_SIZE_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.MAX_WORKERS_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.MIN_WORKERS_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.PORT_CONF_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.PORT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.READ_TIMEOUT_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.SELECTOR_NUM_OPTION;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_INTERFACE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_DNS_NAMESERVER_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_FILTERS;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_INFO_SERVER_PORT_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KERBEROS_PRINCIPAL_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_QOP_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SELECTOR_NUM;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_ENABLED_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_INCLUDE_PROTOCOLS_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_PASSWORD_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SSL_KEYSTORE_STORE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SUPPORT_PROXYUSER_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.USE_HTTP_CONF_KEY;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.sasl.AuthorizeCallback;
+import javax.security.sasl.SaslServer;
+
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.http.HttpServerUtil;
 import org.apache.hadoop.hbase.http.InfoServer;
-import org.apache.hadoop.hbase.thrift.ThriftServerRunner.ImplType;
+import org.apache.hadoop.hbase.security.SaslUtil;
+import org.apache.hadoop.hbase.security.SecurityUtil;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.util.DNS;
+import org.apache.hadoop.hbase.util.JvmPauseMonitor;
+import org.apache.hadoop.hbase.util.Strings;
 import org.apache.hadoop.hbase.util.VersionInfo;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.thrift.TProcessor;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TServlet;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TSaslServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
 import org.apache.yetus.audience.InterfaceAudience;
+import org.eclipse.jetty.http.HttpVersion;
+import org.eclipse.jetty.server.HttpConfiguration;
+import org.eclipse.jetty.server.HttpConnectionFactory;
+import org.eclipse.jetty.server.SecureRequestCustomizer;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.server.ServerConnector;
+import org.eclipse.jetty.server.SslConnectionFactory;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
+import org.eclipse.jetty.util.ssl.SslContextFactory;
+import org.eclipse.jetty.util.thread.QueuedThreadPool;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
+import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLineParser;
 import org.apache.hbase.thirdparty.org.apache.commons.cli.DefaultParser;
@@ -40,29 +156,36 @@ import org.apache.hbase.thirdparty.org.apache.commons.cli.Options;
  * independent process.
  */
 @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
-public class ThriftServer {
+public class ThriftServer  extends Configured implements Tool {
 
   private static final Logger LOG = LoggerFactory.getLogger(ThriftServer.class);
 
-  private static final String MIN_WORKERS_OPTION = "minWorkers";
-  private static final String MAX_WORKERS_OPTION = "workers";
-  private static final String MAX_QUEUE_SIZE_OPTION = "queue";
-  private static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
-  static final String BIND_OPTION = "bind";
-  static final String COMPACT_OPTION = "compact";
-  static final String FRAMED_OPTION = "framed";
-  static final String PORT_OPTION = "port";
-  static final String INFOPORT_OPTION = "infoport";
 
-  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
-  private static final int DEFAULT_LISTEN_PORT = 9090;
 
-  private Configuration conf;
-  ThriftServerRunner serverRunner;
+  protected Configuration conf;
+
+  protected InfoServer infoServer;
+
+  protected TProcessor processor;
+
+  protected ThriftMetrics metrics;
+  protected HBaseServiceHandler hbaseServiceHandler;
+  protected UserGroupInformation serviceUGI;
+  protected boolean httpEnabled;
+
+  protected SaslUtil.QualityOfProtection qop;
+  protected String host;
+  protected int listenPort;
+
+
+  protected boolean securityEnabled;
+  protected boolean doAsEnabled;
+
+  protected JvmPauseMonitor pauseMonitor;
 
-  private InfoServer infoServer;
+  protected volatile TServer tserver;
+  protected volatile Server httpServer;
 
-  private static final String READ_TIMEOUT_OPTION = "readTimeout";
 
   //
   // Main program and support routines
@@ -72,7 +195,89 @@ public class ThriftServer {
     this.conf = HBaseConfiguration.create(conf);
   }
 
-  private static void printUsageAndExit(Options options, int exitCode)
+  protected void setupParamters() throws IOException {
+    // login the server principal (if using secure Hadoop)
+    UserProvider userProvider = UserProvider.instantiate(conf);
+    securityEnabled = userProvider.isHadoopSecurityEnabled()
+        && userProvider.isHBaseSecurityEnabled();
+    if (securityEnabled) {
+      host = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
+          conf.get(THRIFT_DNS_INTERFACE_KEY, "default"),
+          conf.get(THRIFT_DNS_NAMESERVER_KEY, "default")));
+      userProvider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
+    }
+    this.serviceUGI = userProvider.getCurrent().getUGI();
+
+    this.listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+    this.metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
+    this.pauseMonitor = new JvmPauseMonitor(conf, this.metrics.getSource());
+    this.hbaseServiceHandler = createHandler(conf, userProvider);
+    this.hbaseServiceHandler.initMetrics(metrics);
+    this.processor = createProcessor();
+
+    httpEnabled = conf.getBoolean(USE_HTTP_CONF_KEY, false);
+    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
+    if (doAsEnabled && !httpEnabled) {
+      LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
+    }
+
+    String strQop = conf.get(THRIFT_QOP_KEY);
+    if (strQop != null) {
+      this.qop = SaslUtil.getQop(strQop);
+    }
+    if (qop != null) {
+      if (qop != SaslUtil.QualityOfProtection.AUTHENTICATION &&
+          qop != SaslUtil.QualityOfProtection.INTEGRITY &&
+          qop != SaslUtil.QualityOfProtection.PRIVACY) {
+        throw new IOException(String.format("Invalid %s: It must be one of %s, %s, or %s.",
+            THRIFT_QOP_KEY,
+            SaslUtil.QualityOfProtection.AUTHENTICATION.name(),
+            SaslUtil.QualityOfProtection.INTEGRITY.name(),
+            SaslUtil.QualityOfProtection.PRIVACY.name()));
+      }
+      checkHttpSecurity(qop, conf);
+      if (!securityEnabled) {
+        throw new IOException("Thrift server must run in secure mode to support authentication");
+      }
+    }
+    registerFilters(conf);
+    pauseMonitor.start();
+  }
+
+  protected void startInfoServer() throws IOException {
+    // Put up info server.
+    int port = conf.getInt(THRIFT_INFO_SERVER_PORT , THRIFT_INFO_SERVER_PORT_DEFAULT);
+
+    if (port >= 0) {
+      conf.setLong("startcode", System.currentTimeMillis());
+      String a = conf
+          .get(THRIFT_INFO_SERVER_BINDING_ADDRESS, THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT);
+      infoServer = new InfoServer("thrift", a, port, false, conf);
+      infoServer.setAttribute("hbase.conf", conf);
+      infoServer.start();
+    }
+  }
+
+  protected void checkHttpSecurity(SaslUtil.QualityOfProtection qop, Configuration conf) {
+    if (qop == SaslUtil.QualityOfProtection.PRIVACY &&
+        conf.getBoolean(USE_HTTP_CONF_KEY, false) &&
+        !conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
+      throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
+          THRIFT_SSL_ENABLED_KEY + " is false");
+    }
+  }
+
+  protected HBaseServiceHandler createHandler(Configuration conf, UserProvider userProvider)
+      throws IOException {
+    return new ThriftHBaseServiceHandler(conf, userProvider);
+  }
+
+  protected TProcessor createProcessor() {
+    return new Hbase.Processor<>(
+        HbaseHandlerMetricsProxy.newInstance((Hbase.Iface) hbaseServiceHandler, metrics, conf));
+  }
+
+  protected void printUsageAndExit(Options options, int exitCode)
       throws ExitCodeException {
     HelpFormatter formatter = new HelpFormatter();
     formatter.printHelp("Thrift", null, options,
@@ -85,32 +290,333 @@ public class ThriftServer {
   }
 
   /**
-   * Start up or shuts down the Thrift server, depending on the arguments.
-   * @param args the arguments to pass in when starting the Thrift server
+   * Setup a HTTP Server using Jetty to serve calls from THttpClient
+   *
+   * @throws IOException IOException
    */
-  void doMain(final String[] args) throws Exception {
-    processOptions(args);
-    serverRunner = new ThriftServerRunner(conf);
+  protected void setupHTTPServer() throws IOException {
+    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
+    TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI,
+        conf, hbaseServiceHandler, securityEnabled, doAsEnabled);
+
+    // Set the default max thread number to 100 to limit
+    // the number of concurrent requests so that Thrfit HTTP server doesn't OOM easily.
+    // Jetty set the default max thread number to 250, if we don't set it.
+    //
+    // Our default min thread number 2 is the same as that used by Jetty.
+    int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY,
+        conf.getInt(TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY,
+            HTTP_MIN_THREADS_KEY_DEFAULT));
+    int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY,
+        conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+            HTTP_MAX_THREADS_KEY_DEFAULT));
+    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
+    threadPool.setMinThreads(minThreads);
+    httpServer = new Server(threadPool);
+
+    // Context handler
+    ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
+        ServletContextHandler.SESSIONS);
+    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
+    HttpServerUtil.constrainHttpMethods(ctxHandler,
+        conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD,
+            THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
+
+    // set up Jetty and run the embedded server
+    HttpConfiguration httpConfig = new HttpConfiguration();
+    httpConfig.setSecureScheme("https");
+    httpConfig.setSecurePort(listenPort);
+    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
+    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
+    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
+    httpConfig.setSendServerVersion(false);
+    httpConfig.setSendDateHeader(false);
+
+    ServerConnector serverConnector;
+    if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
+      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
+      httpsConfig.addCustomizer(new SecureRequestCustomizer());
+
+      SslContextFactory sslCtxFactory = new SslContextFactory();
+      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
+      String password = HBaseConfiguration.getPassword(conf,
+          THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
+      String keyPassword = HBaseConfiguration.getPassword(conf,
+          THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
+      sslCtxFactory.setKeyStorePath(keystore);
+      sslCtxFactory.setKeyStorePassword(password);
+      sslCtxFactory.setKeyManagerPassword(keyPassword);
+
+      String[] excludeCiphers = conf.getStrings(
+          THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
+      if (excludeCiphers.length != 0) {
+        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
+      }
+      String[] includeCiphers = conf.getStrings(
+          THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
+      if (includeCiphers.length != 0) {
+        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
+      }
 
-    // Put up info server.
-    int port = conf.getInt("hbase.thrift.info.port", 9095);
+      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
+      String[] excludeProtocols = conf.getStrings(
+          THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
+      if (excludeProtocols.length != 0) {
+        sslCtxFactory.setExcludeProtocols(excludeProtocols);
+      }
+      String[] includeProtocols = conf.getStrings(
+          THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
+      if (includeProtocols.length != 0) {
+        sslCtxFactory.setIncludeProtocols(includeProtocols);
+      }
 
-    if (port >= 0) {
-      conf.setLong("startcode", System.currentTimeMillis());
-      String a = conf.get("hbase.thrift.info.bindAddress", "0.0.0.0");
-      infoServer = new InfoServer("thrift", a, port, false, conf);
-      infoServer.setAttribute("hbase.conf", conf);
-      infoServer.start();
+      serverConnector = new ServerConnector(httpServer,
+          new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
+          new HttpConnectionFactory(httpsConfig));
+    } else {
+      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
     }
+    serverConnector.setPort(listenPort);
+    serverConnector.setHost(getBindAddress(conf).getHostAddress());
+    httpServer.addConnector(serverConnector);
+    httpServer.setStopAtShutdown(true);
 
-    serverRunner.run();
+    if (doAsEnabled) {
+      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
+    }
+    LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
   }
 
   /**
-   * Parse the command line options to set parameters the conf.
+   * Setting up the thrift TServer
    */
-  private void processOptions(final String[] args) throws Exception {
-    Options options = new Options();
+  protected void setupServer() throws Exception {
+    // Construct correct ProtocolFactory
+    TProtocolFactory protocolFactory = getProtocolFactory();
+
+    ImplType implType = ImplType.getServerImpl(conf);
+    TProcessor processorToUse = processor;
+
+    // Construct correct TransportFactory
+    TTransportFactory transportFactory;
+    if (conf.getBoolean(FRAMED_CONF_KEY, FRAMED_CONF_DEFAULT) || implType.isAlwaysFramed) {
+      if (qop != null) {
+        throw new RuntimeException("Thrift server authentication"
+            + " doesn't work with framed transport yet");
+      }
+      transportFactory = new TFramedTransport.Factory(
+          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, MAX_FRAME_SIZE_CONF_DEFAULT) * 1024 * 1024);
+      LOG.debug("Using framed transport");
+    } else if (qop == null) {
+      transportFactory = new TTransportFactory();
+    } else {
+      // Extract the name from the principal
+      String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
+      if (thriftKerberosPrincipal == null) {
+        throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
+      }
+      String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
+      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
+      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
+      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
+          new SaslRpcServer.SaslGssCallbackHandler() {
+            @Override
+            public void handle(Callback[] callbacks)
+                throws UnsupportedCallbackException {
+              AuthorizeCallback ac = null;
+              for (Callback callback : callbacks) {
+                if (callback instanceof AuthorizeCallback) {
+                  ac = (AuthorizeCallback) callback;
+                } else {
+                  throw new UnsupportedCallbackException(callback,
+                      "Unrecognized SASL GSSAPI Callback");
+                }
+              }
+              if (ac != null) {
+                String authid = ac.getAuthenticationID();
+                String authzid = ac.getAuthorizationID();
+                if (!authid.equals(authzid)) {
+                  ac.setAuthorized(false);
+                } else {
+                  ac.setAuthorized(true);
+                  String userName = SecurityUtil.getUserFromPrincipal(authzid);
+                  LOG.info("Effective user: {}", userName);
+                  ac.setAuthorizedID(userName);
+                }
+              }
+            }
+          });
+      transportFactory = saslFactory;
+
+      // Create a processor wrapper, to get the caller
+      processorToUse = (inProt, outProt) -> {
+        TSaslServerTransport saslServerTransport =
+            (TSaslServerTransport)inProt.getTransport();
+        SaslServer saslServer = saslServerTransport.getSaslServer();
+        String principal = saslServer.getAuthorizationID();
+        hbaseServiceHandler.setEffectiveUser(principal);
+        return processor.process(inProt, outProt);
+      };
+    }
+
+    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
+      LOG.error("Server types {} don't support IP address binding at the moment. See " +
+              "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
+          Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
+      throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
+    }
+
+    InetSocketAddress inetSocketAddress = new InetSocketAddress(getBindAddress(conf), listenPort);
+    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
+        implType == ImplType.THREADED_SELECTOR) {
+      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(inetSocketAddress);
+      if (implType == ImplType.NONBLOCKING) {
+        tserver = getTNonBlockingServer(serverTransport, protocolFactory, processorToUse,
+            transportFactory, inetSocketAddress);
+      } else if (implType == ImplType.HS_HA) {
+        tserver = getTHsHaServer(serverTransport, protocolFactory, processorToUse, transportFactory,
+            inetSocketAddress);
+      } else { // THREADED_SELECTOR
+        tserver = getTThreadedSelectorServer(serverTransport, protocolFactory, processorToUse,
+            transportFactory, inetSocketAddress);
+      }
+      LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
+          Integer.toString(listenPort));
+    } else if (implType == ImplType.THREAD_POOL) {
+      this.tserver = getTThreadPoolServer(protocolFactory, processorToUse, transportFactory,
+          inetSocketAddress);
+    } else {
+      throw new AssertionError("Unsupported Thrift server implementation: " +
+          implType.simpleClassName());
+    }
+
+    // A sanity check that we instantiated the right type of server.
+    if (tserver.getClass() != implType.serverClass) {
+      throw new AssertionError("Expected to create Thrift server class " +
+          implType.serverClass.getName() + " but got " +
+          tserver.getClass().getName());
+    }
+  }
+
+  private TServer getTNonBlockingServer(TNonblockingServerTransport serverTransport,
+      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
+      InetSocketAddress inetSocketAddress) {
+    LOG.info("starting HBase Nonblocking Thrift server on " + inetSocketAddress.toString());
+    TNonblockingServer.Args serverArgs = new TNonblockingServer.Args(serverTransport);
+    serverArgs.processor(processor);
+    serverArgs.transportFactory(transportFactory);
+    serverArgs.protocolFactory(protocolFactory);
+    return new TNonblockingServer(serverArgs);
+  }
+
+  private TServer getTHsHaServer(TNonblockingServerTransport serverTransport,
+      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
+      InetSocketAddress inetSocketAddress) {
+    LOG.info("starting HBase HsHA Thrift server on " + inetSocketAddress.toString());
+    THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
+        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
+    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
+    int workerThread = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+        serverArgs.getMaxWorkerThreads());
+    ExecutorService executorService = createExecutor(
+        callQueue, workerThread, workerThread);
+    serverArgs.executorService(executorService).processor(processor)
+        .transportFactory(transportFactory).protocolFactory(protocolFactory);
+    return new THsHaServer(serverArgs);
+  }
+
+  private TServer getTThreadedSelectorServer(TNonblockingServerTransport serverTransport,
+      TProtocolFactory protocolFactory, TProcessor processor, TTransportFactory transportFactory,
+      InetSocketAddress inetSocketAddress) {
+    LOG.info("starting HBase ThreadedSelector Thrift server on " + inetSocketAddress.toString());
+    TThreadedSelectorServer.Args serverArgs =
+        new HThreadedSelectorServerArgs(serverTransport, conf);
+    int queueSize = conf.getInt(TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY,
+        TBoundedThreadPoolServer.DEFAULT_MAX_QUEUED_REQUESTS);
+    CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(queueSize), metrics);
+    int workerThreads = conf.getInt(TBoundedThreadPoolServer.MAX_WORKER_THREADS_CONF_KEY,
+        serverArgs.getWorkerThreads());
+    int selectorThreads = conf.getInt(THRIFT_SELECTOR_NUM, serverArgs.getSelectorThreads());
+    serverArgs.selectorThreads(selectorThreads);
+    ExecutorService executorService = createExecutor(
+        callQueue, workerThreads, workerThreads);
+    serverArgs.executorService(executorService).processor(processor)
+        .transportFactory(transportFactory).protocolFactory(protocolFactory);
+    return new TThreadedSelectorServer(serverArgs);
+  }
+
+  private TServer getTThreadPoolServer(TProtocolFactory protocolFactory, TProcessor processor,
+      TTransportFactory transportFactory, InetSocketAddress inetSocketAddress) throws Exception {
+    LOG.info("starting HBase ThreadPool Thrift server on " + inetSocketAddress.toString());
+    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
+    int backlog = conf.getInt(BACKLOG_CONF_KEY, BACKLOG_CONF_DEAFULT);
+    int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
+        THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
+    TServerTransport serverTransport = new TServerSocket(
+        new TServerSocket.ServerSocketTransportArgs().
+            bindAddr(inetSocketAddress).backlog(backlog).
+            clientTimeout(readTimeout));
+
+    TBoundedThreadPoolServer.Args serverArgs =
+        new TBoundedThreadPoolServer.Args(serverTransport, conf);
+    serverArgs.processor(processor).transportFactory(transportFactory)
+        .protocolFactory(protocolFactory);
+    return new TBoundedThreadPoolServer(serverArgs, metrics);
+  }
+
+  private TProtocolFactory getProtocolFactory() {
+    TProtocolFactory protocolFactory;
+
+    if (conf.getBoolean(COMPACT_CONF_KEY, COMPACT_CONF_DEFAULT)) {
+      LOG.debug("Using compact protocol");
+      protocolFactory = new TCompactProtocol.Factory();
+    } else {
+      LOG.debug("Using binary protocol");
+      protocolFactory = new TBinaryProtocol.Factory();
+    }
+
+    return protocolFactory;
+  }
+
+  ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
+      int minWorkers, int maxWorkers) {
+    ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
+    tfb.setDaemon(true);
+    tfb.setNameFormat("thrift-worker-%d");
+    ThreadPoolExecutor threadPool = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
+        Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, tfb.build(), metrics);
+    threadPool.allowCoreThreadTimeOut(true);
+    return threadPool;
+  }
+
+  private InetAddress getBindAddress(Configuration conf)
+      throws UnknownHostException {
+    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
+    return InetAddress.getByName(bindAddressStr);
+  }
+
+
+  public static void registerFilters(Configuration conf) {
+    String[] filters = conf.getStrings(THRIFT_FILTERS);
+    Splitter splitter = Splitter.on(':');
+    if(filters != null) {
+      for(String filterClass: filters) {
+        List<String> filterPart = splitter.splitToList(filterClass);
+        if(filterPart.size() != 2) {
+          LOG.warn("Invalid filter specification " + filterClass + " - skipping");
+        } else {
+          ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
+        }
+      }
+    }
+  }
+
+  /**
+   * Add options to command lines
+   * @param options options
+   */
+  protected void addOptions(Options options) {
     options.addOption("b", BIND_OPTION, true, "Address to bind " +
         "the Thrift server to. [default: " + DEFAULT_BIND_ADDR + "]");
     options.addOption("p", PORT_OPTION, true, "Port to bind to [default: " +
@@ -118,62 +624,56 @@ public class ThriftServer {
     options.addOption("f", FRAMED_OPTION, false, "Use framed transport");
     options.addOption("c", COMPACT_OPTION, false, "Use the compact protocol");
     options.addOption("h", "help", false, "Print help information");
+    options.addOption("s", SELECTOR_NUM_OPTION, true, "How many selector threads to use.");
     options.addOption(null, INFOPORT_OPTION, true, "Port for web UI");
 
     options.addOption("m", MIN_WORKERS_OPTION, true,
         "The minimum number of worker threads for " +
-        ImplType.THREAD_POOL.simpleClassName());
+            ImplType.THREAD_POOL.simpleClassName());
 
     options.addOption("w", MAX_WORKERS_OPTION, true,
         "The maximum number of worker threads for " +
-        ImplType.THREAD_POOL.simpleClassName());
+            ImplType.THREAD_POOL.simpleClassName());
 
     options.addOption("q", MAX_QUEUE_SIZE_OPTION, true,
         "The maximum number of queued requests in " +
-        ImplType.THREAD_POOL.simpleClassName());
+            ImplType.THREAD_POOL.simpleClassName());
 
     options.addOption("k", KEEP_ALIVE_SEC_OPTION, true,
         "The amount of time in secods to keep a thread alive when idle in " +
-        ImplType.THREAD_POOL.simpleClassName());
+            ImplType.THREAD_POOL.simpleClassName());
 
     options.addOption("t", READ_TIMEOUT_OPTION, true,
         "Amount of time in milliseconds before a server thread will timeout " +
-        "waiting for client to send data on a connected socket. Currently, " +
-        "only applies to TBoundedThreadPoolServer");
+            "waiting for client to send data on a connected socket. Currently, " +
+            "only applies to TBoundedThreadPoolServer");
 
     options.addOptionGroup(ImplType.createOptionGroup());
+  }
 
-    CommandLineParser parser = new DefaultParser();
-    CommandLine cmd = parser.parse(options, args);
-
-    if (cmd.hasOption("help")) {
-      printUsageAndExit(options, 1);
-    }
-
+  protected void parseCommandLine(CommandLine cmd, Options options) throws ExitCodeException {
     // Get port to bind to
     try {
       if (cmd.hasOption(PORT_OPTION)) {
         int listenPort = Integer.parseInt(cmd.getOptionValue(PORT_OPTION));
-        conf.setInt(ThriftServerRunner.PORT_CONF_KEY, listenPort);
+        conf.setInt(PORT_CONF_KEY, listenPort);
       }
     } catch (NumberFormatException e) {
       LOG.error("Could not parse the value provided for the port option", e);
       printUsageAndExit(options, -1);
     }
-
     // check for user-defined info server port setting, if so override the conf
     try {
       if (cmd.hasOption(INFOPORT_OPTION)) {
         String val = cmd.getOptionValue(INFOPORT_OPTION);
-        conf.setInt("hbase.thrift.info.port", Integer.parseInt(val));
+        conf.setInt(THRIFT_INFO_SERVER_PORT, Integer.parseInt(val));
         LOG.debug("Web UI port set to " + val);
       }
     } catch (NumberFormatException e) {
       LOG.error("Could not parse the value provided for the " + INFOPORT_OPTION +
-        " option", e);
+          " option", e);
       printUsageAndExit(options, -1);
     }
-
     // Make optional changes to the configuration based on command-line options
     optionToConf(cmd, MIN_WORKERS_OPTION,
         conf, TBoundedThreadPoolServer.MIN_WORKER_THREADS_CONF_KEY);
@@ -183,23 +683,42 @@ public class ThriftServer {
         conf, TBoundedThreadPoolServer.MAX_QUEUED_REQUESTS_CONF_KEY);
     optionToConf(cmd, KEEP_ALIVE_SEC_OPTION,
         conf, TBoundedThreadPoolServer.THREAD_KEEP_ALIVE_TIME_SEC_CONF_KEY);
-    optionToConf(cmd, READ_TIMEOUT_OPTION, conf,
-        ThriftServerRunner.THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
+    optionToConf(cmd, READ_TIMEOUT_OPTION, conf, THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY);
+    optionToConf(cmd, SELECTOR_NUM_OPTION, conf, THRIFT_SELECTOR_NUM);
 
     // Set general thrift server options
     boolean compact = cmd.hasOption(COMPACT_OPTION) ||
-      conf.getBoolean(ThriftServerRunner.COMPACT_CONF_KEY, false);
-    conf.setBoolean(ThriftServerRunner.COMPACT_CONF_KEY, compact);
+        conf.getBoolean(COMPACT_CONF_KEY, false);
+    conf.setBoolean(COMPACT_CONF_KEY, compact);
     boolean framed = cmd.hasOption(FRAMED_OPTION) ||
-      conf.getBoolean(ThriftServerRunner.FRAMED_CONF_KEY, false);
-    conf.setBoolean(ThriftServerRunner.FRAMED_CONF_KEY, framed);
-    if (cmd.hasOption(BIND_OPTION)) {
-      conf.set(ThriftServerRunner.BIND_CONF_KEY, cmd.getOptionValue(BIND_OPTION));
-    }
+        conf.getBoolean(FRAMED_CONF_KEY, false);
+    conf.setBoolean(FRAMED_CONF_KEY, framed);
+
+    optionToConf(cmd, BIND_OPTION, conf, BIND_CONF_KEY);
+
 
     ImplType.setServerImpl(cmd, conf);
   }
 
+  /**
+   * Parse the command line options to set parameters the conf.
+   */
+  private void processOptions(final String[] args) throws Exception {
+    if (args == null || args.length == 0) {
+      return;
+    }
+    Options options = new Options();
+    addOptions(options);
+
+    CommandLineParser parser = new DefaultParser();
+    CommandLine cmd = parser.parse(options, args);
+
+    if (cmd.hasOption("help")) {
+      printUsageAndExit(options, 1);
+    }
+    parseCommandLine(cmd, options);
+  }
+
   public void stop() {
     if (this.infoServer != null) {
       LOG.info("Stopping infoServer");
@@ -209,10 +728,25 @@ public class ThriftServer {
         LOG.error("Failed to stop infoServer", ex);
       }
     }
-    serverRunner.shutdown();
+    if (pauseMonitor != null) {
+      pauseMonitor.stop();
+    }
+    if (tserver != null) {
+      tserver.stop();
+      tserver = null;
+    }
+    if (httpServer != null) {
+      try {
+        httpServer.stop();
+        httpServer = null;
+      } catch (Exception e) {
+        LOG.error("Problem encountered in shutting down HTTP server", e);
+      }
+      httpServer = null;
+    }
   }
 
-  private static void optionToConf(CommandLine cmd, String option,
+  protected static void optionToConf(CommandLine cmd, String option,
       Configuration conf, String destConfKey) {
     if (cmd.hasOption(option)) {
       String value = cmd.getOptionValue(option);
@@ -221,16 +755,38 @@ public class ThriftServer {
     }
   }
 
+  /**
+   * Run without any command line arguments
+   * @return exit code
+   * @throws Exception exception
+   */
+  public int run() throws Exception {
+    return run(null);
+  }
+
+  @Override
+  public int run(String[] strings) throws Exception {
+    processOptions(strings);
+    setupParamters();
+    startInfoServer();
+    if (httpEnabled) {
+      setupHTTPServer();
+      httpServer.start();
+      httpServer.join();
+    } else {
+      setupServer();
+      tserver.serve();
+    }
+    return 0;
+  }
+
   public static void main(String [] args) throws Exception {
     LOG.info("***** STARTING service '" + ThriftServer.class.getSimpleName() + "' *****");
     VersionInfo.logVersion();
-    int exitCode = 0;
-    try {
-      new ThriftServer(HBaseConfiguration.create()).doMain(args);
-    } catch (ExitCodeException ex) {
-      exitCode = ex.getExitCode();
-    }
+    final Configuration conf = HBaseConfiguration.create();
+    // for now, only time we return is on an argument error.
+    final int status = ToolRunner.run(conf, new ThriftServer(conf), args);
     LOG.info("***** STOPPING service '" + ThriftServer.class.getSimpleName() + "' *****");
-    System.exit(exitCode);
+    System.exit(status);
   }
 }