You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by al...@apache.org on 2019/01/02 08:14:04 UTC

[2/4] hbase git commit: HBASE-21652 Refactor ThriftServer making thrift2 server inherited from thrift1 server

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
deleted file mode 100644
index 5e248f1..0000000
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
+++ /dev/null
@@ -1,2031 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.thrift;
-
-import static org.apache.hadoop.hbase.util.Bytes.getBytes;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-import javax.security.auth.callback.Callback;
-import javax.security.auth.callback.UnsupportedCallbackException;
-import javax.security.sasl.AuthorizeCallback;
-import javax.security.sasl.SaslServer;
-
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell.Type;
-import org.apache.hadoop.hbase.CellBuilder;
-import org.apache.hadoop.hbase.CellBuilderFactory;
-import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.MetaTableAccessor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Append;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Durability;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Increment;
-import org.apache.hadoop.hbase.client.OperationWithAttributes;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.filter.Filter;
-import org.apache.hadoop.hbase.filter.ParseFilter;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
-import org.apache.hadoop.hbase.filter.WhileMatchFilter;
-import org.apache.hadoop.hbase.http.HttpServerUtil;
-import org.apache.hadoop.hbase.log.HBaseMarkers;
-import org.apache.hadoop.hbase.security.SaslUtil;
-import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
-import org.apache.hadoop.hbase.security.SecurityUtil;
-import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
-import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
-import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
-import org.apache.hadoop.hbase.thrift.generated.Hbase;
-import org.apache.hadoop.hbase.thrift.generated.IOError;
-import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
-import org.apache.hadoop.hbase.thrift.generated.Mutation;
-import org.apache.hadoop.hbase.thrift.generated.TAppend;
-import org.apache.hadoop.hbase.thrift.generated.TCell;
-import org.apache.hadoop.hbase.thrift.generated.TIncrement;
-import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
-import org.apache.hadoop.hbase.thrift.generated.TRowResult;
-import org.apache.hadoop.hbase.thrift.generated.TScan;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ConnectionCache;
-import org.apache.hadoop.hbase.util.DNS;
-import org.apache.hadoop.hbase.util.JvmPauseMonitor;
-import org.apache.hadoop.hbase.util.Strings;
-import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.ProxyUsers;
-import org.apache.thrift.TException;
-import org.apache.thrift.TProcessor;
-import org.apache.thrift.protocol.TBinaryProtocol;
-import org.apache.thrift.protocol.TCompactProtocol;
-import org.apache.thrift.protocol.TProtocolFactory;
-import org.apache.thrift.server.THsHaServer;
-import org.apache.thrift.server.TNonblockingServer;
-import org.apache.thrift.server.TServer;
-import org.apache.thrift.server.TServlet;
-import org.apache.thrift.server.TThreadedSelectorServer;
-import org.apache.thrift.transport.TFramedTransport;
-import org.apache.thrift.transport.TNonblockingServerSocket;
-import org.apache.thrift.transport.TNonblockingServerTransport;
-import org.apache.thrift.transport.TSaslServerTransport;
-import org.apache.thrift.transport.TServerSocket;
-import org.apache.thrift.transport.TServerTransport;
-import org.apache.thrift.transport.TTransportFactory;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.eclipse.jetty.http.HttpVersion;
-import org.eclipse.jetty.server.HttpConfiguration;
-import org.eclipse.jetty.server.HttpConnectionFactory;
-import org.eclipse.jetty.server.SecureRequestCustomizer;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.server.ServerConnector;
-import org.eclipse.jetty.server.SslConnectionFactory;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-import org.eclipse.jetty.util.ssl.SslContextFactory;
-import org.eclipse.jetty.util.thread.QueuedThreadPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
-import org.apache.hbase.thirdparty.com.google.common.base.Splitter;
-import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
-import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup;
-
-/**
- * ThriftServerRunner - this class starts up a Thrift server which implements
- * the Hbase API specified in the Hbase.thrift IDL file.
- */
-@InterfaceAudience.Private
-public class ThriftServerRunner implements Runnable {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ThriftServerRunner.class);
-
-  // Applied to Jetty's header cache size and to the request/response header size limits
-  // when the HTTP server is configured.
-  private static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k
-
-  // Selects the server implementation; the value is matched against ImplType option names.
-  static final String SERVER_TYPE_CONF_KEY =
-      "hbase.regionserver.thrift.server.type";
-
-  // Address to bind to; DEFAULT_BIND_ADDR is used when unset.
-  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
-  // When true the compact protocol is used instead of the binary protocol (non-HTTP server).
-  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
-  // When true the framed transport is used (always-framed server types use it regardless).
-  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
-  // Maximum frame size in megabytes for the framed transport; read with a default of 2.
-  static final String MAX_FRAME_SIZE_CONF_KEY =
-          "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
-  // Listen port; DEFAULT_LISTEN_PORT is used when unset.
-  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
-  static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
-  // When true run() starts the Jetty-based HTTP server instead of a Thrift TServer.
-  static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
-  // Minimum / maximum size of the HTTP server thread pool (read with defaults 2 and 100).
-  static final String HTTP_MIN_THREADS_KEY = "hbase.thrift.http_threads.min";
-  static final String HTTP_MAX_THREADS_KEY = "hbase.thrift.http_threads.max";
-
-  // SSL settings; only consulted when setting up the HTTP server.
-  static final String THRIFT_SSL_ENABLED_KEY = "hbase.thrift.ssl.enabled";
-  static final String THRIFT_SSL_KEYSTORE_STORE_KEY = "hbase.thrift.ssl.keystore.store";
-  static final String THRIFT_SSL_KEYSTORE_PASSWORD_KEY = "hbase.thrift.ssl.keystore.password";
-  static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY = "hbase.thrift.ssl.keystore.keypassword";
-  static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY =
-      "hbase.thrift.ssl.exclude.cipher.suites";
-  static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY =
-      "hbase.thrift.ssl.include.cipher.suites";
-  static final String THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.exclude.protocols";
-  static final String THRIFT_SSL_INCLUDE_PROTOCOLS_KEY = "hbase.thrift.ssl.include.protocols";
-
-  // Enables the doAs feature; the constructor warns if this is set without USE_HTTP_CONF_KEY.
-  static final String THRIFT_SUPPORT_PROXYUSER_KEY = "hbase.thrift.support.proxyuser";
-
-  // DNS interface / nameserver used to resolve this host's name when security is enabled.
-  static final String THRIFT_DNS_INTERFACE_KEY = "hbase.thrift.dns.interface";
-  static final String THRIFT_DNS_NAMESERVER_KEY = "hbase.thrift.dns.nameserver";
-  // Principal and keytab passed to UserProvider.login() when security is enabled.
-  static final String THRIFT_KERBEROS_PRINCIPAL_KEY = "hbase.thrift.kerberos.principal";
-  static final String THRIFT_KEYTAB_FILE_KEY = "hbase.thrift.keytab.file";
-  static final String THRIFT_SPNEGO_PRINCIPAL_KEY = "hbase.thrift.spnego.principal";
-  static final String THRIFT_SPNEGO_KEYTAB_FILE_KEY = "hbase.thrift.spnego.keytab.file";
-
-  /**
-   * Amount of time in milliseconds before a server thread will timeout
-   * waiting for client to send data on a connected socket. Currently,
-   * applies only to TBoundedThreadPoolServer
-   */
-  public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
-    "hbase.thrift.server.socket.read.timeout";
-  public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
-
-
-  /**
-   * Thrift quality of protection configuration key. Valid values can be:
-   * auth-conf: authentication, integrity and confidentiality checking
-   * auth-int: authentication and integrity checking
-   * auth: authentication only
-   *
-   * This is used to authenticate the callers and support impersonation.
-   * The thrift server and the HBase cluster must run in secure mode.
-   */
-  static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
-  // Listen backlog for the thread-pool server socket; 0 means "use the Thrift default".
-  static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
-
-  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
-  public static final int DEFAULT_LISTEN_PORT = 9090;
-  public static final int HREGION_VERSION = 1;
-
-  // Port read from PORT_CONF_KEY in the constructor.
-  private final int listenPort;
-
-  // Private copy of the configuration handed to the constructor.
-  private Configuration conf;
-  // Non-HTTP server, created by setupServer(); cleared by shutdown().
-  volatile TServer tserver;
-  // Jetty server, created by setupHTTPServer(); cleared by shutdown().
-  volatile Server httpServer;
-  // Metrics proxy wrapped around hbaseHandler; this is what the Thrift processor calls.
-  private final Hbase.Iface handler;
-  private final ThriftMetrics metrics;
-  private final HBaseHandler hbaseHandler;
-  // UGI of the current user (after the optional login); run() executes the server as this user.
-  private final UserGroupInformation serviceUGI;
-
-  // Parsed from THRIFT_QOP_KEY; stays null when that key is not configured.
-  private SaslUtil.QualityOfProtection qop;
-  // Host name resolved through DNS; only assigned when security is enabled.
-  private String host;
-
-  // True when both Hadoop security and HBase security are enabled.
-  private final boolean securityEnabled;
-  // Value of THRIFT_SUPPORT_PROXYUSER_KEY.
-  private final boolean doAsEnabled;
-
-  private final JvmPauseMonitor pauseMonitor;
-
-  /**
-   * Configuration key whose boolean value is handed to
-   * HttpServerUtil.constrainHttpMethods() when the HTTP server is set up. Declared final so
-   * the key name cannot be reassigned at runtime, like every other key in this class.
-   */
-  static final String THRIFT_HTTP_ALLOW_OPTIONS_METHOD = "hbase.thrift.http.allow.options.method";
-  /** Value used when THRIFT_HTTP_ALLOW_OPTIONS_METHOD is not configured. */
-  private static final boolean THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT = false;
-
-  /**
-   * An enum of server implementation selections. Each constant records the option name used
-   * on the command line and in the configuration, whether the implementation always uses the
-   * framed transport, the TServer class it corresponds to, and whether a bind IP address can
-   * be specified for it.
-   */
-  public enum ImplType {
-    HS_HA("hsha", true, THsHaServer.class, true),
-    NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
-    THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
-    THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, true);
-
-    /** Implementation used when none is chosen explicitly. */
-    public static final ImplType DEFAULT = THREAD_POOL;
-
-    final String option;
-    final boolean isAlwaysFramed;
-    final Class<? extends TServer> serverClass;
-    final boolean canSpecifyBindIP;
-
-    private ImplType(String option, boolean isAlwaysFramed,
-        Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
-      this.option = option;
-      this.isAlwaysFramed = isAlwaysFramed;
-      this.serverClass = serverClass;
-      this.canSpecifyBindIP = canSpecifyBindIP;
-    }
-
-    /**
-     * @return <code>-option</code>
-     */
-    @Override
-    public String toString() {
-      return "-" + option;
-    }
-
-    /**
-     * @return the bare option name, without the leading dash
-     */
-    public String getOption() {
-      return option;
-    }
-
-    /**
-     * @return true if this implementation uses the framed transport regardless of configuration
-     */
-    public boolean isAlwaysFramed() {
-      return isAlwaysFramed;
-    }
-
-    /**
-     * Builds the help text shown for this option.
-     *
-     * @return a description made of complete, space-separated sentences
-     */
-    public String getDescription() {
-      // End the first sentence and start every following one with a space; the sentences
-      // used to run together (e.g. "Use the TBoundedThreadPoolServerThis is the default.").
-      StringBuilder sb = new StringBuilder("Use the ")
-          .append(serverClass.getSimpleName())
-          .append('.');
-      if (isAlwaysFramed) {
-        sb.append(" This implies the framed transport.");
-      }
-      if (this == DEFAULT) {
-        sb.append(" This is the default.");
-      }
-      return sb.toString();
-    }
-
-    /**
-     * @return an option group holding one command-line option per implementation
-     */
-    static OptionGroup createOptionGroup() {
-      OptionGroup group = new OptionGroup();
-      for (ImplType t : values()) {
-        group.addOption(new Option(t.option, t.getDescription()));
-      }
-      return group;
-    }
-
-    /**
-     * Looks up the implementation named by SERVER_TYPE_CONF_KEY, falling back to the
-     * thread-pool server when the key is not set.
-     *
-     * @param conf configuration to read the server type from
-     * @return the matching implementation
-     * @throws AssertionError if the configured value matches no known option
-     */
-    public static ImplType getServerImpl(Configuration conf) {
-      String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
-      for (ImplType t : values()) {
-        if (confType.equals(t.option)) {
-          return t;
-        }
-      }
-      throw new AssertionError("Unknown server ImplType.option:" + confType);
-    }
-
-    /**
-     * Stores the implementation selected on the command line under SERVER_TYPE_CONF_KEY,
-     * using DEFAULT when no implementation option is present.
-     *
-     * @param cmd parsed command line
-     * @param conf configuration to update
-     * @throws AssertionError if more than one implementation option is present
-     */
-    static void setServerImpl(CommandLine cmd, Configuration conf) {
-      ImplType chosenType = null;
-      int numChosen = 0;
-      for (ImplType t : values()) {
-        if (cmd.hasOption(t.option)) {
-          chosenType = t;
-          ++numChosen;
-        }
-      }
-      if (numChosen < 1) {
-        LOG.info("Using default thrift server type");
-        chosenType = DEFAULT;
-      } else if (numChosen > 1) {
-        throw new AssertionError("Exactly one option out of " +
-          Arrays.toString(values()) + " has to be specified");
-      }
-      LOG.info("Using thrift server type {}", chosenType.option);
-      conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
-    }
-
-    /**
-     * @return the simple name of the TServer class behind this implementation
-     */
-    public String simpleClassName() {
-      return serverClass.getSimpleName();
-    }
-
-    /**
-     * @return simple class names of all implementations for which no bind IP can be specified
-     */
-    public static List<String> serversThatCannotSpecifyBindIP() {
-      List<String> l = new ArrayList<>();
-      for (ImplType t : values()) {
-        if (!t.canSpecifyBindIP) {
-          l.add(t.simpleClassName());
-        }
-      }
-      return l;
-    }
-  }
-
-  /**
-   * Logs in the service principal when security is on, builds the metrics, pause monitor and
-   * handler objects, and validates the configured quality of protection.
-   *
-   * @param conf configuration to read all settings from; a copy is kept by this runner
-   * @throws IOException if the configured QoP is not a supported value, or a QoP is
-   *   configured while security is disabled
-   */
-  public ThriftServerRunner(Configuration conf) throws IOException {
-    // login the server principal (if using secure Hadoop)
-    UserProvider provider = UserProvider.instantiate(conf);
-    securityEnabled =
-        provider.isHadoopSecurityEnabled() && provider.isHBaseSecurityEnabled();
-    if (securityEnabled) {
-      String dnsInterface = conf.get(THRIFT_DNS_INTERFACE_KEY, "default");
-      String dnsNameserver = conf.get(THRIFT_DNS_NAMESERVER_KEY, "default");
-      host = Strings.domainNamePointerToHostName(
-          DNS.getDefaultHost(dnsInterface, dnsNameserver));
-      provider.login(THRIFT_KEYTAB_FILE_KEY, THRIFT_KERBEROS_PRINCIPAL_KEY, host);
-    }
-    serviceUGI = provider.getCurrent().getUGI();
-
-    this.conf = HBaseConfiguration.create(conf);
-    listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
-    metrics = new ThriftMetrics(conf, ThriftMetrics.ThriftServerType.ONE);
-    pauseMonitor = new JvmPauseMonitor(conf, metrics.getSource());
-    hbaseHandler = new HBaseHandler(conf, provider);
-    hbaseHandler.initMetrics(metrics);
-    handler = HbaseHandlerMetricsProxy.newInstance(hbaseHandler, metrics, conf);
-
-    boolean useHttp = conf.getBoolean(USE_HTTP_CONF_KEY, false);
-    doAsEnabled = conf.getBoolean(THRIFT_SUPPORT_PROXYUSER_KEY, false);
-    if (doAsEnabled && !useHttp) {
-      LOG.warn("Fail to enable the doAs feature. " + USE_HTTP_CONF_KEY + " is not configured");
-    }
-
-    String qopSetting = conf.get(THRIFT_QOP_KEY);
-    if (qopSetting != null) {
-      qop = SaslUtil.getQop(qopSetting);
-    }
-    if (qop == null) {
-      // No QoP configured: nothing further to validate.
-      return;
-    }
-
-    boolean supportedQop = qop == QualityOfProtection.AUTHENTICATION
-        || qop == QualityOfProtection.INTEGRITY
-        || qop == QualityOfProtection.PRIVACY;
-    if (!supportedQop) {
-      String message = String.format("Invalid %s: It must be one of %s, %s, or %s.",
-          THRIFT_QOP_KEY,
-          QualityOfProtection.AUTHENTICATION.name(),
-          QualityOfProtection.INTEGRITY.name(),
-          QualityOfProtection.PRIVACY.name());
-      throw new IOException(message);
-    }
-    checkHttpSecurity(qop, conf);
-    if (!securityEnabled) {
-      throw new IOException("Thrift server must run in secure mode to support authentication");
-    }
-  }
-
-  /**
-   * Rejects the combination of privacy QoP and HTTP mode without SSL.
-   *
-   * @param qop the configured quality of protection
-   * @param conf configuration supplying the HTTP and SSL switches
-   * @throws IllegalArgumentException if QoP is privacy, HTTP is enabled and SSL is not
-   */
-  private void checkHttpSecurity(QualityOfProtection qop, Configuration conf) {
-    if (qop != QualityOfProtection.PRIVACY) {
-      return;
-    }
-    if (!conf.getBoolean(USE_HTTP_CONF_KEY, false)) {
-      return;
-    }
-    if (conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
-      return;
-    }
-    throw new IllegalArgumentException("Thrift HTTP Server's QoP is privacy, but " +
-        THRIFT_SSL_ENABLED_KEY + " is false");
-  }
-
-  /**
-   * Runs the Thrift server as the service user. The JVM pause monitor is started first; then
-   * the HTTP server is set up, started and joined when USE_HTTP_CONF_KEY is true, otherwise
-   * the TServer is set up and served. Any exception is logged as fatal and the process exits
-   * with status -1.
-   */
-  @Override
-  public void run() {
-    PrivilegedAction<Object> serveAction = () -> {
-      try {
-        pauseMonitor.start();
-        boolean useHttp = conf.getBoolean(USE_HTTP_CONF_KEY, false);
-        if (!useHttp) {
-          setupServer();
-          tserver.serve();
-        } else {
-          setupHTTPServer();
-          httpServer.start();
-          httpServer.join();
-        }
-      } catch (Exception e) {
-        LOG.error(HBaseMarkers.FATAL, "Cannot run ThriftServer", e);
-        // Crash the process if the ThriftServer is not running
-        System.exit(-1);
-      }
-      return null;
-    };
-    serviceUGI.doAs(serveAction);
-  }
-
-  /**
-   * Stops the pause monitor and whichever server is present, clearing the server references.
-   * A failure while stopping the HTTP server is logged and the reference is still cleared.
-   */
-  public void shutdown() {
-    if (pauseMonitor != null) {
-      pauseMonitor.stop();
-    }
-
-    if (tserver != null) {
-      tserver.stop();
-      tserver = null;
-    }
-
-    if (httpServer == null) {
-      return;
-    }
-    try {
-      httpServer.stop();
-    } catch (Exception e) {
-      LOG.error("Problem encountered in shutting down HTTP server", e);
-    }
-    // Reached both after a clean stop and after a logged failure.
-    httpServer = null;
-  }
-
-  /**
-   * Builds the embedded Jetty server that carries Thrift over HTTP and stores it in
-   * {@code httpServer}. The server is configured here but not started.
-   *
-   * The servlet always uses the binary protocol. The connector is plain HTTP unless
-   * THRIFT_SSL_ENABLED_KEY is true, in which case an SSL connector is built from the
-   * keystore, cipher-suite and protocol settings.
-   *
-   * @throws IOException declared by the method; it covers at least the keystore password
-   *   lookup and the bind address resolution
-   */
-  private void setupHTTPServer() throws IOException {
-    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
-    TProcessor processor = new Hbase.Processor<>(handler);
-    TServlet thriftHttpServlet = new ThriftHttpServlet(processor, protocolFactory, serviceUGI,
-        conf, hbaseHandler, securityEnabled, doAsEnabled);
-
-    // Set the default max thread number to 100 to limit
-    // the number of concurrent requests so that Thrift HTTP server doesn't OOM easily.
-    // Jetty set the default max thread number to 250, if we don't set it.
-    //
-    // Our default min thread number 2 is the same as that used by Jetty.
-    int minThreads = conf.getInt(HTTP_MIN_THREADS_KEY, 2);
-    int maxThreads = conf.getInt(HTTP_MAX_THREADS_KEY, 100);
-    QueuedThreadPool threadPool = new QueuedThreadPool(maxThreads);
-    threadPool.setMinThreads(minThreads);
-    httpServer = new Server(threadPool);
-
-    // Context handler
-    // The Thrift servlet is mapped to every path under the root context.
-    ServletContextHandler ctxHandler = new ServletContextHandler(httpServer, "/",
-            ServletContextHandler.SESSIONS);
-    ctxHandler.addServlet(new ServletHolder(thriftHttpServlet), "/*");
-    HttpServerUtil.constrainHttpMethods(ctxHandler,
-      conf.getBoolean(THRIFT_HTTP_ALLOW_OPTIONS_METHOD, THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT));
-
-    // set up Jetty and run the embedded server
-    HttpConfiguration httpConfig = new HttpConfiguration();
-    httpConfig.setSecureScheme("https");
-    httpConfig.setSecurePort(listenPort);
-    httpConfig.setHeaderCacheSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
-    httpConfig.setRequestHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
-    httpConfig.setResponseHeaderSize(DEFAULT_HTTP_MAX_HEADER_SIZE);
-    httpConfig.setSendServerVersion(false);
-    httpConfig.setSendDateHeader(false);
-
-    ServerConnector serverConnector;
-    if(conf.getBoolean(THRIFT_SSL_ENABLED_KEY, false)) {
-      // The HTTPS configuration starts as a copy of the plain HTTP one.
-      HttpConfiguration httpsConfig = new HttpConfiguration(httpConfig);
-      httpsConfig.addCustomizer(new SecureRequestCustomizer());
-
-      SslContextFactory sslCtxFactory = new SslContextFactory();
-      String keystore = conf.get(THRIFT_SSL_KEYSTORE_STORE_KEY);
-      String password = HBaseConfiguration.getPassword(conf,
-          THRIFT_SSL_KEYSTORE_PASSWORD_KEY, null);
-      // The key password falls back to the keystore password when not set separately.
-      String keyPassword = HBaseConfiguration.getPassword(conf,
-          THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY, password);
-      sslCtxFactory.setKeyStorePath(keystore);
-      sslCtxFactory.setKeyStorePassword(password);
-      sslCtxFactory.setKeyManagerPassword(keyPassword);
-
-      // Each include/exclude list below is applied only when it is non-empty.
-      String[] excludeCiphers = conf.getStrings(
-          THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
-      if (excludeCiphers.length != 0) {
-        sslCtxFactory.setExcludeCipherSuites(excludeCiphers);
-      }
-      String[] includeCiphers = conf.getStrings(
-          THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
-      if (includeCiphers.length != 0) {
-        sslCtxFactory.setIncludeCipherSuites(includeCiphers);
-      }
-
-      // Disable SSLv3 by default due to "Poodle" Vulnerability - CVE-2014-3566
-      String[] excludeProtocols = conf.getStrings(
-          THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY, "SSLv3");
-      if (excludeProtocols.length != 0) {
-        sslCtxFactory.setExcludeProtocols(excludeProtocols);
-      }
-      String[] includeProtocols = conf.getStrings(
-          THRIFT_SSL_INCLUDE_PROTOCOLS_KEY, ArrayUtils.EMPTY_STRING_ARRAY);
-      if (includeProtocols.length != 0) {
-        sslCtxFactory.setIncludeProtocols(includeProtocols);
-      }
-
-      serverConnector = new ServerConnector(httpServer,
-          new SslConnectionFactory(sslCtxFactory, HttpVersion.HTTP_1_1.toString()),
-          new HttpConnectionFactory(httpsConfig));
-    } else {
-      serverConnector = new ServerConnector(httpServer, new HttpConnectionFactory(httpConfig));
-    }
-    // Both connector variants listen on the same configured port and bind address.
-    serverConnector.setPort(listenPort);
-    serverConnector.setHost(getBindAddress(conf).getHostAddress());
-    httpServer.addConnector(serverConnector);
-    httpServer.setStopAtShutdown(true);
-
-    if (doAsEnabled) {
-      ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
-    }
-
-    LOG.info("Starting Thrift HTTP Server on {}", Integer.toString(listenPort));
-  }
-
-  /**
-   * Setting up the thrift TServer
-   *
-   * Chooses the protocol factory, the transport factory (framed, plain or SASL/GSSAPI),
-   * and then builds the TServer matching the configured ImplType, storing it in
-   * {@code tserver}. The server is constructed here but not started.
-   *
-   * @throws Exception if the configuration is inconsistent (e.g. QoP together with a framed
-   *   transport, or a bind address for a server type that cannot take one) or if creating the
-   *   server socket fails
-   */
-  private void setupServer() throws Exception {
-    // Construct correct ProtocolFactory
-    TProtocolFactory protocolFactory = getProtocolFactory();
-
-    final TProcessor p = new Hbase.Processor<>(handler);
-    ImplType implType = ImplType.getServerImpl(conf);
-    // Used as-is unless the SASL branch below replaces it with a wrapper around p.
-    TProcessor processor = p;
-
-    // Construct correct TransportFactory
-    TTransportFactory transportFactory;
-    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
-      if (qop != null) {
-        throw new RuntimeException("Thrift server authentication"
-          + " doesn't work with framed transport yet");
-      }
-      // The configured maximum frame size is in megabytes; convert to bytes.
-      transportFactory = new TFramedTransport.Factory(
-          conf.getInt(MAX_FRAME_SIZE_CONF_KEY, 2)  * 1024 * 1024);
-      LOG.debug("Using framed transport");
-    } else if (qop == null) {
-      transportFactory = new TTransportFactory();
-    } else {
-      // Extract the name from the principal
-      String thriftKerberosPrincipal = conf.get(THRIFT_KERBEROS_PRINCIPAL_KEY);
-      if (thriftKerberosPrincipal == null) {
-        throw new IllegalArgumentException(THRIFT_KERBEROS_PRINCIPAL_KEY + " cannot be null");
-      }
-      String name = SecurityUtil.getUserFromPrincipal(thriftKerberosPrincipal);
-      Map<String, String> saslProperties = SaslUtil.initSaslProperties(qop.name());
-      TSaslServerTransport.Factory saslFactory = new TSaslServerTransport.Factory();
-      saslFactory.addServerDefinition("GSSAPI", name, host, saslProperties,
-        new SaslGssCallbackHandler() {
-          @Override
-          public void handle(Callback[] callbacks)
-              throws UnsupportedCallbackException {
-            // Only AuthorizeCallback is accepted; any other callback type is rejected.
-            AuthorizeCallback ac = null;
-            for (Callback callback : callbacks) {
-              if (callback instanceof AuthorizeCallback) {
-                ac = (AuthorizeCallback) callback;
-              } else {
-                throw new UnsupportedCallbackException(callback,
-                    "Unrecognized SASL GSSAPI Callback");
-              }
-            }
-            if (ac != null) {
-              // Authorize only when the authentication and authorization IDs are equal.
-              String authid = ac.getAuthenticationID();
-              String authzid = ac.getAuthorizationID();
-              if (!authid.equals(authzid)) {
-                ac.setAuthorized(false);
-              } else {
-                ac.setAuthorized(true);
-                String userName = SecurityUtil.getUserFromPrincipal(authzid);
-                LOG.info("Effective user: {}", userName);
-                ac.setAuthorizedID(userName);
-              }
-            }
-          }
-        });
-      transportFactory = saslFactory;
-
-      // Create a processor wrapper, to get the caller
-      // The wrapper sets the handler's effective user before delegating each call to p.
-      processor = (inProt, outProt) -> {
-        TSaslServerTransport saslServerTransport =
-          (TSaslServerTransport)inProt.getTransport();
-        SaslServer saslServer = saslServerTransport.getSaslServer();
-        String principal = saslServer.getAuthorizationID();
-        hbaseHandler.setEffectiveUser(principal);
-        return p.process(inProt, outProt);
-      };
-    }
-
-    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
-      LOG.error("Server types {} don't support IP address binding at the moment. See " +
-          "https://issues.apache.org/jira/browse/HBASE-2155 for details.",
-          Joiner.on(", ").join(ImplType.serversThatCannotSpecifyBindIP()));
-      throw new RuntimeException("-" + BIND_CONF_KEY + " not supported with " + implType);
-    }
-
-    // Thrift's implementation uses '0' as a placeholder for 'use the default.'
-    // Note: the backlog is only passed to the thread-pool server socket below.
-    int backlog = conf.getInt(BACKLOG_CONF_KEY, 0);
-
-    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
-        implType == ImplType.THREADED_SELECTOR) {
-      // These three implementations share a non-blocking server socket.
-      InetAddress listenAddress = getBindAddress(conf);
-      TNonblockingServerTransport serverTransport = new TNonblockingServerSocket(
-          new InetSocketAddress(listenAddress, listenPort));
-
-      if (implType == ImplType.NONBLOCKING) {
-        TNonblockingServer.Args serverArgs =
-            new TNonblockingServer.Args(serverTransport);
-        serverArgs.processor(processor)
-                  .transportFactory(transportFactory)
-                  .protocolFactory(protocolFactory);
-        tserver = new TNonblockingServer(serverArgs);
-      } else if (implType == ImplType.HS_HA) {
-        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
-        CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
-        // The maximum worker count is passed as both the minimum and the maximum pool size.
-        ExecutorService executorService = createExecutor(
-            callQueue, serverArgs.getMaxWorkerThreads(), serverArgs.getMaxWorkerThreads());
-        serverArgs.executorService(executorService).processor(processor)
-                .transportFactory(transportFactory).protocolFactory(protocolFactory);
-        tserver = new THsHaServer(serverArgs);
-      } else { // THREADED_SELECTOR
-        TThreadedSelectorServer.Args serverArgs =
-            new HThreadedSelectorServerArgs(serverTransport, conf);
-        CallQueue callQueue = new CallQueue(new LinkedBlockingQueue<>(), metrics);
-        ExecutorService executorService = createExecutor(
-            callQueue, serverArgs.getWorkerThreads(), serverArgs.getWorkerThreads());
-        serverArgs.executorService(executorService).processor(processor)
-                .transportFactory(transportFactory).protocolFactory(protocolFactory);
-        tserver = new TThreadedSelectorServer(serverArgs);
-      }
-      LOG.info("starting HBase {} server on {}", implType.simpleClassName(),
-          Integer.toString(listenPort));
-    } else if (implType == ImplType.THREAD_POOL) {
-      // Thread pool server. Get the IP address to bind to.
-      InetAddress listenAddress = getBindAddress(conf);
-      int readTimeout = conf.getInt(THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY,
-          THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT);
-      TServerTransport serverTransport = new TServerSocket(
-          new TServerSocket.ServerSocketTransportArgs().
-              bindAddr(new InetSocketAddress(listenAddress, listenPort)).backlog(backlog).
-              clientTimeout(readTimeout));
-
-      TBoundedThreadPoolServer.Args serverArgs =
-          new TBoundedThreadPoolServer.Args(serverTransport, conf);
-      serverArgs.processor(processor).transportFactory(transportFactory)
-              .protocolFactory(protocolFactory);
-      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
-          + listenAddress + ":" + Integer.toString(listenPort)
-          + " with readTimeout " + readTimeout + "ms; " + serverArgs);
-      this.tserver = new TBoundedThreadPoolServer(serverArgs, metrics);
-    } else {
-      throw new AssertionError("Unsupported Thrift server implementation: " +
-          implType.simpleClassName());
-    }
-
-    // A sanity check that we instantiated the right type of server.
-    if (tserver.getClass() != implType.serverClass) {
-      throw new AssertionError("Expected to create Thrift server class " +
-          implType.serverClass.getName() + " but got " +
-          tserver.getClass().getName());
-    }
-
-
-
-    registerFilters(conf);
-  }
-
-  /**
-   * Picks the Thrift protocol factory according to COMPACT_CONF_KEY.
-   *
-   * @return a compact protocol factory when the key is true, a binary one otherwise
-   */
-  private TProtocolFactory getProtocolFactory() {
-    if (!conf.getBoolean(COMPACT_CONF_KEY, false)) {
-      LOG.debug("Using binary protocol");
-      return new TBinaryProtocol.Factory();
-    }
-    LOG.debug("Using compact protocol");
-    return new TCompactProtocol.Factory();
-  }
-
-  /**
-   * Creates the worker pool used by the HsHa and threaded-selector servers. Threads are
-   * daemon threads named "thrift-worker-N", and core threads are allowed to time out.
-   *
-   * @param callQueue queue the pool takes its tasks from
-   * @param minWorkers core pool size
-   * @param maxWorkers maximum pool size
-   * @return the configured executor
-   */
-  ExecutorService createExecutor(BlockingQueue<Runnable> callQueue,
-                                 int minWorkers, int maxWorkers) {
-    ThreadFactoryBuilder factoryBuilder = new ThreadFactoryBuilder()
-        .setDaemon(true)
-        .setNameFormat("thrift-worker-%d");
-    ThreadPoolExecutor workers = new THBaseThreadPoolExecutor(minWorkers, maxWorkers,
-        Long.MAX_VALUE, TimeUnit.SECONDS, callQueue, factoryBuilder.build(), metrics);
-    workers.allowCoreThreadTimeOut(true);
-    return workers;
-  }
-
-  /**
-   * Resolves the address the server should bind to.
-   *
-   * @param conf configuration holding BIND_CONF_KEY; DEFAULT_BIND_ADDR is used when unset
-   * @return the resolved bind address
-   * @throws UnknownHostException if the configured value cannot be resolved
-   */
-  private InetAddress getBindAddress(Configuration conf)
-      throws UnknownHostException {
-    return InetAddress.getByName(conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR));
-  }
-
-  /**
-   * Immutable pairing of a {@link ResultScanner} with the flag recording whether the
-   * columns of its results should be sorted.
-   */
-  protected static class ResultScannerWrapper {
-
-    private final ResultScanner wrapped;
-    private final boolean columnSorted;
-
-    public ResultScannerWrapper(ResultScanner resultScanner,
-                                boolean sortResultColumns) {
-      this.wrapped = resultScanner;
-      this.columnSorted = sortResultColumns;
-    }
-
-    /** @return the wrapped scanner */
-    public ResultScanner getScanner() {
-      return this.wrapped;
-    }
-
-    /** @return the sort flag supplied at construction */
-    public boolean isColumnSorted() {
-      return this.columnSorted;
-    }
-  }
-
-  /**
-   * The HBaseHandler is a glue object that connects Thrift RPC calls to the
-   * HBase client API primarily defined in the Admin and Table objects.
-   */
-  public static class HBaseHandler implements Hbase.Iface {
-    protected Configuration conf;
-    protected static final Logger LOG = LoggerFactory.getLogger(HBaseHandler.class);
-
-    // nextScannerId and scannerMap are used to manage scanner state
-    protected int nextScannerId = 0;
-    protected HashMap<Integer, ResultScannerWrapper> scannerMap;
-    private ThriftMetrics metrics = null;
-
-    private final ConnectionCache connectionCache;
-    IncrementCoalescer coalescer;
-
-    static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
-    static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
-
-    /**
-     * Lists every column family of a table, each name suffixed with the family delimiter.
-     *
-     * @param table table whose descriptor is read
-     * @return one entry per column family
-     * @throws IOException if the table descriptor cannot be fetched
-     */
-    byte[][] getAllColumns(Table table) throws IOException {
-      HColumnDescriptor[] families = table.getTableDescriptor().getColumnFamilies();
-      byte[][] delimited = new byte[families.length][];
-      int slot = 0;
-      for (HColumnDescriptor family : families) {
-        delimited[slot++] = Bytes.add(family.getName(), KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
-      }
-      return delimited;
-    }
-
-    /**
-     * Looks up a Table through the connection cache.
-     *
-     * @param tableName table name as raw bytes
-     * @return Table object
-     * @throws IOException if getting the table fails
-     */
-    public Table getTable(final byte[] tableName) throws IOException {
-      return connectionCache.getTable(Bytes.toString(tableName));
-    }
-
-    /**
-     * Buffer-based variant of {@link #getTable(byte[])}.
-     *
-     * @param tableName table name held in a buffer
-     * @return Table object
-     * @throws IOException if getting the table fails
-     */
-    public Table getTable(final ByteBuffer tableName) throws IOException {
-      final byte[] rawName = getBytes(tableName);
-      return getTable(rawName);
-    }
-
-    /**
-     * Registers a scanner under the next free ID in the internal scanner map.
-     *
-     * @param scanner the {@link ResultScanner} to add
-     * @param sortColumns flag stored alongside the scanner in its wrapper
-     * @return integer scanner id
-     */
-    protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) {
-      final int assignedId = nextScannerId;
-      nextScannerId++;
-      scannerMap.put(assignedId, new ResultScannerWrapper(scanner, sortColumns));
-      return assignedId;
-    }
-
-    /**
-     * Looks up a registered scanner.
-     *
-     * @param id the ID of the scanner to get
-     * @return the scanner wrapper, or null if no scanner is registered under {@code id}
-     */
-    protected synchronized ResultScannerWrapper getScanner(int id) {
-      ResultScannerWrapper found = scannerMap.get(id);
-      return found;
-    }
-
-    /**
-     * Drops a scanner from the internal id-&gt;scanner map.
-     *
-     * @param id the ID of the scanner to remove
-     * @return the removed scanner wrapper, or null if no scanner was registered under {@code id}
-     */
-    protected synchronized ResultScannerWrapper removeScanner(int id) {
-      ResultScannerWrapper removed = scannerMap.remove(id);
-      return removed;
-    }
-
-    /**
-     * Sets up the handler: an empty scanner map, the increment coalescer, and a connection
-     * cache configured from {@code CLEANUP_INTERVAL} (default 10 * 1000) and
-     * {@code MAX_IDLETIME} (default 10 * 60 * 1000).
-     *
-     * @param c configuration backing this handler
-     * @param userProvider user provider handed to the connection cache
-     * @throws IOException if the connection cache cannot be created
-     */
-    protected HBaseHandler(final Configuration c,
-        final UserProvider userProvider) throws IOException {
-      conf = c;
-      scannerMap = new HashMap<>();
-      coalescer = new IncrementCoalescer(this);
-
-      final int cleanupInterval = c.getInt(CLEANUP_INTERVAL, 10 * 1000);
-      final int maxIdle = c.getInt(MAX_IDLETIME, 10 * 60 * 1000);
-      connectionCache = new ConnectionCache(c, userProvider, cleanupInterval, maxIdle);
-    }
-
-    /**
-     * Returns the Admin handed out by the connection cache.
-     *
-     * @throws IOException if the cache cannot provide an Admin
-     */
-    private Admin getAdmin() throws IOException {
-      return this.connectionCache.getAdmin();
-    }
-
-    /**
-     * Forwards the effective user to the connection cache.
-     *
-     * @param effectiveUser user name passed on to the connection cache
-     */
-    void setEffectiveUser(String effectiveUser) {
-      this.connectionCache.setEffectiveUser(effectiveUser);
-    }
-
-    /**
-     * Enables the named table.
-     *
-     * @param tableName name of the table
-     * @throws IOError wrapping any IOException raised by the admin call
-     */
-    @Override
-    public void enableTable(ByteBuffer tableName) throws IOError {
-      try {
-        Admin admin = getAdmin();
-        admin.enableTable(getTableName(tableName));
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      }
-    }
-
-    /**
-     * Disables the named table.
-     *
-     * @param tableName name of the table
-     * @throws IOError wrapping any IOException raised by the admin call
-     */
-    @Override
-    public void disableTable(ByteBuffer tableName) throws IOError {
-      try {
-        Admin admin = getAdmin();
-        admin.disableTable(getTableName(tableName));
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      }
-    }
-
-    /**
-     * Reports whether the named table is enabled.
-     *
-     * @param tableName name of the table
-     * @return the admin's answer for that table
-     * @throws IOError wrapping any IOException raised by the admin call
-     */
-    @Override
-    public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
-      try {
-        Admin admin = getAdmin();
-        return admin.isTableEnabled(getTableName(tableName));
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      }
-    }
-
-    /**
-     * Requests a compaction. The argument is first treated as a region name; if the admin
-     * rejects it with an IllegalArgumentException it is retried as a table name.
-     *
-     * @param tableNameOrRegionName region name or table name
-     * @throws IOError wrapping any IOException raised by the admin calls
-     */
-    // ThriftServerRunner.compact should be deprecated and replaced with methods specific to
-    // table and region.
-    @Override
-    public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
-      try {
-        try {
-          getAdmin().compactRegion(getBytes(tableNameOrRegionName));
-        } catch (IllegalArgumentException notARegion) {
-          // Not a valid region name: fall back to compacting the table.
-          TableName asTable = TableName.valueOf(getBytes(tableNameOrRegionName));
-          getAdmin().compact(asTable);
-        }
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      }
-    }
-
-    /**
-     * Requests a major compaction. The argument is first treated as a region name; if the
-     * admin rejects it with an IllegalArgumentException it is retried as a table name.
-     *
-     * @param tableNameOrRegionName region name or table name
-     * @throws IOError wrapping any IOException raised by the admin calls
-     */
-    // ThriftServerRunner.majorCompact should be deprecated and replaced with methods specific
-    // to table and region.
-    @Override
-    public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
-      try {
-        try {
-          // Must be the major variant; the plain compactRegion call only requests a
-          // minor compaction.
-          getAdmin().majorCompactRegion(getBytes(tableNameOrRegionName));
-        } catch (IllegalArgumentException e) {
-          // Invalid region, try table
-          getAdmin().majorCompact(TableName.valueOf(getBytes(tableNameOrRegionName)));
-        }
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      }
-    }
-
-    /**
-     * Lists the names of all tables reported by the admin.
-     *
-     * @return one buffer per table name
-     * @throws IOError wrapping any IOException raised by the admin call
-     */
-    @Override
-    public List<ByteBuffer> getTableNames() throws IOError {
-      try {
-        TableName[] known = getAdmin().listTableNames();
-        List<ByteBuffer> wrapped = new ArrayList<>(known.length);
-        for (int i = 0; i < known.length; i++) {
-          wrapped.add(ByteBuffer.wrap(known[i].getName()));
-        }
-        return wrapped;
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      }
-    }
-
-    /**
-     * Describes every region of a table as a {@code TRegionInfo}.
-     *
-     * @param tableName name of the table
-     * @return the list of regions in the given table, or an empty list if the table does not exist
-     * @throws IOError wrapping any other IOException raised while locating regions
-     */
-    @Override
-    public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
-      try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
-        List<HRegionLocation> locations = locator.getAllRegionLocations();
-        List<TRegionInfo> described = new ArrayList<>(locations.size());
-        for (HRegionLocation location : locations) {
-          RegionInfo regionInfo = location.getRegionInfo();
-          ServerName host = location.getServerName();
-
-          TRegionInfo out = new TRegionInfo();
-          out.serverName = ByteBuffer.wrap(Bytes.toBytes(host.getHostname()));
-          out.port = host.getPort();
-          out.startKey = ByteBuffer.wrap(regionInfo.getStartKey());
-          out.endKey = ByteBuffer.wrap(regionInfo.getEndKey());
-          out.id = regionInfo.getRegionId();
-          out.name = ByteBuffer.wrap(regionInfo.getRegionName());
-          // HRegion now not versioned, PB encoding used
-          out.version = HREGION_VERSION;
-          described.add(out);
-        }
-        return described;
-      } catch (TableNotFoundException missing) {
-        // A table that does not exist simply has no regions.
-        return Collections.emptyList();
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      }
-    }
-
-    /**
-     * Reads the cells of one column. {@code column} is split by
-     * {@code CellUtil.parseColumn}; a single part selects the whole family, two parts select
-     * family and qualifier.
-     *
-     * @throws IllegalArgumentException if the column splits into any other number of parts
-     */
-    @Override
-    public List<TCell> get(
-        ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError {
-      byte[][] parts = CellUtil.parseColumn(getBytes(column));
-      switch (parts.length) {
-        case 1:
-          return get(tableName, row, parts[0], null, attributes);
-        case 2:
-          return get(tableName, row, parts[0], parts[1], attributes);
-        default:
-          throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
-      }
-    }
-
-    /**
-     * Internal read path behind {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}.
-     * Unlike the public Java API, a null qualifier is not the same as an empty one here:
-     * {@code qualifier == null} requests the entire column family.
-     *
-     * @param family column family to read
-     * @param qualifier column qualifier, or null for the whole family
-     * @throws IOError wrapping any IOException raised by the table access
-     */
-    protected List<TCell> get(ByteBuffer tableName,
-                              ByteBuffer row,
-                              byte[] family,
-                              byte[] qualifier,
-                              Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Get request = new Get(getBytes(row));
-        addAttributes(request, attributes);
-        if (qualifier != null) {
-          request.addColumn(family, qualifier);
-        } else {
-          request.addFamily(family);
-        }
-        Result fetched = table.get(request);
-        return ThriftUtilities.cellFromHBase(fetched.rawCells());
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Reads up to {@code numVersions} versions of one column. A {@code column} that parses
-     * to a single part selects the whole family; two parts select family and qualifier.
-     *
-     * @throws IllegalArgumentException if the column splits into any other number of parts
-     */
-    @Override
-    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
-        int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      byte[][] parts = CellUtil.parseColumn(getBytes(column));
-      switch (parts.length) {
-        case 1:
-          return getVer(tableName, row, parts[0], null, numVersions, attributes);
-        case 2:
-          return getVer(tableName, row, parts[0], parts[1], numVersions, attributes);
-        default:
-          throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
-      }
-    }
-
-    /**
-     * Family/qualifier form of the versioned read. Unlike the public Java API, a null
-     * qualifier is not the same as an empty one here: {@code qualifier == null} requests the
-     * entire column family. Callers holding a combined column can use
-     * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column}
-     * value that lacks a {@code ':'}.
-     *
-     * @param numVersions maximum number of versions set on the Get
-     * @throws IOError wrapping any IOException raised by the table access
-     */
-    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
-        byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Get request = new Get(getBytes(row));
-        addAttributes(request, attributes);
-        if (qualifier != null) {
-          request.addColumn(family, qualifier);
-        } else {
-          request.addFamily(family);
-        }
-        request.setMaxVersions(numVersions);
-        Result fetched = table.get(request);
-        return ThriftUtilities.cellFromHBase(fetched.rawCells());
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Reads up to {@code numVersions} versions of one column, bounded by {@code timestamp}.
-     * A {@code column} that parses to a single part selects the whole family; two parts
-     * select family and qualifier.
-     *
-     * @throws IllegalArgumentException if the column splits into any other number of parts
-     */
-    @Override
-    public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
-        long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      byte[][] parts = CellUtil.parseColumn(getBytes(column));
-      switch (parts.length) {
-        case 1:
-          return getVerTs(tableName, row, parts[0], null, timestamp, numVersions, attributes);
-        case 2:
-          return getVerTs(tableName, row, parts[0], parts[1], timestamp, numVersions,
-            attributes);
-        default:
-          throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
-      }
-    }
-
-    /**
-     * Internal read path behind
-     * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}. Unlike the public
-     * Java API, a null qualifier is not the same as an empty one here:
-     * {@code qualifier == null} requests the entire column family.
-     *
-     * @param timestamp upper bound passed to {@code setTimeRange(0, timestamp)}
-     * @param numVersions maximum number of versions set on the Get
-     * @throws IOError wrapping any IOException raised by the table access
-     */
-    protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
-        byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError {
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Get request = new Get(getBytes(row));
-        addAttributes(request, attributes);
-        if (qualifier != null) {
-          request.addColumn(family, qualifier);
-        } else {
-          request.addFamily(family);
-        }
-        request.setTimeRange(0, timestamp);
-        request.setMaxVersions(numVersions);
-        Result fetched = table.get(request);
-        return ThriftUtilities.cellFromHBase(fetched.rawCells());
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Reads a whole row: no column restriction, timestamp
-     * {@code HConstants.LATEST_TIMESTAMP}.
-     */
-    @Override
-    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      final List<ByteBuffer> allColumns = null;
-      return getRowWithColumnsTs(tableName, row, allColumns, HConstants.LATEST_TIMESTAMP,
-        attributes);
-    }
-
-    /**
-     * Reads the given columns of a row at {@code HConstants.LATEST_TIMESTAMP}.
-     */
-    @Override
-    public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
-                                              ByteBuffer row,
-        List<ByteBuffer> columns,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      final long latest = HConstants.LATEST_TIMESTAMP;
-      return getRowWithColumnsTs(tableName, row, columns, latest, attributes);
-    }
-
-    /**
-     * Reads a whole row (no column restriction) bounded by {@code timestamp}.
-     */
-    @Override
-    public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
-        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      final List<ByteBuffer> allColumns = null;
-      return getRowWithColumnsTs(tableName, row, allColumns, timestamp, attributes);
-    }
-
-    /**
-     * Reads one row, optionally restricted to a set of columns, with the time range
-     * {@code [0, timestamp)} applied to the Get.
-     *
-     * @param columns columns to read (family, or family plus qualifier as split by
-     *          {@code CellUtil.parseColumn}); null reads the whole row
-     * @throws IOError wrapping any IOException raised by the table access
-     */
-    @Override
-    public List<TRowResult> getRowWithColumnsTs(
-        ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
-        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Get request = new Get(getBytes(row));
-        addAttributes(request, attributes);
-        if (columns != null) {
-          for (ByteBuffer column : columns) {
-            byte[][] parts = CellUtil.parseColumn(getBytes(column));
-            if (parts.length != 1) {
-              request.addColumn(parts[0], parts[1]);
-            } else {
-              request.addFamily(parts[0]);
-            }
-          }
-        }
-        request.setTimeRange(0, timestamp);
-        Result fetched = table.get(request);
-        return ThriftUtilities.rowResultFromHBase(fetched);
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Reads several whole rows: no column restriction, timestamp
-     * {@code HConstants.LATEST_TIMESTAMP}.
-     */
-    @Override
-    public List<TRowResult> getRows(ByteBuffer tableName,
-                                    List<ByteBuffer> rows,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError {
-      final List<ByteBuffer> allColumns = null;
-      return getRowsWithColumnsTs(tableName, rows, allColumns, HConstants.LATEST_TIMESTAMP,
-        attributes);
-    }
-
-    /**
-     * Reads the given columns of several rows at {@code HConstants.LATEST_TIMESTAMP}.
-     */
-    @Override
-    public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
-                                               List<ByteBuffer> rows,
-        List<ByteBuffer> columns,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      final long latest = HConstants.LATEST_TIMESTAMP;
-      return getRowsWithColumnsTs(tableName, rows, columns, latest, attributes);
-    }
-
-    /**
-     * Reads several whole rows (no column restriction) bounded by {@code timestamp}.
-     */
-    @Override
-    public List<TRowResult> getRowsTs(ByteBuffer tableName,
-                                      List<ByteBuffer> rows,
-        long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      final List<ByteBuffer> allColumns = null;
-      return getRowsWithColumnsTs(tableName, rows, allColumns, timestamp, attributes);
-    }
-
-    /**
-     * Reads several rows in one batch, optionally restricted to a set of columns, with the
-     * time range {@code [0, timestamp)} applied to every Get. When metrics are available the
-     * number of requested row keys is recorded.
-     *
-     * @param columns columns to read; null reads whole rows
-     * @throws IOError wrapping any IOException raised by the table access
-     */
-    @Override
-    public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
-                                                 List<ByteBuffer> rows,
-        List<ByteBuffer> columns, long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-
-      Table table = null;
-      try {
-        List<Get> batch = new ArrayList<>(rows.size());
-        table = getTable(tableName);
-        if (metrics != null) {
-          metrics.incNumRowKeysInBatchGet(rows.size());
-        }
-        for (ByteBuffer rowKey : rows) {
-          Get request = new Get(getBytes(rowKey));
-          addAttributes(request, attributes);
-          if (columns != null) {
-            for (ByteBuffer column : columns) {
-              byte[][] parts = CellUtil.parseColumn(getBytes(column));
-              if (parts.length != 1) {
-                request.addColumn(parts[0], parts[1]);
-              } else {
-                request.addFamily(parts[0]);
-              }
-            }
-          }
-          request.setTimeRange(0, timestamp);
-          batch.add(request);
-        }
-        Result[] fetched = table.get(batch);
-        return ThriftUtilities.rowResultFromHBase(fetched);
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Deletes a column of a row, using {@code HConstants.LATEST_TIMESTAMP} as the timestamp.
-     */
-    @Override
-    public void deleteAll(
-        ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError {
-      final long latest = HConstants.LATEST_TIMESTAMP;
-      deleteAllTs(tableName, row, column, latest, attributes);
-    }
-
-    /**
-     * Deletes a column of a row at the given timestamp. A {@code column} that parses to a
-     * single part deletes the whole family; otherwise family and qualifier are used.
-     *
-     * @throws IOError wrapping any IOException raised by the table access
-     */
-    @Override
-    public void deleteAllTs(ByteBuffer tableName,
-                            ByteBuffer row,
-                            ByteBuffer column,
-        long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Delete removal = new Delete(getBytes(row));
-        addAttributes(removal, attributes);
-        byte[][] parts = CellUtil.parseColumn(getBytes(column));
-        if (parts.length != 1) {
-          removal.addColumns(parts[0], parts[1], timestamp);
-        } else {
-          removal.addFamily(parts[0], timestamp);
-        }
-        table.delete(removal);
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Deletes a whole row, using {@code HConstants.LATEST_TIMESTAMP} as the timestamp.
-     */
-    @Override
-    public void deleteAllRow(
-        ByteBuffer tableName, ByteBuffer row,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      final long latest = HConstants.LATEST_TIMESTAMP;
-      deleteAllRowTs(tableName, row, latest, attributes);
-    }
-
-    /**
-     * Deletes a whole row with a Delete created for the given timestamp.
-     *
-     * @throws IOError wrapping any IOException raised by the table access
-     */
-    @Override
-    public void deleteAllRowTs(
-        ByteBuffer tableName, ByteBuffer row, long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Delete removal = new Delete(getBytes(row), timestamp);
-        addAttributes(removal, attributes);
-        table.delete(removal);
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Creates a table with the given column families.
-     *
-     * @throws AlreadyExists if the admin reports that the table already exists
-     * @throws IllegalArgument if an IllegalArgumentException is raised while building or
-     *           creating the table
-     * @throws IOError wrapping any IOException raised by the admin calls
-     */
-    @Override
-    public void createTable(ByteBuffer in_tableName,
-        List<ColumnDescriptor> columnFamilies) throws IOError,
-        IllegalArgument, AlreadyExists {
-      TableName tableName = getTableName(in_tableName);
-      try {
-        boolean taken = getAdmin().tableExists(tableName);
-        if (taken) {
-          throw new AlreadyExists("table name already in use");
-        }
-        HTableDescriptor descriptor = new HTableDescriptor(tableName);
-        for (ColumnDescriptor family : columnFamilies) {
-          descriptor.addFamily(ThriftUtilities.colDescFromThrift(family));
-        }
-        getAdmin().createTable(descriptor);
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } catch (IllegalArgumentException iae) {
-        LOG.warn(iae.getMessage(), iae);
-        throw new IllegalArgument(Throwables.getStackTraceAsString(iae));
-      }
-    }
-
-    /**
-     * Converts the bytes of a buffer into a {@code TableName}.
-     *
-     * @param buffer buffer holding the table name
-     * @return the corresponding TableName
-     */
-    private static TableName getTableName(ByteBuffer buffer) {
-      final byte[] rawName = getBytes(buffer);
-      return TableName.valueOf(rawName);
-    }
-
-    /**
-     * Deletes a table after checking that it exists.
-     *
-     * @throws IOError if the table does not exist, or wrapping any IOException raised by
-     *           the admin calls
-     */
-    @Override
-    public void deleteTable(ByteBuffer in_tableName) throws IOError {
-      TableName tableName = getTableName(in_tableName);
-      // Parameterized logging already skips formatting when debug is off.
-      LOG.debug("deleteTable: table={}", tableName);
-      try {
-        boolean present = getAdmin().tableExists(tableName);
-        if (!present) {
-          throw new IOException("table does not exist");
-        }
-        getAdmin().deleteTable(tableName);
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      }
-    }
-
-    /**
-     * Applies mutations to one row, using {@code HConstants.LATEST_TIMESTAMP} as the
-     * timestamp.
-     */
-    @Override
-    public void mutateRow(ByteBuffer tableName, ByteBuffer row,
-        List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError, IllegalArgument {
-      final long latest = HConstants.LATEST_TIMESTAMP;
-      mutateRowTs(tableName, row, mutations, latest, attributes);
-    }
-
-    /**
-     * Applies a list of mutations to a single row at the given timestamp. Delete mutations
-     * are collected into one Delete and the others into one Put; the Delete (if non-empty)
-     * is sent before the Put (if non-empty).
-     *
-     * @param tableName name of the table
-     * @param row row key
-     * @param mutations mutations to apply; a non-delete mutation without a qualifier is
-     *          logged and skipped
-     * @param timestamp timestamp used for the Put and for the delete markers
-     * @param attributes attributes added to both the Put and the Delete
-     * @throws IOError wrapping any IOException raised by the table access
-     * @throws IllegalArgument wrapping any IllegalArgumentException raised while building
-     *           or applying the mutations
-     */
-    @Override
-    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
-        List<Mutation> mutations, long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError, IllegalArgument {
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Put put = new Put(getBytes(row), timestamp);
-        addAttributes(put, attributes);
-
-        Delete delete = new Delete(getBytes(row));
-        addAttributes(delete, attributes);
-        if (metrics != null) {
-          metrics.incNumRowKeysInBatchMutate(mutations.size());
-        }
-
-        // I apologize for all this mess :)
-        CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
-        for (Mutation m : mutations) {
-          byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
-          if (m.isDelete) {
-            // A single part means family only; otherwise family plus qualifier.
-            if (famAndQf.length == 1) {
-              delete.addFamily(famAndQf[0], timestamp);
-            } else {
-              delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
-            }
-            // Durability is set on the shared Delete, so the last delete mutation wins.
-            delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
-          } else {
-            if(famAndQf.length == 1) {
-              LOG.warn("No column qualifier specified. Delete is the only mutation supported "
-                  + "over the whole column family.");
-            } else {
-              put.add(builder.clear()
-                  .setRow(put.getRow())
-                  .setFamily(famAndQf[0])
-                  .setQualifier(famAndQf[1])
-                  .setTimestamp(put.getTimestamp())
-                  .setType(Type.Put)
-                  .setValue(m.value != null ? getBytes(m.value)
-                      : HConstants.EMPTY_BYTE_ARRAY)
-                  .build());
-            }
-            // Durability is set on the shared Put, so the last non-delete mutation wins.
-            put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
-          }
-        }
-        if (!delete.isEmpty()) {
-          table.delete(delete);
-        }
-        if (!put.isEmpty()) {
-          table.put(put);
-        }
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } catch (IllegalArgumentException e) {
-        LOG.warn(e.getMessage(), e);
-        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
-      } finally{
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Applies batches of mutations to several rows, using
-     * {@code HConstants.LATEST_TIMESTAMP} as the timestamp.
-     */
-    @Override
-    public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError, IllegalArgument, TException {
-      final long latest = HConstants.LATEST_TIMESTAMP;
-      mutateRowsTs(tableName, rowBatches, latest, attributes);
-    }
-
-    /**
-     * Applies batches of mutations to several rows at the given timestamp. For every row the
-     * delete mutations are collected into one Delete and the others into one Put; all
-     * non-empty Puts are sent first, then all non-empty Deletes.
-     *
-     * <p>The batch is built inside the same try block as the table calls, so an
-     * IllegalArgumentException raised while building (for example a put without a column
-     * qualifier) reaches the client as the declared {@code IllegalArgument} instead of
-     * escaping as an unchecked exception.
-     *
-     * @param tableName name of the table
-     * @param rowBatches per-row mutation lists
-     * @param timestamp timestamp used for the Puts and for the delete markers
-     * @param attributes attributes added to every Put and Delete
-     * @throws IOError wrapping any IOException raised by the table access
-     * @throws IllegalArgument wrapping any IllegalArgumentException raised while building
-     *           or applying the mutations
-     */
-    @Override
-    public void mutateRowsTs(
-        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError, IllegalArgument, TException {
-      // Stays null until the batch is built; closeTable already receives null on the
-      // existing paths where getTable fails.
-      Table table = null;
-      try {
-        List<Put> puts = new ArrayList<>();
-        List<Delete> deletes = new ArrayList<>();
-        CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
-        for (BatchMutation batch : rowBatches) {
-          byte[] row = getBytes(batch.row);
-          List<Mutation> mutations = batch.mutations;
-          Delete delete = new Delete(row);
-          addAttributes(delete, attributes);
-          Put put = new Put(row, timestamp);
-          addAttributes(put, attributes);
-          for (Mutation m : mutations) {
-            byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
-            if (m.isDelete) {
-              // no qualifier, family only.
-              if (famAndQf.length == 1) {
-                delete.addFamily(famAndQf[0], timestamp);
-              } else {
-                delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
-              }
-              delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
-                  : Durability.SKIP_WAL);
-            } else {
-              if (famAndQf.length != 2) {
-                if (famAndQf.length == 1) {
-                  LOG.warn("No column qualifier specified. Delete is the only mutation supported "
-                      + "over the whole column family.");
-                }
-                throw new IllegalArgumentException("Invalid famAndQf provided.");
-              }
-              try {
-                put.add(builder.clear()
-                    .setRow(put.getRow())
-                    .setFamily(famAndQf[0])
-                    .setQualifier(famAndQf[1])
-                    .setTimestamp(put.getTimestamp())
-                    .setType(Type.Put)
-                    .setValue(m.value != null ? getBytes(m.value)
-                        : HConstants.EMPTY_BYTE_ARRAY)
-                    .build());
-              } catch (IOException e) {
-                // A cell the Put rejects is an argument problem, not an I/O failure.
-                throw new IllegalArgumentException(e);
-              }
-              put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
-            }
-          }
-          if (!delete.isEmpty()) {
-            deletes.add(delete);
-          }
-          if (!put.isEmpty()) {
-            puts.add(put);
-          }
-        }
-
-        table = getTable(tableName);
-        if (!puts.isEmpty()) {
-          table.put(puts);
-        }
-        if (!deletes.isEmpty()) {
-          table.delete(deletes);
-        }
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } catch (IllegalArgumentException e) {
-        LOG.warn(e.getMessage(), e);
-        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Increments a column value. When {@code column} parses to a family only, the empty
-     * qualifier {@code HConstants.EMPTY_BYTE_ARRAY} is used.
-     *
-     * @return the value returned by the underlying increment
-     */
-    @Override
-    public long atomicIncrement(
-        ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
-            throws IOError, IllegalArgument, TException {
-      byte[][] parts = CellUtil.parseColumn(getBytes(column));
-      if (parts.length != 1) {
-        return atomicIncrement(tableName, row, parts[0], parts[1], amount);
-      }
-      return atomicIncrement(tableName, row, parts[0], HConstants.EMPTY_BYTE_ARRAY, amount);
-    }
-
-    /**
-     * Increments the value of a family/qualifier cell through
-     * {@code Table.incrementColumnValue}.
-     *
-     * @return the value returned by {@code incrementColumnValue}
-     * @throws IOError wrapping any IOException raised by the table access
-     */
-    protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
-        byte [] family, byte [] qualifier, long amount)
-        throws IOError, IllegalArgument, TException {
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        final byte[] rowKey = getBytes(row);
-        return table.incrementColumnValue(rowKey, family, qualifier, amount);
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    /**
-     * Closes the scanner registered under {@code id} and removes it from the scanner map.
-     *
-     * @throws IllegalArgument if no scanner is registered under {@code id}
-     */
-    @Override
-    public void scannerClose(int id) throws IOError, IllegalArgument {
-      LOG.debug("scannerClose: id={}", id);
-      ResultScannerWrapper wrapper = getScanner(id);
-      if (wrapper != null) {
-        wrapper.getScanner().close();
-        removeScanner(id);
-        return;
-      }
-      LOG.warn("scanner ID is invalid");
-      throw new IllegalArgument("scanner ID is invalid");
-    }
-
-    /**
-     * Fetches up to {@code nbRows} rows from the scanner registered under {@code id}.
-     *
-     * @return the converted rows, or an empty list when the scanner returns null
-     * @throws IllegalArgument if no scanner is registered under {@code id}
-     * @throws IOError wrapping any IOException raised by the scanner
-     */
-    @Override
-    public List<TRowResult> scannerGetList(int id,int nbRows)
-        throws IllegalArgument, IOError {
-      LOG.debug("scannerGetList: id={}", id);
-      ResultScannerWrapper wrapper = getScanner(id);
-      if (wrapper == null) {
-        String message = "scanner ID is invalid";
-        LOG.warn(message);
-        throw new IllegalArgument(message);
-      }
-
-      Result[] fetched;
-      try {
-        fetched = wrapper.getScanner().next(nbRows);
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      }
-      if (fetched == null) {
-        return new ArrayList<>();
-      }
-      return ThriftUtilities.rowResultFromHBase(fetched, wrapper.isColumnSorted());
-    }
-
-    /**
-     * Fetches a single row from the scanner; shorthand for
-     * {@code scannerGetList(id, 1)}.
-     */
-    @Override
-    public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
-      final int oneRow = 1;
-      return scannerGetList(id, oneRow);
-    }
-
-    /**
-     * Opens a scanner described by a {@code TScan}. Only the fields that are set on
-     * {@code tScan} are copied onto the Scan.
-     *
-     * @param tableName name of the table
-     * @param tScan scan description
-     * @param attributes attributes added to the Scan
-     * @return the id under which the new scanner is registered
-     * @throws IOError wrapping any IOException raised while building or opening the scan
-     */
-    @Override
-    public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError {
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Scan hbaseScan = new Scan();
-        addAttributes(hbaseScan, attributes);
-
-        // Row range.
-        if (tScan.isSetStartRow()) {
-          hbaseScan.setStartRow(tScan.getStartRow());
-        }
-        if (tScan.isSetStopRow()) {
-          hbaseScan.setStopRow(tScan.getStopRow());
-        }
-
-        // Time range and fetch sizes.
-        if (tScan.isSetTimestamp()) {
-          hbaseScan.setTimeRange(0, tScan.getTimestamp());
-        }
-        if (tScan.isSetCaching()) {
-          hbaseScan.setCaching(tScan.getCaching());
-        }
-        if (tScan.isSetBatchSize()) {
-          hbaseScan.setBatch(tScan.getBatchSize());
-        }
-
-        // Column selection: one parsed part means a whole family.
-        if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
-          for (ByteBuffer column : tScan.getColumns()) {
-            byte[][] parts = CellUtil.parseColumn(getBytes(column));
-            if (parts.length != 1) {
-              hbaseScan.addColumn(parts[0], parts[1]);
-            } else {
-              hbaseScan.addFamily(parts[0]);
-            }
-          }
-        }
-
-        if (tScan.isSetFilterString()) {
-          ParseFilter filterParser = new ParseFilter();
-          hbaseScan.setFilter(filterParser.parseFilterString(tScan.getFilterString()));
-        }
-        if (tScan.isSetReversed()) {
-          hbaseScan.setReversed(tScan.isReversed());
-        }
-        if (tScan.isSetCacheBlocks()) {
-          hbaseScan.setCacheBlocks(tScan.isCacheBlocks());
-        }
-        return addScanner(table.getScanner(hbaseScan), tScan.sortColumns);
-      } catch (IOException ioe) {
-        LOG.warn(ioe.getMessage(), ioe);
-        throw getIOError(ioe);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    @Override
-    public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
-        List<ByteBuffer> columns,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Scan scan = new Scan(getBytes(startRow));
-        addAttributes(scan, attributes);
-        if(columns != null && !columns.isEmpty()) {
-          for(ByteBuffer column : columns) {
-            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
-            if(famQf.length == 1) {
-              scan.addFamily(famQf[0]);
-            } else {
-              scan.addColumn(famQf[0], famQf[1]);
-            }
-          }
-        }
-        return addScanner(table.getScanner(scan), false);
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } finally{
-        closeTable(table);
-      }
-    }
-
-    @Override
-    public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
-        ByteBuffer stopRow, List<ByteBuffer> columns,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError, TException {
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
-        addAttributes(scan, attributes);
-        if(columns != null && !columns.isEmpty()) {
-          for(ByteBuffer column : columns) {
-            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
-            if(famQf.length == 1) {
-              scan.addFamily(famQf[0]);
-            } else {
-              scan.addColumn(famQf[0], famQf[1]);
-            }
-          }
-        }
-        return addScanner(table.getScanner(scan), false);
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } finally{
-        closeTable(table);
-      }
-    }
-
-    @Override
-    public int scannerOpenWithPrefix(ByteBuffer tableName,
-                                     ByteBuffer startAndPrefix,
-                                     List<ByteBuffer> columns,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError, TException {
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Scan scan = new Scan(getBytes(startAndPrefix));
-        addAttributes(scan, attributes);
-        Filter f = new WhileMatchFilter(
-            new PrefixFilter(getBytes(startAndPrefix)));
-        scan.setFilter(f);
-        if (columns != null && !columns.isEmpty()) {
-          for(ByteBuffer column : columns) {
-            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
-            if(famQf.length == 1) {
-              scan.addFamily(famQf[0]);
-            } else {
-              scan.addColumn(famQf[0], famQf[1]);
-            }
-          }
-        }
-        return addScanner(table.getScanner(scan), false);
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } finally{
-        closeTable(table);
-      }
-    }
-
-    @Override
-    public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
-        List<ByteBuffer> columns, long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Scan scan = new Scan(getBytes(startRow));
-        addAttributes(scan, attributes);
-        scan.setTimeRange(0, timestamp);
-        if (columns != null && !columns.isEmpty()) {
-          for (ByteBuffer column : columns) {
-            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
-            if(famQf.length == 1) {
-              scan.addFamily(famQf[0]);
-            } else {
-              scan.addColumn(famQf[0], famQf[1]);
-            }
-          }
-        }
-        return addScanner(table.getScanner(scan), false);
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } finally{
-        closeTable(table);
-      }
-    }
-
-    @Override
-    public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
-        ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
-        Map<ByteBuffer, ByteBuffer> attributes)
-        throws IOError, TException {
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
-        addAttributes(scan, attributes);
-        scan.setTimeRange(0, timestamp);
-        if (columns != null && !columns.isEmpty()) {
-          for (ByteBuffer column : columns) {
-            byte [][] famQf = CellUtil.parseColumn(getBytes(column));
-            if(famQf.length == 1) {
-              scan.addFamily(famQf[0]);
-            } else {
-              scan.addColumn(famQf[0], famQf[1]);
-            }
-          }
-        }
-        scan.setTimeRange(0, timestamp);
-        return addScanner(table.getScanner(scan), false);
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } finally{
-        closeTable(table);
-      }
-    }
-
-    @Override
-    public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
-        ByteBuffer tableName) throws IOError, TException {
-
-      Table table = null;
-      try {
-        TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
-
-        table = getTable(tableName);
-        HTableDescriptor desc = table.getTableDescriptor();
-
-        for (HColumnDescriptor e : desc.getFamilies()) {
-          ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
-          columns.put(col.name, col);
-        }
-        return columns;
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } finally {
-        closeTable(table);
-      }
-    }
-
-    private void closeTable(Table table) throws IOError {
-      try{
-        if(table != null){
-          table.close();
-        }
-      } catch (IOException e){
-        LOG.error(e.getMessage(), e);
-        throw getIOError(e);
-      }
-    }
-
-    @Override
-    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
-      try {
-        byte[] row = getBytes(searchRow);
-        Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
-          HConstants.CATALOG_FAMILY);
-
-        if (startRowResult == null) {
-          throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
-                                + Bytes.toStringBinary(row));
-        }
-
-        // find region start and end keys
-        RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult);
-        if (regionInfo == null) {
-          throw new IOException("RegionInfo REGIONINFO was null or " +
-                                " empty in Meta for row="
-                                + Bytes.toStringBinary(row));
-        }
-        TRegionInfo region = new TRegionInfo();
-        region.setStartKey(regionInfo.getStartKey());
-        region.setEndKey(regionInfo.getEndKey());
-        region.id = regionInfo.getRegionId();
-        region.setName(regionInfo.getRegionName());
-        region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
-
-        // find region assignment to server
-        ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
-        if (serverName != null) {
-          region.setServerName(Bytes.toBytes(serverName.getHostname()));
-          region.port = serverName.getPort();
-        }
-        return region;
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      }
-    }
-
-    private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
-        throws IOException {
-      Scan scan = new Scan(row);
-      scan.setReversed(true);
-      scan.addFamily(family);
-      scan.setStartRow(row);
-      try (Table table = getTable(tableName);
-           ResultScanner scanner = table.getScanner(scan)) {
-        return scanner.next();
-      }
-    }
-
-    private void initMetrics(ThriftMetrics metrics) {
-      this.metrics = metrics;
-    }
-
-    @Override
-    public void increment(TIncrement tincrement) throws IOError, TException {
-
-      if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
-        throw new TException("Must supply a table and a row key; can't increment");
-      }
-
-      if (conf.getBoolean(COALESCE_INC_KEY, false)) {
-        this.coalescer.queueIncrement(tincrement);
-        return;
-      }
-
-      Table table = null;
-      try {
-        table = getTable(tincrement.getTable());
-        Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
-        table.increment(inc);
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } finally{
-        closeTable(table);
-      }
-    }
-
-    @Override
-    public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
-      if (conf.getBoolean(COALESCE_INC_KEY, false)) {
-        this.coalescer.queueIncrements(tincrements);
-        return;
-      }
-      for (TIncrement tinc : tincrements) {
-        increment(tinc);
-      }
-    }
-
-    @Override
-    public List<TCell> append(TAppend tappend) throws IOError, TException {
-      if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
-        throw new TException("Must supply a table and a row key; can't append");
-      }
-
-      Table table = null;
-      try {
-        table = getTable(tappend.getTable());
-        Append append = ThriftUtilities.appendFromThrift(tappend);
-        Result result = table.append(append);
-        return ThriftUtilities.cellFromHBase(result.rawCells());
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } finally{
-        closeTable(table);
-      }
-    }
-
-    @Override
-    public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
-        ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
-        IllegalArgument, TException {
-      Put put;
-      try {
-        put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
-        addAttributes(put, attributes);
-
-        byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
-        put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
-            .setRow(put.getRow())
-            .setFamily(famAndQf[0])
-            .setQualifier(famAndQf[1])
-            .setTimestamp(put.getTimestamp())
-            .setType(Type.Put)
-            .setValue(mput.value != null ? getBytes(mput.value)
-                : HConstants.EMPTY_BYTE_ARRAY)
-            .build());
-        put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
-      } catch (IOException | IllegalArgumentException e) {
-        LOG.warn(e.getMessage(), e);
-        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
-      }
-
-      Table table = null;
-      try {
-        table = getTable(tableName);
-        byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
-        Table.CheckAndMutateBuilder mutateBuilder =
-            table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
-        if (value != null) {
-          return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
-        } else {
-          return mutateBuilder.ifNotExists().thenPut(put);
-        }
-      } catch (IOException e) {
-        LOG.warn(e.getMessage(), e);
-        throw getIOError(e);
-      } catch (IllegalArgumentException e) {
-        LOG.warn(e.getMessage(), e);
-        throw new IllegalArgument(Throwables.getStackTraceAsString(e));
-      } finally {
-        closeTable(table);
-      }
-    }
-  }
-
-  private static IOError getIOError(Throwable throwable) {
-    IOError error = new IOErrorWithCause(throwable);
-    error.setMessage(Throwables.getStackTraceAsString(throwable));
-    return error;
-  }
-
-  /**
-   * Adds all the attributes into the Operation object
-   */
-  private static void addAttributes(OperationWithAttributes op,
-    Map<ByteBuffer, ByteBuffer> attributes) {
-    if (attributes == null || attributes.isEmpty()) {
-      return;
-    }
-    for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
-      String name = Bytes.toStringBinary(getBytes(entry.getKey()));
-      byte[] value =  getBytes(entry.getValue());
-      op.setAttribute(name, value);
-    }
-  }
-
-  public static void registerFilters(Configuration conf) {
-    String[] filters = conf.getStrings("hbase.thrift.filters");
-    Splitter splitter = Splitter.on(':');
-    if(filters != null) {
-      for(String filterClass: filters) {
-        List<String> filterPart = splitter.splitToList(filterClass);
-        if(filterPart.size() != 2) {
-          LOG.warn("Invalid filter specification " + filterClass + " - skipping");
-        } else {
-          ParseFilter.registerFilter(filterPart.get(0), filterPart.get(1));
-        }
-      }
-    }
-  }
-
-  public static class IOErrorWithCause extends IOError {
-    private final Throwable cause;
-    public IOErrorWithCause(Throwable cause) {
-      this.cause = cause;
-    }
-
-    @Override
-    public synchronized Throwable getCause() {
-      return cause;
-    }
-
-    @Override
-    public boolean equals(Object other) {
-      if (super.equals(other) &&
-          other instanceof IOErrorWithCause) {
-        Throwable otherCause = ((IOErrorWithCause) other).getCause();
-        if (this.getCause() != null) {
-          return otherCause != null && this.getCause().equals(otherCause);
-        } else {
-          return otherCause == null;
-        }
-      }
-      return false;
-    }
-
-    @Override
-    public int hashCode() {
-      int result = super.hashCode();
-      result = 31 * result + (cause != null ? cause.hashCode() : 0);
-      return result;
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
index 2bfeefe..a9ec646 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/ThriftHBaseServiceHandler.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.thrift2;
 
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_READONLY_ENABLED_DEFAULT;
 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.appendFromThrift;
 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.columnFamilyDescriptorFromThrift;
 import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.compareOpFromThrift;
@@ -44,10 +46,6 @@ import static org.apache.hadoop.hbase.thrift2.ThriftUtilities.tableNamesFromHBas
 import static org.apache.thrift.TBaseHelper.byteBufferToByteArray;
 
 import java.io.IOException;
-import java.lang.reflect.InvocationHandler;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.lang.reflect.Proxy;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -68,7 +66,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.security.UserProvider;
-import org.apache.hadoop.hbase.thrift.ThriftMetrics;
+import org.apache.hadoop.hbase.thrift.HBaseServiceHandler;
 import org.apache.hadoop.hbase.thrift2.generated.TAppend;
 import org.apache.hadoop.hbase.thrift2.generated.TColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.thrift2.generated.TCompareOp;
@@ -87,7 +85,6 @@ import org.apache.hadoop.hbase.thrift2.generated.TScan;
 import org.apache.hadoop.hbase.thrift2.generated.TTableDescriptor;
 import org.apache.hadoop.hbase.thrift2.generated.TTableName;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ConnectionCache;
 import org.apache.thrift.TException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -99,7 +96,7 @@ import org.slf4j.LoggerFactory;
  */
 @InterfaceAudience.Private
 @SuppressWarnings("deprecation")
-public class ThriftHBaseServiceHandler implements THBaseService.Iface {
+public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements THBaseService.Iface {
 
   // TODO: Size of pool configuraple
   private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
@@ -109,50 +106,10 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
   private final AtomicInteger nextScannerId = new AtomicInteger(0);
   private final Map<Integer, ResultScanner> scannerMap = new ConcurrentHashMap<>();
 
-  private final ConnectionCache connectionCache;
-
-  static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
-  static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
-
   private static final IOException ioe
       = new DoNotRetryIOException("Thrift Server is in Read-only mode.");
   private boolean isReadOnly;
 
-  public static THBaseService.Iface newInstance(
-      THBaseService.Iface handler, ThriftMetrics metrics) {
-    return (THBaseService.Iface) Proxy.newProxyInstance(handler.getClass().getClassLoader(),
-      new Class[] { THBaseService.Iface.class }, new THBaseServiceMetricsProxy(handler, metrics));
-  }
-
-  private static final class THBaseServiceMetricsProxy implements InvocationHandler {
-    private final THBaseService.Iface handler;
-    private final ThriftMetrics metrics;
-
-    private THBaseServiceMetricsProxy(THBaseService.Iface handler, ThriftMetrics metrics) {
-      this.handler = handler;
-      this.metrics = metrics;
-    }
-
-    @Override
-    public Object invoke(Object proxy, Method m, Object[] args) throws Throwable {
-      Object result;
-      long start = now();
-      try {
-        result = m.invoke(handler, args);
-      } catch (InvocationTargetException e) {
-        metrics.exception(e.getCause());
-        throw e.getTargetException();
-      } catch (Exception e) {
-        metrics.exception(e);
-        throw new RuntimeException("unexpected invocation exception: " + e.getMessage());
-      } finally {
-        long processTime = now() - start;
-        metrics.incMethodTime(m.getName(), processTime);
-      }
-      return result;
-    }
-  }
-
   private static class TIOErrorWithCause extends TIOError {
     private Throwable cause;
 
@@ -188,20 +145,14 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
     }
   }
 
-  private static long now() {
-    return System.nanoTime();
-  }
-
   ThriftHBaseServiceHandler(final Configuration conf,
       final UserProvider userProvider) throws IOException {
-    int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
-    int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
-    connectionCache = new ConnectionCache(
-      conf, userProvider, cleanInterval, maxIdleTime);
-    isReadOnly = conf.getBoolean("hbase.thrift.readonly", false);
+    super(conf, userProvider);
+    isReadOnly = conf.getBoolean(THRIFT_READONLY_ENABLED, THRIFT_READONLY_ENABLED_DEFAULT);
   }
 
-  private Table getTable(ByteBuffer tableName) {
+  @Override
+  protected Table getTable(ByteBuffer tableName) {
     try {
       return connectionCache.getTable(Bytes.toString(byteBufferToByteArray(tableName)));
     } catch (IOException ie) {
@@ -251,10 +202,6 @@ public class ThriftHBaseServiceHandler implements THBaseService.Iface {
     return scannerMap.get(id);
   }
 
-  void setEffectiveUser(String effectiveUser) {
-    connectionCache.setEffectiveUser(effectiveUser);
-  }
-
   /**
    * Removes the scanner associated with the specified ID from the internal HashMap.
    * @param id of the Scanner to remove