Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/04 06:40:31 UTC

[05/15] hbase git commit: HBASE-21652 Refactor ThriftServer making thrift2 server inherit from thrift1 server

HBASE-21652 Refactor ThriftServer making thrift2 server inherit from thrift1 server


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e4b6b4af
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e4b6b4af
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e4b6b4af

Branch: refs/heads/HBASE-21512
Commit: e4b6b4afb933a961f543537875f87a2dc62d3757
Parents: f0b50a8
Author: Allan Yang <al...@apache.org>
Authored: Wed Jan 2 16:13:17 2019 +0800
Committer: Allan Yang <al...@apache.org>
Committed: Wed Jan 2 16:13:57 2019 +0800

----------------------------------------------------------------------
 .../apache/hadoop/hbase/thrift/Constants.java   |  151 ++
 .../hbase/thrift/HBaseServiceHandler.java       |   90 +
 .../hbase/thrift/HbaseHandlerMetricsProxy.java  |   20 +-
 .../apache/hadoop/hbase/thrift/ImplType.java    |  143 ++
 .../hadoop/hbase/thrift/IncrementCoalescer.java |    6 +-
 .../hbase/thrift/ThriftHBaseServiceHandler.java | 1347 ++++++++++++
 .../hadoop/hbase/thrift/ThriftHttpServlet.java  |   12 +-
 .../hadoop/hbase/thrift/ThriftServer.java       |  698 +++++-
 .../hadoop/hbase/thrift/ThriftServerRunner.java | 2031 ------------------
 .../thrift2/ThriftHBaseServiceHandler.java      |   69 +-
 .../hadoop/hbase/thrift2/ThriftServer.java      |  594 +----
 .../resources/hbase-webapps/thrift/thrift.jsp   |    2 +-
 .../hbase/thrift/TestThriftHttpServer.java      |   28 +-
 .../hadoop/hbase/thrift/TestThriftServer.java   |   58 +-
 .../hbase/thrift/TestThriftServerCmdLine.java   |   48 +-
 .../thrift/TestThriftSpnegoHttpServer.java      |   21 +-
 .../hbase/thrift2/TestThrift2HttpServer.java    |   90 +
 .../hbase/thrift2/TestThrift2ServerCmdLine.java |   99 +
 .../thrift2/TestThriftHBaseServiceHandler.java  |   15 +-
 19 files changed, 2711 insertions(+), 2811 deletions(-)
----------------------------------------------------------------------
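For orientation before the file-by-file diff: the shared plumbing (connection cache, handler base class, server-type selection) moves into org.apache.hadoop.hbase.thrift, the old ThriftServerRunner is folded into ThriftServer, and the thrift2 server reuses the thrift1 bootstrap through inheritance. A minimal sketch of the resulting class shape; the class names are the real ones from this diff except Thrift2Server, a stand-in for org.apache.hadoop.hbase.thrift2.ThriftServer, and all members are elided:

abstract class HBaseServiceHandler {
  // new in this commit: ConnectionCache plus getTable()/getAdmin() helpers
}

class ThriftHBaseServiceHandler extends HBaseServiceHandler {
  // thrift1 Hbase.Iface implementation, moved out of ThriftServerRunner
}

class ThriftServer {
  // thrift1 bootstrap, absorbed from the deleted ThriftServerRunner
}

class Thrift2Server extends ThriftServer {
  // stand-in name for org.apache.hadoop.hbase.thrift2.ThriftServer
}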


http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java
new file mode 100644
index 0000000..8e3d004
--- /dev/null
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/Constants.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Thrift related constants
+ */
+@InterfaceAudience.Private
+public final class Constants {
+  private Constants() {}
+
+  public static final int DEFAULT_HTTP_MAX_HEADER_SIZE = 64 * 1024; // 64k
+
+  public static final String SERVER_TYPE_CONF_KEY =
+      "hbase.regionserver.thrift.server.type";
+
+  public static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
+  public static final boolean COMPACT_CONF_DEFAULT = false;
+
+  public static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
+  public static final boolean FRAMED_CONF_DEFAULT = false;
+
+  public static final String MAX_FRAME_SIZE_CONF_KEY =
+      "hbase.regionserver.thrift.framed.max_frame_size_in_mb";
+  public static final int MAX_FRAME_SIZE_CONF_DEFAULT = 2;
+
+  public static final String COALESCE_INC_KEY = "hbase.regionserver.thrift.coalesceIncrement";
+  public static final String USE_HTTP_CONF_KEY = "hbase.regionserver.thrift.http";
+
+  public static final String HTTP_MIN_THREADS_KEY = "hbase.thrift.http_threads.min";
+  public static final int HTTP_MIN_THREADS_KEY_DEFAULT = 2;
+
+  public static final String HTTP_MAX_THREADS_KEY = "hbase.thrift.http_threads.max";
+  public static final int HTTP_MAX_THREADS_KEY_DEFAULT = 100;
+
+  // ssl related configs
+  public static final String THRIFT_SSL_ENABLED_KEY = "hbase.thrift.ssl.enabled";
+  public static final String THRIFT_SSL_KEYSTORE_STORE_KEY = "hbase.thrift.ssl.keystore.store";
+  public static final String THRIFT_SSL_KEYSTORE_PASSWORD_KEY =
+      "hbase.thrift.ssl.keystore.password";
+  public static final String THRIFT_SSL_KEYSTORE_KEYPASSWORD_KEY
+      = "hbase.thrift.ssl.keystore.keypassword";
+  public static final String THRIFT_SSL_EXCLUDE_CIPHER_SUITES_KEY =
+      "hbase.thrift.ssl.exclude.cipher.suites";
+  public static final String THRIFT_SSL_INCLUDE_CIPHER_SUITES_KEY =
+      "hbase.thrift.ssl.include.cipher.suites";
+  public static final String THRIFT_SSL_EXCLUDE_PROTOCOLS_KEY =
+      "hbase.thrift.ssl.exclude.protocols";
+  public static final String THRIFT_SSL_INCLUDE_PROTOCOLS_KEY =
+      "hbase.thrift.ssl.include.protocols";
+
+  public static final String THRIFT_SUPPORT_PROXYUSER_KEY = "hbase.thrift.support.proxyuser";
+
+  //kerberos related configs
+  public static final String THRIFT_DNS_INTERFACE_KEY = "hbase.thrift.dns.interface";
+  public static final String THRIFT_DNS_NAMESERVER_KEY = "hbase.thrift.dns.nameserver";
+  public static final String THRIFT_KERBEROS_PRINCIPAL_KEY = "hbase.thrift.kerberos.principal";
+  public static final String THRIFT_KEYTAB_FILE_KEY = "hbase.thrift.keytab.file";
+  public static final String THRIFT_SPNEGO_PRINCIPAL_KEY = "hbase.thrift.spnego.principal";
+  public static final String THRIFT_SPNEGO_KEYTAB_FILE_KEY = "hbase.thrift.spnego.keytab.file";
+
+  /**
+   * Amount of time in milliseconds before a server thread will timeout
+   * waiting for client to send data on a connected socket. Currently,
+   * applies only to TBoundedThreadPoolServer
+   */
+  public static final String THRIFT_SERVER_SOCKET_READ_TIMEOUT_KEY =
+      "hbase.thrift.server.socket.read.timeout";
+  public static final int THRIFT_SERVER_SOCKET_READ_TIMEOUT_DEFAULT = 60000;
+
+  /**
+   * Thrift quality of protection configuration key. Valid values can be:
+   * auth-conf: authentication, integrity and confidentiality checking
+   * auth-int: authentication and integrity checking
+   * auth: authentication only
+   *
+   * This is used to authenticate the callers and support impersonation.
+   * The thrift server and the HBase cluster must run in secure mode.
+   */
+  public static final String THRIFT_QOP_KEY = "hbase.thrift.security.qop";
+
+  public static final String BACKLOG_CONF_KEY = "hbase.regionserver.thrift.backlog";
+  public static final int BACKLOG_CONF_DEFAULT = 0;
+
+  public static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
+  public static final String DEFAULT_BIND_ADDR = "0.0.0.0";
+
+  public static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
+  public static final int DEFAULT_LISTEN_PORT = 9090;
+
+  public static final String THRIFT_HTTP_ALLOW_OPTIONS_METHOD =
+      "hbase.thrift.http.allow.options.method";
+  public static final boolean THRIFT_HTTP_ALLOW_OPTIONS_METHOD_DEFAULT = false;
+
+  public static final String THRIFT_INFO_SERVER_PORT = "hbase.thrift.info.port";
+  public static final int THRIFT_INFO_SERVER_PORT_DEFAULT = 9095;
+
+  public static final String THRIFT_INFO_SERVER_BINDING_ADDRESS = "hbase.thrift.info.bindAddress";
+  public static final String THRIFT_INFO_SERVER_BINDING_ADDRESS_DEFAULT = "0.0.0.0";
+
+  public static final String THRIFT_QUEUE_SIZE = "hbase.thrift.queue.size";
+  public static final int THRIFT_QUEUE_SIZE_DEFAULT = Integer.MAX_VALUE;
+
+  public static final String THRIFT_SELECTOR_NUM = "hbase.thrift.selector.num";
+
+  public static final String THRIFT_FILTERS = "hbase.thrift.filters";
+
+  // Command line options
+
+  public static final String READ_TIMEOUT_OPTION = "readTimeout";
+  public static final String MIN_WORKERS_OPTION = "minWorkers";
+  public static final String MAX_WORKERS_OPTION = "workers";
+  public static final String MAX_QUEUE_SIZE_OPTION = "queue";
+  public static final String SELECTOR_NUM_OPTION = "selectors";
+  public static final String KEEP_ALIVE_SEC_OPTION = "keepAliveSec";
+  public static final String BIND_OPTION = "bind";
+  public static final String COMPACT_OPTION = "compact";
+  public static final String FRAMED_OPTION = "framed";
+  public static final String PORT_OPTION = "port";
+  public static final String INFOPORT_OPTION = "infoport";
+
+  //for thrift2 server
+  public static final String READONLY_OPTION = "readonly";
+
+  public static final String THRIFT_READONLY_ENABLED = "hbase.thrift.readonly";
+  public static final boolean THRIFT_READONLY_ENABLED_DEFAULT = false;
+}
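For reference, these keys are read with the stock Hadoop Configuration getters. A minimal sketch, where the key strings and defaults mirror the constants above and the main() wrapper is illustrative only:

import org.apache.hadoop.conf.Configuration;

public class ThriftConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    int port = conf.getInt("hbase.regionserver.thrift.port", 9090);
    boolean framed = conf.getBoolean("hbase.regionserver.thrift.framed", false);
    int readTimeoutMs = conf.getInt("hbase.thrift.server.socket.read.timeout", 60000);
    // Valid values per the javadoc above: auth, auth-int, auth-conf.
    String qop = conf.get("hbase.thrift.security.qop");
    System.out.printf("port=%d framed=%b readTimeout=%dms qop=%s%n",
        port, framed, readTimeoutMs, qop);
  }
}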

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java
new file mode 100644
index 0000000..7990871
--- /dev/null
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HBaseServiceHandler.java
@@ -0,0 +1,90 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConnectionCache;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Abstract base class for HBase service handlers, providing a connection
+ * cache and accessors for Table and Admin instances.
+ */
+@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS)
+public abstract class HBaseServiceHandler {
+  public static final String CLEANUP_INTERVAL = "hbase.thrift.connection.cleanup-interval";
+  public static final String MAX_IDLETIME = "hbase.thrift.connection.max-idletime";
+
+  protected Configuration conf;
+
+  protected final ConnectionCache connectionCache;
+
+  public HBaseServiceHandler(final Configuration c,
+      final UserProvider userProvider) throws IOException {
+    this.conf = c;
+    int cleanInterval = conf.getInt(CLEANUP_INTERVAL, 10 * 1000);
+    int maxIdleTime = conf.getInt(MAX_IDLETIME, 10 * 60 * 1000);
+    connectionCache = new ConnectionCache(
+        conf, userProvider, cleanInterval, maxIdleTime);
+  }
+
+  protected ThriftMetrics metrics = null;
+
+  public void initMetrics(ThriftMetrics metrics) {
+    this.metrics = metrics;
+  }
+
+  public void setEffectiveUser(String effectiveUser) {
+    connectionCache.setEffectiveUser(effectiveUser);
+  }
+
+  /**
+   * Obtain HBaseAdmin. Creates the instance if it is not already created.
+   */
+  protected Admin getAdmin() throws IOException {
+    return connectionCache.getAdmin();
+  }
+
+  /**
+   * Creates and returns a Table instance from a given table name.
+   *
+   * @param tableName
+   *          name of table
+   * @return Table object
+   * @throws IOException if getting the table fails
+   */
+  protected Table getTable(final byte[] tableName) throws IOException {
+    String table = Bytes.toString(tableName);
+    return connectionCache.getTable(table);
+  }
+
+  protected Table getTable(final ByteBuffer tableName) throws IOException {
+    return getTable(Bytes.getBytes(tableName));
+  }
+}
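A minimal sketch of what a concrete handler inherits from this base. The subclass and its method are invented for illustration; only the constructor and the protected accessors come from the class above:

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.thrift.HBaseServiceHandler;

public class ExampleHandler extends HBaseServiceHandler {
  public ExampleHandler(Configuration conf, UserProvider provider) throws IOException {
    super(conf, provider); // wires up the shared ConnectionCache
  }

  /** Uses the inherited, cache-backed table accessor. */
  public String describe(ByteBuffer tableName) throws IOException {
    Table table = getTable(tableName);
    try {
      return table.getName().getNameAsString();
    } finally {
      table.close();
    }
  }
}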

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
index 5a6e436..1402f86 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/HbaseHandlerMetricsProxy.java
@@ -25,9 +25,8 @@ import java.lang.reflect.Proxy;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift2.generated.THBaseService;
 import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 /**
  * Converts a Hbase.Iface using InvocationHandler so that it reports process
@@ -36,10 +35,7 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 public final class HbaseHandlerMetricsProxy implements InvocationHandler {
 
-  private static final Logger LOG = LoggerFactory.getLogger(
-      HbaseHandlerMetricsProxy.class);
-
-  private final Hbase.Iface handler;
+  private final Object handler;
   private final ThriftMetrics metrics;
 
   public static Hbase.Iface newInstance(Hbase.Iface handler,
@@ -51,8 +47,18 @@ public final class HbaseHandlerMetricsProxy implements InvocationHandler {
         new HbaseHandlerMetricsProxy(handler, metrics, conf));
   }
 
+  // for thrift 2
+  public static THBaseService.Iface newInstance(THBaseService.Iface handler,
+      ThriftMetrics metrics,
+      Configuration conf) {
+    return (THBaseService.Iface) Proxy.newProxyInstance(
+        handler.getClass().getClassLoader(),
+        new Class[]{THBaseService.Iface.class},
+        new HbaseHandlerMetricsProxy(handler, metrics, conf));
+  }
+
   private HbaseHandlerMetricsProxy(
-      Hbase.Iface handler, ThriftMetrics metrics, Configuration conf) {
+      Object handler, ThriftMetrics metrics, Configuration conf) {
     this.handler = handler;
     this.metrics = metrics;
   }
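With this change the same proxy instruments both protocol versions; each Iface call on the returned object is timed and reported to ThriftMetrics. A sketch of the wiring, assuming the handler and metrics objects are created elsewhere; only the two newInstance overloads come from this diff:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.thrift.HbaseHandlerMetricsProxy;
import org.apache.hadoop.hbase.thrift.ThriftMetrics;
import org.apache.hadoop.hbase.thrift.generated.Hbase;
import org.apache.hadoop.hbase.thrift2.generated.THBaseService;

public final class MetricsWiringSketch {
  // thrift1: every Hbase.Iface call goes through the timing proxy.
  static Hbase.Iface wrap1(Hbase.Iface h, ThriftMetrics m, Configuration c) {
    return HbaseHandlerMetricsProxy.newInstance(h, m, c);
  }

  // thrift2: the overload added by this commit does the same for THBaseService.
  static THBaseService.Iface wrap2(THBaseService.Iface h, ThriftMetrics m, Configuration c) {
    return HbaseHandlerMetricsProxy.newInstance(h, m, c);
  }
}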

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java
new file mode 100644
index 0000000..7108115
--- /dev/null
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ImplType.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.thrift;
+
+import static org.apache.hadoop.hbase.thrift.Constants.SERVER_TYPE_CONF_KEY;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.Option;
+import org.apache.hbase.thirdparty.org.apache.commons.cli.OptionGroup;
+
+/** An enum of server implementation selections */
+@InterfaceAudience.Private
+public enum ImplType {
+  HS_HA("hsha", true, THsHaServer.class, true),
+  NONBLOCKING("nonblocking", true, TNonblockingServer.class, true),
+  THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
+  THREADED_SELECTOR("threadedselector", true, TThreadedSelectorServer.class, true);
+
+  private static final Logger LOG = LoggerFactory.getLogger(ImplType.class);
+  public static final ImplType DEFAULT = THREAD_POOL;
+
+  final String option;
+  final boolean isAlwaysFramed;
+  final Class<? extends TServer> serverClass;
+  final boolean canSpecifyBindIP;
+
+  private ImplType(String option, boolean isAlwaysFramed,
+      Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
+    this.option = option;
+    this.isAlwaysFramed = isAlwaysFramed;
+    this.serverClass = serverClass;
+    this.canSpecifyBindIP = canSpecifyBindIP;
+  }
+
+  /**
+   * @return <code>-option</code>
+   */
+  @Override
+  public String toString() {
+    return "-" + option;
+  }
+
+  public String getOption() {
+    return option;
+  }
+
+  public boolean isAlwaysFramed() {
+    return isAlwaysFramed;
+  }
+
+  public String getDescription() {
+    StringBuilder sb = new StringBuilder("Use the " +
+        serverClass.getSimpleName() + ".");
+    if (isAlwaysFramed) {
+      sb.append(" This implies the framed transport.");
+    }
+    if (this == DEFAULT) {
+      sb.append(" This is the default.");
+    }
+    return sb.toString();
+  }
+
+  static OptionGroup createOptionGroup() {
+    OptionGroup group = new OptionGroup();
+    for (ImplType t : values()) {
+      group.addOption(new Option(t.option, t.getDescription()));
+    }
+    return group;
+  }
+
+  public static ImplType getServerImpl(Configuration conf) {
+    String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
+    for (ImplType t : values()) {
+      if (confType.equals(t.option)) {
+        return t;
+      }
+    }
+    throw new AssertionError("Unknown server ImplType.option: " + confType);
+  }
+
+  static void setServerImpl(CommandLine cmd, Configuration conf) {
+    ImplType chosenType = null;
+    int numChosen = 0;
+    for (ImplType t : values()) {
+      if (cmd.hasOption(t.option)) {
+        chosenType = t;
+        ++numChosen;
+      }
+    }
+    if (numChosen < 1) {
+      LOG.info("Using default thrift server type");
+      chosenType = DEFAULT;
+    } else if (numChosen > 1) {
+      throw new AssertionError("Exactly one option out of " +
+          Arrays.toString(values()) + " has to be specified");
+    }
+    LOG.info("Using thrift server type " + chosenType.option);
+    conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
+  }
+
+  public String simpleClassName() {
+    return serverClass.getSimpleName();
+  }
+
+  public static List<String> serversThatCannotSpecifyBindIP() {
+    List<String> l = new ArrayList<>();
+    for (ImplType t : values()) {
+      if (!t.canSpecifyBindIP) {
+        l.add(t.simpleClassName());
+      }
+    }
+    return l;
+  }
+}
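Selection is config-driven: getServerImpl resolves hbase.regionserver.thrift.server.type, and setServerImpl writes the chosen command-line flag (-hsha, -nonblocking, -threadpool, -threadedselector) back into the config first. A minimal sketch using only the methods defined above:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.thrift.ImplType;

public class ImplTypeSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.set("hbase.regionserver.thrift.server.type", "nonblocking");
    ImplType type = ImplType.getServerImpl(conf);
    // TNonblockingServer always requires the framed transport.
    System.out.println(type.simpleClassName() + ", framed=" + type.isAlwaysFramed());
  }
}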

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index e36d639..971cd17 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.LongAdder;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.thrift.ThriftServerRunner.HBaseHandler;
 import org.apache.hadoop.hbase.thrift.generated.TIncrement;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -180,7 +179,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
   private final ConcurrentMap<FullyQualifiedRow, Long> countersMap =
       new ConcurrentHashMap<>(100000, 0.75f, 1500);
   private final ThreadPoolExecutor pool;
-  private final HBaseHandler handler;
+  private final ThriftHBaseServiceHandler handler;
 
   private int maxQueueSize = 500000;
   private static final int CORE_POOL_SIZE = 1;
@@ -188,7 +187,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
   private static final Logger LOG = LoggerFactory.getLogger(FullyQualifiedRow.class);
 
   @SuppressWarnings("deprecation")
-  public IncrementCoalescer(HBaseHandler hand) {
+  public IncrementCoalescer(ThriftHBaseServiceHandler hand) {
     this.handler = hand;
     LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
     pool =
@@ -230,6 +229,7 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
       inc.getAmmount());
   }
 
+  @SuppressWarnings("FutureReturnValueIgnored")
   private boolean internalQueueIncrement(byte[] tableName, byte[] rowKey, byte[] fam,
       byte[] qual, long ammount) throws TException {
     int countersMapSize = countersMap.size();
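For context, the coalescer folds thrift1 increments into an in-memory counter map that its thread pool flushes, so hot counters cost fewer RPCs. A sketch of the caller side, assuming the public queueIncrement(TIncrement) entry point that the hunk above belongs to and the generated all-args TIncrement constructor (the field really is spelled "ammount" in the generated code):

import java.nio.ByteBuffer;

import org.apache.hadoop.hbase.thrift.IncrementCoalescer;
import org.apache.hadoop.hbase.thrift.generated.TIncrement;
import org.apache.thrift.TException;

public class CoalescerSketch {
  /** Queues a +1 on f:counter; the pool applies it asynchronously. */
  static void bump(IncrementCoalescer coalescer) throws TException {
    TIncrement inc = new TIncrement(
        ByteBuffer.wrap("t1".getBytes()),        // table
        ByteBuffer.wrap("row1".getBytes()),      // row
        ByteBuffer.wrap("f:counter".getBytes()), // family:qualifier
        1L);                                     // ammount
    coalescer.queueIncrement(inc); // returns once queued, not once applied
  }
}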

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java
new file mode 100644
index 0000000..34bf5e8
--- /dev/null
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHBaseServiceHandler.java
@@ -0,0 +1,1347 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import static org.apache.hadoop.hbase.thrift.Constants.COALESCE_INC_KEY;
+import static org.apache.hadoop.hbase.util.Bytes.getBytes;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellBuilder;
+import org.apache.hadoop.hbase.CellBuilderFactory;
+import org.apache.hadoop.hbase.CellBuilderType;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Increment;
+import org.apache.hadoop.hbase.client.OperationWithAttributes;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
+import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
+import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
+import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
+import org.apache.hadoop.hbase.thrift.generated.Mutation;
+import org.apache.hadoop.hbase.thrift.generated.TAppend;
+import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TIncrement;
+import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.hadoop.hbase.thrift.generated.TScan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.thrift.TException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
+/**
+ * The ThriftHBaseServiceHandler is a glue object that connects Thrift RPC calls to the
+ * HBase client API primarily defined in the Admin and Table objects.
+ */
+@InterfaceAudience.Private
+@SuppressWarnings("deprecation")
+public class ThriftHBaseServiceHandler extends HBaseServiceHandler implements Hbase.Iface {
+  private static final Logger LOG = LoggerFactory.getLogger(ThriftHBaseServiceHandler.class);
+
+  public static final int HREGION_VERSION = 1;
+
+  // nextScannerId and scannerMap are used to manage scanner state
+  private int nextScannerId = 0;
+  private HashMap<Integer, ResultScannerWrapper> scannerMap;
+  IncrementCoalescer coalescer;
+
+  /**
+   * Returns a list of all the column families for a given Table.
+   *
+   * @param table table
+   * @throws IOException
+   */
+  byte[][] getAllColumns(Table table) throws IOException {
+    HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
+    byte[][] columns = new byte[cds.length][];
+    for (int i = 0; i < cds.length; i++) {
+      columns[i] = Bytes.add(cds[i].getName(),
+          KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
+    }
+    return columns;
+  }
+
+  /**
+   * Assigns a unique ID to the scanner and adds the mapping to an internal
+   * hash-map.
+   *
+   * @param scanner the {@link ResultScanner} to add
+   * @return integer scanner id
+   */
+  protected synchronized int addScanner(ResultScanner scanner, boolean sortColumns) {
+    int id = nextScannerId++;
+    ResultScannerWrapper resultScannerWrapper =
+        new ResultScannerWrapper(scanner, sortColumns);
+    scannerMap.put(id, resultScannerWrapper);
+    return id;
+  }
+
+  /**
+   * Returns the scanner associated with the specified ID.
+   *
+   * @param id the ID of the scanner to get
+   * @return the ResultScannerWrapper, or null if the ID is invalid.
+   */
+  private synchronized ResultScannerWrapper getScanner(int id) {
+    return scannerMap.get(id);
+  }
+
+  /**
+   * Removes the scanner associated with the specified ID from the internal
+   * id-&gt;scanner hash-map.
+   *
+   * @param id the ID of the scanner to remove
+   * @return the removed ResultScannerWrapper, or null if the ID is invalid.
+   */
+  private synchronized ResultScannerWrapper removeScanner(int id) {
+    return scannerMap.remove(id);
+  }
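+
+  // Illustrative only: the thrift1 scanner protocol drives the id -> scanner
+  // map above. A client's typical call sequence (values invented):
+  //   int id = handler.scannerOpen(table, startRow, columns, attributes);
+  //   List<TRowResult> batch = handler.scannerGetList(id, 100); // up to 100 rows
+  //   handler.scannerClose(id); // removes the entry from scannerMap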
+
+  protected ThriftHBaseServiceHandler(final Configuration c,
+      final UserProvider userProvider) throws IOException {
+    super(c, userProvider);
+    scannerMap = new HashMap<>();
+    this.coalescer = new IncrementCoalescer(this);
+  }
+
+  @Override
+  public void enableTable(ByteBuffer tableName) throws IOError {
+    try {
+      getAdmin().enableTable(getTableName(tableName));
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
+  @Override
+  public void disableTable(ByteBuffer tableName) throws IOError {
+    try {
+      getAdmin().disableTable(getTableName(tableName));
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
+  @Override
+  public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
+    try {
+      return this.connectionCache.getAdmin().isTableEnabled(getTableName(tableName));
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
+  // ThriftHBaseServiceHandler.compact should be deprecated and replaced with methods
+  // specific to table and region.
+  @Override
+  public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
+    try {
+      try {
+        getAdmin().compactRegion(getBytes(tableNameOrRegionName));
+      } catch (IllegalArgumentException e) {
+        // Invalid region, try table
+        getAdmin().compact(TableName.valueOf(getBytes(tableNameOrRegionName)));
+      }
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
+  // ThriftHBaseServiceHandler.majorCompact should be deprecated and replaced with methods
+  // specific to table and region.
+  @Override
+  public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
+    try {
+      try {
+        getAdmin().majorCompactRegion(getBytes(tableNameOrRegionName));
+      } catch (IllegalArgumentException e) {
+        // Invalid region, try table
+        getAdmin().majorCompact(TableName.valueOf(getBytes(tableNameOrRegionName)));
+      }
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
+  @Override
+  public List<ByteBuffer> getTableNames() throws IOError {
+    try {
+      TableName[] tableNames = this.getAdmin().listTableNames();
+      ArrayList<ByteBuffer> list = new ArrayList<>(tableNames.length);
+      for (TableName tableName : tableNames) {
+        list.add(ByteBuffer.wrap(tableName.getName()));
+      }
+      return list;
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
+  /**
+   * @return the list of regions in the given table, or an empty list if the table does not exist
+   */
+  @Override
+  public List<TRegionInfo> getTableRegions(ByteBuffer tableName) throws IOError {
+    try (RegionLocator locator = connectionCache.getRegionLocator(getBytes(tableName))) {
+      List<HRegionLocation> regionLocations = locator.getAllRegionLocations();
+      List<TRegionInfo> results = new ArrayList<>(regionLocations.size());
+      for (HRegionLocation regionLocation : regionLocations) {
+        RegionInfo info = regionLocation.getRegionInfo();
+        ServerName serverName = regionLocation.getServerName();
+        TRegionInfo region = new TRegionInfo();
+        region.serverName = ByteBuffer.wrap(
+            Bytes.toBytes(serverName.getHostname()));
+        region.port = serverName.getPort();
+        region.startKey = ByteBuffer.wrap(info.getStartKey());
+        region.endKey = ByteBuffer.wrap(info.getEndKey());
+        region.id = info.getRegionId();
+        region.name = ByteBuffer.wrap(info.getRegionName());
+        region.version = HREGION_VERSION; // HRegion now not versioned, PB encoding used
+        results.add(region);
+      }
+      return results;
+    } catch (TableNotFoundException e) {
+      // Return empty list for non-existing table
+      return Collections.emptyList();
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
+  @Override
+  public List<TCell> get(
+      ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError {
+    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+    if (famAndQf.length == 1) {
+      return get(tableName, row, famAndQf[0], null, attributes);
+    }
+    if (famAndQf.length == 2) {
+      return get(tableName, row, famAndQf[0], famAndQf[1], attributes);
+    }
+    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
+  }
+
+  /**
+   * Note: this internal interface is slightly different from public APIs in regard to handling
+   * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
+   * we respect qual == null as a request for the entire column family. The caller (
+   * {@link #get(ByteBuffer, ByteBuffer, ByteBuffer, Map)}) interface IS consistent in that
+   * the column is parsed as normal.
+   */
+  protected List<TCell> get(ByteBuffer tableName,
+      ByteBuffer row,
+      byte[] family,
+      byte[] qualifier,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Get get = new Get(getBytes(row));
+      addAttributes(get, attributes);
+      if (qualifier == null) {
+        get.addFamily(family);
+      } else {
+        get.addColumn(family, qualifier);
+      }
+      Result result = table.get(get);
+      return ThriftUtilities.cellFromHBase(result.rawCells());
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
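+
+  // Illustrative only: the qualifier convention documented above, with invented
+  // values:
+  //   get(table, row, family, null, attrs)      -> entire column family
+  //   get(table, row, family, qualifier, attrs) -> single column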
+
+  @Override
+  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+      int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+    if(famAndQf.length == 1) {
+      return getVer(tableName, row, famAndQf[0], null, numVersions, attributes);
+    }
+    if (famAndQf.length == 2) {
+      return getVer(tableName, row, famAndQf[0], famAndQf[1], numVersions, attributes);
+    }
+    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
+  }
+
+  /**
+   * Note: this public interface is slightly different from public Java APIs in regard to
+   * handling of the qualifier. Here we differ from the public Java API in that null != byte[0].
+   * Rather, we respect qual == null as a request for the entire column family. If you want to
+   * access the entire column family, use
+   * {@link #getVer(ByteBuffer, ByteBuffer, ByteBuffer, int, Map)} with a {@code column} value
+   * that lacks a {@code ':'}.
+   */
+  public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row, byte[] family,
+      byte[] qualifier, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Get get = new Get(getBytes(row));
+      addAttributes(get, attributes);
+      if (null == qualifier) {
+        get.addFamily(family);
+      } else {
+        get.addColumn(family, qualifier);
+      }
+      get.setMaxVersions(numVersions);
+      Result result = table.get(get);
+      return ThriftUtilities.cellFromHBase(result.rawCells());
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
+  @Override
+  public List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+      long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+    if (famAndQf.length == 1) {
+      return getVerTs(tableName, row, famAndQf[0], null, timestamp, numVersions, attributes);
+    }
+    if (famAndQf.length == 2) {
+      return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp, numVersions,
+          attributes);
+    }
+    throw new IllegalArgumentException("Invalid familyAndQualifier provided.");
+  }
+
+  /**
+   * Note: this internal interface is slightly different from public APIs in regard to handling
+   * of the qualifier. Here we differ from the public Java API in that null != byte[0]. Rather,
+   * we respect qual == null as a request for the entire column family. The caller (
+   * {@link #getVerTs(ByteBuffer, ByteBuffer, ByteBuffer, long, int, Map)}) interface IS
+   * consistent in that the column is parsed as normal.
+   */
+  protected List<TCell> getVerTs(ByteBuffer tableName, ByteBuffer row, byte[] family,
+      byte[] qualifier, long timestamp, int numVersions, Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError {
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Get get = new Get(getBytes(row));
+      addAttributes(get, attributes);
+      if (null == qualifier) {
+        get.addFamily(family);
+      } else {
+        get.addColumn(family, qualifier);
+      }
+      get.setTimeRange(0, timestamp);
+      get.setMaxVersions(numVersions);
+      Result result = table.get(get);
+      return ThriftUtilities.cellFromHBase(result.rawCells());
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
+  @Override
+  public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    return getRowWithColumnsTs(tableName, row, null,
+        HConstants.LATEST_TIMESTAMP,
+        attributes);
+  }
+
+  @Override
+  public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
+      ByteBuffer row,
+      List<ByteBuffer> columns,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    return getRowWithColumnsTs(tableName, row, columns,
+        HConstants.LATEST_TIMESTAMP,
+        attributes);
+  }
+
+  @Override
+  public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
+      long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    return getRowWithColumnsTs(tableName, row, null,
+        timestamp, attributes);
+  }
+
+  @Override
+  public List<TRowResult> getRowWithColumnsTs(
+      ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
+      long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      if (columns == null) {
+        Get get = new Get(getBytes(row));
+        addAttributes(get, attributes);
+        get.setTimeRange(0, timestamp);
+        Result result = table.get(get);
+        return ThriftUtilities.rowResultFromHBase(result);
+      }
+      Get get = new Get(getBytes(row));
+      addAttributes(get, attributes);
+      for(ByteBuffer column : columns) {
+        byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+        if (famAndQf.length == 1) {
+          get.addFamily(famAndQf[0]);
+        } else {
+          get.addColumn(famAndQf[0], famAndQf[1]);
+        }
+      }
+      get.setTimeRange(0, timestamp);
+      Result result = table.get(get);
+      return ThriftUtilities.rowResultFromHBase(result);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
+  @Override
+  public List<TRowResult> getRows(ByteBuffer tableName,
+      List<ByteBuffer> rows,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError {
+    return getRowsWithColumnsTs(tableName, rows, null,
+        HConstants.LATEST_TIMESTAMP,
+        attributes);
+  }
+
+  @Override
+  public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
+      List<ByteBuffer> rows,
+      List<ByteBuffer> columns,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    return getRowsWithColumnsTs(tableName, rows, columns,
+        HConstants.LATEST_TIMESTAMP,
+        attributes);
+  }
+
+  @Override
+  public List<TRowResult> getRowsTs(ByteBuffer tableName,
+      List<ByteBuffer> rows,
+      long timestamp,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    return getRowsWithColumnsTs(tableName, rows, null,
+        timestamp, attributes);
+  }
+
+  @Override
+  public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
+      List<ByteBuffer> rows,
+      List<ByteBuffer> columns, long timestamp,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+    Table table = null;
+    try {
+      List<Get> gets = new ArrayList<>(rows.size());
+      table = getTable(tableName);
+      if (metrics != null) {
+        metrics.incNumRowKeysInBatchGet(rows.size());
+      }
+      for (ByteBuffer row : rows) {
+        Get get = new Get(getBytes(row));
+        addAttributes(get, attributes);
+        if (columns != null) {
+          for(ByteBuffer column : columns) {
+            byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+            if (famAndQf.length == 1) {
+              get.addFamily(famAndQf[0]);
+            } else {
+              get.addColumn(famAndQf[0], famAndQf[1]);
+            }
+          }
+        }
+        get.setTimeRange(0, timestamp);
+        gets.add(get);
+      }
+      Result[] result = table.get(gets);
+      return ThriftUtilities.rowResultFromHBase(result);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
+  @Override
+  public void deleteAll(
+      ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError {
+    deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP,
+        attributes);
+  }
+
+  @Override
+  public void deleteAllTs(ByteBuffer tableName,
+      ByteBuffer row,
+      ByteBuffer column,
+      long timestamp, Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Delete delete  = new Delete(getBytes(row));
+      addAttributes(delete, attributes);
+      byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+      if (famAndQf.length == 1) {
+        delete.addFamily(famAndQf[0], timestamp);
+      } else {
+        delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
+      }
+      table.delete(delete);
+
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
+  @Override
+  public void deleteAllRow(
+      ByteBuffer tableName, ByteBuffer row,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP, attributes);
+  }
+
+  @Override
+  public void deleteAllRowTs(
+      ByteBuffer tableName, ByteBuffer row, long timestamp,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Delete delete  = new Delete(getBytes(row), timestamp);
+      addAttributes(delete, attributes);
+      table.delete(delete);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
+  @Override
+  public void createTable(ByteBuffer in_tableName,
+      List<ColumnDescriptor> columnFamilies) throws IOError, IllegalArgument, AlreadyExists {
+    TableName tableName = getTableName(in_tableName);
+    try {
+      if (getAdmin().tableExists(tableName)) {
+        throw new AlreadyExists("table name already in use");
+      }
+      HTableDescriptor desc = new HTableDescriptor(tableName);
+      for (ColumnDescriptor col : columnFamilies) {
+        HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
+        desc.addFamily(colDesc);
+      }
+      getAdmin().createTable(desc);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } catch (IllegalArgumentException e) {
+      LOG.warn(e.getMessage(), e);
+      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+    }
+  }
+
+  private static TableName getTableName(ByteBuffer buffer) {
+    return TableName.valueOf(getBytes(buffer));
+  }
+
+  @Override
+  public void deleteTable(ByteBuffer in_tableName) throws IOError {
+    TableName tableName = getTableName(in_tableName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("deleteTable: table={}", tableName);
+    }
+    try {
+      if (!getAdmin().tableExists(tableName)) {
+        throw new IOException("table does not exist");
+      }
+      getAdmin().deleteTable(tableName);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
+  @Override
+  public void mutateRow(ByteBuffer tableName, ByteBuffer row,
+      List<Mutation> mutations, Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError, IllegalArgument {
+    mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP, attributes);
+  }
+
+  @Override
+  public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
+      List<Mutation> mutations, long timestamp,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError, IllegalArgument {
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Put put = new Put(getBytes(row), timestamp);
+      addAttributes(put, attributes);
+
+      Delete delete = new Delete(getBytes(row));
+      addAttributes(delete, attributes);
+      if (metrics != null) {
+        metrics.incNumRowKeysInBatchMutate(mutations.size());
+      }
+
+      // I apologize for all this mess :)
+      CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+      for (Mutation m : mutations) {
+        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
+        if (m.isDelete) {
+          if (famAndQf.length == 1) {
+            delete.addFamily(famAndQf[0], timestamp);
+          } else {
+            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
+          }
+          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+        } else {
+          if(famAndQf.length == 1) {
+            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
+                + "over the whole column family.");
+          } else {
+            put.add(builder.clear()
+                .setRow(put.getRow())
+                .setFamily(famAndQf[0])
+                .setQualifier(famAndQf[1])
+                .setTimestamp(put.getTimestamp())
+                .setType(Cell.Type.Put)
+                .setValue(m.value != null ? getBytes(m.value)
+                    : HConstants.EMPTY_BYTE_ARRAY)
+                .build());
+          }
+          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+        }
+      }
+      if (!delete.isEmpty()) {
+        table.delete(delete);
+      }
+      if (!put.isEmpty()) {
+        table.put(put);
+      }
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } catch (IllegalArgumentException e) {
+      LOG.warn(e.getMessage(), e);
+      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+    } finally {
+      closeTable(table);
+    }
+  }
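+
+  // Illustrative only: one batch of thrift Mutations for a row collapses into
+  // at most one Put and one Delete above, e.g. (values invented):
+  //   isDelete=true,  column="f:q" -> delete.addColumns(f, q, ts)
+  //   isDelete=false, column="f:q" -> put.add(cell f:q = value at ts)
+  //   isDelete=false, column="f"   -> warning only; family-wide put unsupported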
+
+  @Override
+  public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError, IllegalArgument, TException {
+    mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP, attributes);
+  }
+
+  @Override
+  public void mutateRowsTs(
+      ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError, IllegalArgument, TException {
+    List<Put> puts = new ArrayList<>();
+    List<Delete> deletes = new ArrayList<>();
+    CellBuilder builder = CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY);
+    for (BatchMutation batch : rowBatches) {
+      byte[] row = getBytes(batch.row);
+      List<Mutation> mutations = batch.mutations;
+      Delete delete = new Delete(row);
+      addAttributes(delete, attributes);
+      Put put = new Put(row, timestamp);
+      addAttributes(put, attributes);
+      for (Mutation m : mutations) {
+        byte[][] famAndQf = CellUtil.parseColumn(getBytes(m.column));
+        if (m.isDelete) {
+          // no qualifier, family only.
+          if (famAndQf.length == 1) {
+            delete.addFamily(famAndQf[0], timestamp);
+          } else {
+            delete.addColumns(famAndQf[0], famAndQf[1], timestamp);
+          }
+          delete.setDurability(m.writeToWAL ? Durability.SYNC_WAL
+              : Durability.SKIP_WAL);
+        } else {
+          if (famAndQf.length == 1) {
+            LOG.warn("No column qualifier specified. Delete is the only mutation supported "
+                + "over the whole column family.");
+          }
+          if (famAndQf.length == 2) {
+            try {
+              put.add(builder.clear()
+                  .setRow(put.getRow())
+                  .setFamily(famAndQf[0])
+                  .setQualifier(famAndQf[1])
+                  .setTimestamp(put.getTimestamp())
+                  .setType(Cell.Type.Put)
+                  .setValue(m.value != null ? getBytes(m.value)
+                      : HConstants.EMPTY_BYTE_ARRAY)
+                  .build());
+            } catch (IOException e) {
+              throw new IllegalArgumentException(e);
+            }
+          } else {
+            throw new IllegalArgumentException("Invalid famAndQf provided.");
+          }
+          put.setDurability(m.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+        }
+      }
+      if (!delete.isEmpty()) {
+        deletes.add(delete);
+      }
+      if (!put.isEmpty()) {
+        puts.add(put);
+      }
+    }
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      if (!puts.isEmpty()) {
+        table.put(puts);
+      }
+      if (!deletes.isEmpty()) {
+        table.delete(deletes);
+      }
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } catch (IllegalArgumentException e) {
+      LOG.warn(e.getMessage(), e);
+      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+    } finally {
+      closeTable(table);
+    }
+  }
+
+  @Override
+  public long atomicIncrement(
+      ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
+      throws IOError, IllegalArgument, TException {
+    byte [][] famAndQf = CellUtil.parseColumn(getBytes(column));
+    if(famAndQf.length == 1) {
+      return atomicIncrement(tableName, row, famAndQf[0], HConstants.EMPTY_BYTE_ARRAY, amount);
+    }
+    return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
+  }
+
+  protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
+      byte [] family, byte [] qualifier, long amount)
+      throws IOError, IllegalArgument, TException {
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      return table.incrementColumnValue(
+          getBytes(row), family, qualifier, amount);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
+  @Override
+  public void scannerClose(int id) throws IOError, IllegalArgument {
+    LOG.debug("scannerClose: id={}", id);
+    ResultScannerWrapper resultScannerWrapper = getScanner(id);
+    if (resultScannerWrapper == null) {
+      LOG.warn("scanner ID is invalid");
+      throw new IllegalArgument("scanner ID is invalid");
+    }
+    resultScannerWrapper.getScanner().close();
+    removeScanner(id);
+  }
+
+  @Override
+  public List<TRowResult> scannerGetList(int id, int nbRows)
+      throws IllegalArgument, IOError {
+    LOG.debug("scannerGetList: id={}", id);
+    ResultScannerWrapper resultScannerWrapper = getScanner(id);
+    if (null == resultScannerWrapper) {
+      String message = "scanner ID is invalid";
+      LOG.warn(message);
+      throw new IllegalArgument(message);
+    }
+
+    Result [] results;
+    try {
+      results = resultScannerWrapper.getScanner().next(nbRows);
+      if (null == results) {
+        return new ArrayList<>();
+      }
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+    return ThriftUtilities.rowResultFromHBase(results, resultScannerWrapper.isColumnSorted());
+  }
+
+  @Override
+  public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
+    return scannerGetList(id, 1);
+  }
+
+  @Override
+  public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError {
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Scan scan = new Scan();
+      addAttributes(scan, attributes);
+      if (tScan.isSetStartRow()) {
+        scan.setStartRow(tScan.getStartRow());
+      }
+      if (tScan.isSetStopRow()) {
+        scan.setStopRow(tScan.getStopRow());
+      }
+      if (tScan.isSetTimestamp()) {
+        scan.setTimeRange(0, tScan.getTimestamp());
+      }
+      if (tScan.isSetCaching()) {
+        scan.setCaching(tScan.getCaching());
+      }
+      if (tScan.isSetBatchSize()) {
+        scan.setBatch(tScan.getBatchSize());
+      }
+      if (tScan.isSetColumns() && !tScan.getColumns().isEmpty()) {
+        for(ByteBuffer column : tScan.getColumns()) {
+          byte [][] famQf = CellUtil.parseColumn(getBytes(column));
+          if(famQf.length == 1) {
+            scan.addFamily(famQf[0]);
+          } else {
+            scan.addColumn(famQf[0], famQf[1]);
+          }
+        }
+      }
+      if (tScan.isSetFilterString()) {
+        ParseFilter parseFilter = new ParseFilter();
+        scan.setFilter(
+            parseFilter.parseFilterString(tScan.getFilterString()));
+      }
+      if (tScan.isSetReversed()) {
+        scan.setReversed(tScan.isReversed());
+      }
+      if (tScan.isSetCacheBlocks()) {
+        scan.setCacheBlocks(tScan.isCacheBlocks());
+      }
+      return addScanner(table.getScanner(scan), tScan.sortColumns);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
+  @Override
+  public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
+      List<ByteBuffer> columns,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError {
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Scan scan = new Scan(getBytes(startRow));
+      addAttributes(scan, attributes);
+      if (columns != null && !columns.isEmpty()) {
+        for (ByteBuffer column : columns) {
+          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
+          if (famQf.length == 1) {
+            scan.addFamily(famQf[0]);
+          } else {
+            scan.addColumn(famQf[0], famQf[1]);
+          }
+        }
+      }
+      return addScanner(table.getScanner(scan), false);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
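+  /**
+   * Opens a scan from startRow up to (but not including) stopRow and returns
+   * the registered scanner id.
+   */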
+  @Override
+  public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
+      ByteBuffer stopRow, List<ByteBuffer> columns,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError, TException {
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+      addAttributes(scan, attributes);
+      if (columns != null && !columns.isEmpty()) {
+        for (ByteBuffer column : columns) {
+          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
+          if (famQf.length == 1) {
+            scan.addFamily(famQf[0]);
+          } else {
+            scan.addColumn(famQf[0], famQf[1]);
+          }
+        }
+      }
+      return addScanner(table.getScanner(scan), false);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
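+  /**
+   * Opens a scan over rows whose keys share the given prefix. A
+   * WhileMatchFilter wrapping a PrefixFilter ends the scan at the first
+   * non-matching row.
+   */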
+  @Override
+  public int scannerOpenWithPrefix(ByteBuffer tableName,
+      ByteBuffer startAndPrefix,
+      List<ByteBuffer> columns,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError, TException {
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Scan scan = new Scan(getBytes(startAndPrefix));
+      addAttributes(scan, attributes);
+      Filter f = new WhileMatchFilter(
+          new PrefixFilter(getBytes(startAndPrefix)));
+      scan.setFilter(f);
+      if (columns != null && !columns.isEmpty()) {
+        for (ByteBuffer column : columns) {
+          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
+          if (famQf.length == 1) {
+            scan.addFamily(famQf[0]);
+          } else {
+            scan.addColumn(famQf[0], famQf[1]);
+          }
+        }
+      }
+      return addScanner(table.getScanner(scan), false);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
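+  /**
+   * Like scannerOpen, but only returns cells whose timestamp is strictly
+   * less than the given timestamp.
+   */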
+  @Override
+  public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
+      List<ByteBuffer> columns, long timestamp,
+      Map<ByteBuffer, ByteBuffer> attributes) throws IOError, TException {
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Scan scan = new Scan(getBytes(startRow));
+      addAttributes(scan, attributes);
+      scan.setTimeRange(0, timestamp);
+      if (columns != null && !columns.isEmpty()) {
+        for (ByteBuffer column : columns) {
+          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
+          if (famQf.length == 1) {
+            scan.addFamily(famQf[0]);
+          } else {
+            scan.addColumn(famQf[0], famQf[1]);
+          }
+        }
+      }
+      return addScanner(table.getScanner(scan), false);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
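+  /**
+   * Like scannerOpenWithStop, but only returns cells whose timestamp is
+   * strictly less than the given timestamp.
+   */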
+  @Override
+  public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
+      ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp,
+      Map<ByteBuffer, ByteBuffer> attributes)
+      throws IOError, TException {
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+      addAttributes(scan, attributes);
+      scan.setTimeRange(0, timestamp);
+      if (columns != null && !columns.isEmpty()) {
+        for (ByteBuffer column : columns) {
+          byte[][] famQf = CellUtil.parseColumn(getBytes(column));
+          if (famQf.length == 1) {
+            scan.addFamily(famQf[0]);
+          } else {
+            scan.addColumn(famQf[0], famQf[1]);
+          }
+        }
+      }
+      return addScanner(table.getScanner(scan), false);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
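+  /**
+   * Returns the column family descriptors of the given table, keyed by
+   * family name.
+   */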
+  @Override
+  public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
+      ByteBuffer tableName) throws IOError, TException {
+
+    Table table = null;
+    try {
+      TreeMap<ByteBuffer, ColumnDescriptor> columns = new TreeMap<>();
+
+      table = getTable(tableName);
+      HTableDescriptor desc = table.getTableDescriptor();
+
+      for (HColumnDescriptor e : desc.getFamilies()) {
+        ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
+        columns.put(col.name, col);
+      }
+      return columns;
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
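+  /**
+   * Closes the given table if it is non-null, converting any IOException
+   * into a Thrift IOError.
+   */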
+  private void closeTable(Table table) throws IOError {
+    try {
+      if (table != null) {
+        table.close();
+      }
+    } catch (IOException e) {
+      LOG.error(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
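+  /**
+   * Locates the region containing searchRow via a reverse scan of
+   * hbase:meta and returns its keys, name, id and hosting server.
+   */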
+  @Override
+  public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
+    try {
+      byte[] row = getBytes(searchRow);
+      Result startRowResult = getReverseScanResult(TableName.META_TABLE_NAME.getName(), row,
+          HConstants.CATALOG_FAMILY);
+
+      if (startRowResult == null) {
+        throw new IOException("Cannot find row in "+ TableName.META_TABLE_NAME+", row="
+            + Bytes.toStringBinary(row));
+      }
+
+      // find region start and end keys
+      RegionInfo regionInfo = MetaTableAccessor.getRegionInfo(startRowResult);
+      if (regionInfo == null) {
+        throw new IOException("RegionInfo REGIONINFO was null or " +
+            " empty in Meta for row="
+            + Bytes.toStringBinary(row));
+      }
+      TRegionInfo region = new TRegionInfo();
+      region.setStartKey(regionInfo.getStartKey());
+      region.setEndKey(regionInfo.getEndKey());
+      region.id = regionInfo.getRegionId();
+      region.setName(regionInfo.getRegionName());
+      region.version = HREGION_VERSION; // version not used anymore, PB encoding used.
+
+      // find region assignment to server
+      ServerName serverName = MetaTableAccessor.getServerName(startRowResult, 0);
+      if (serverName != null) {
+        region.setServerName(Bytes.toBytes(serverName.getHostname()));
+        region.port = serverName.getPort();
+      }
+      return region;
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    }
+  }
+
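+  /**
+   * Returns the first result of a reverse scan starting at the given row,
+   * restricted to the given family; used to find a row's region in meta.
+   */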
+  private Result getReverseScanResult(byte[] tableName, byte[] row, byte[] family)
+      throws IOException {
+    Scan scan = new Scan(row);
+    scan.setReversed(true);
+    scan.addFamily(family);
+    try (Table table = getTable(tableName);
+         ResultScanner scanner = table.getScanner(scan)) {
+      return scanner.next();
+    }
+  }
+
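+  /**
+   * Applies a single increment. If increment coalescing is enabled, the
+   * increment is queued on the coalescer instead of being applied directly.
+   */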
+  @Override
+  public void increment(TIncrement tincrement) throws IOError, TException {
+
+    if (tincrement.getRow().length == 0 || tincrement.getTable().length == 0) {
+      throw new TException("Must supply a table and a row key; can't increment");
+    }
+
+    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
+      this.coalescer.queueIncrement(tincrement);
+      return;
+    }
+
+    Table table = null;
+    try {
+      table = getTable(tincrement.getTable());
+      Increment inc = ThriftUtilities.incrementFromThrift(tincrement);
+      table.increment(inc);
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
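+  /**
+   * Applies a batch of increments, either through the coalescer or one by
+   * one via {@link #increment(TIncrement)}.
+   */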
+  @Override
+  public void incrementRows(List<TIncrement> tincrements) throws IOError, TException {
+    if (conf.getBoolean(COALESCE_INC_KEY, false)) {
+      this.coalescer.queueIncrements(tincrements);
+      return;
+    }
+    for (TIncrement tinc : tincrements) {
+      increment(tinc);
+    }
+  }
+
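+  /**
+   * Applies the Thrift append to the table and returns the resulting cells.
+   */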
+  @Override
+  public List<TCell> append(TAppend tappend) throws IOError, TException {
+    if (tappend.getRow().length == 0 || tappend.getTable().length == 0) {
+      throw new TException("Must supply a table and a row key; can't append");
+    }
+
+    Table table = null;
+    try {
+      table = getTable(tappend.getTable());
+      Append append = ThriftUtilities.appendFromThrift(tappend);
+      Result result = table.append(append);
+      return ThriftUtilities.cellFromHBase(result.rawCells());
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } finally {
+      closeTable(table);
+    }
+  }
+
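+  /**
+   * Atomically checks the given column and applies the put only if the check
+   * passes: if value is null the column must not exist, otherwise the stored
+   * value must equal the supplied one.
+   */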
+  @Override
+  public boolean checkAndPut(ByteBuffer tableName, ByteBuffer row, ByteBuffer column,
+      ByteBuffer value, Mutation mput, Map<ByteBuffer, ByteBuffer> attributes) throws IOError,
+      IllegalArgument, TException {
+    Put put;
+    try {
+      put = new Put(getBytes(row), HConstants.LATEST_TIMESTAMP);
+      addAttributes(put, attributes);
+
+      byte[][] famAndQf = CellUtil.parseColumn(getBytes(mput.column));
+      put.add(CellBuilderFactory.create(CellBuilderType.SHALLOW_COPY)
+          .setRow(put.getRow())
+          .setFamily(famAndQf[0])
+          .setQualifier(famAndQf[1])
+          .setTimestamp(put.getTimestamp())
+          .setType(Cell.Type.Put)
+          .setValue(mput.value != null ? getBytes(mput.value)
+              : HConstants.EMPTY_BYTE_ARRAY)
+          .build());
+      put.setDurability(mput.writeToWAL ? Durability.SYNC_WAL : Durability.SKIP_WAL);
+    } catch (IOException | IllegalArgumentException e) {
+      LOG.warn(e.getMessage(), e);
+      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+    }
+
+    Table table = null;
+    try {
+      table = getTable(tableName);
+      byte[][] famAndQf = CellUtil.parseColumn(getBytes(column));
+      Table.CheckAndMutateBuilder mutateBuilder =
+          table.checkAndMutate(getBytes(row), famAndQf[0]).qualifier(famAndQf[1]);
+      if (value != null) {
+        return mutateBuilder.ifEquals(getBytes(value)).thenPut(put);
+      } else {
+        return mutateBuilder.ifNotExists().thenPut(put);
+      }
+    } catch (IOException e) {
+      LOG.warn(e.getMessage(), e);
+      throw getIOError(e);
+    } catch (IllegalArgumentException e) {
+      LOG.warn(e.getMessage(), e);
+      throw new IllegalArgument(Throwables.getStackTraceAsString(e));
+    } finally {
+      closeTable(table);
+    }
+  }
+
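+  /**
+   * Wraps a Throwable in an IOError that keeps the original cause and
+   * carries its stack trace as the message.
+   */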
+  private static IOError getIOError(Throwable throwable) {
+    IOError error = new IOErrorWithCause(throwable);
+    error.setMessage(Throwables.getStackTraceAsString(throwable));
+    return error;
+  }
+
+  /**
+   * Adds all the attributes into the Operation object
+   */
+  private static void addAttributes(OperationWithAttributes op,
+      Map<ByteBuffer, ByteBuffer> attributes) {
+    if (attributes == null || attributes.isEmpty()) {
+      return;
+    }
+    for (Map.Entry<ByteBuffer, ByteBuffer> entry : attributes.entrySet()) {
+      String name = Bytes.toStringBinary(getBytes(entry.getKey()));
+      byte[] value =  getBytes(entry.getValue());
+      op.setAttribute(name, value);
+    }
+  }
+
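+  /**
+   * Pairs a ResultScanner with the flag saying whether its result columns
+   * should be returned in sorted order.
+   */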
+  protected static class ResultScannerWrapper {
+
+    private final ResultScanner scanner;
+    private final boolean sortColumns;
+    public ResultScannerWrapper(ResultScanner resultScanner,
+        boolean sortResultColumns) {
+      scanner = resultScanner;
+      sortColumns = sortResultColumns;
+    }
+
+    public ResultScanner getScanner() {
+      return scanner;
+    }
+
+    public boolean isColumnSorted() {
+      return sortColumns;
+    }
+  }
+
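+  /**
+   * IOError subclass that retains the underlying Throwable cause, factoring
+   * it into equals and hashCode.
+   */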
+  public static class IOErrorWithCause extends IOError {
+    private final Throwable cause;
+    public IOErrorWithCause(Throwable cause) {
+      this.cause = cause;
+    }
+
+    @Override
+    public synchronized Throwable getCause() {
+      return cause;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+      if (super.equals(other) &&
+          other instanceof IOErrorWithCause) {
+        Throwable otherCause = ((IOErrorWithCause) other).getCause();
+        if (this.getCause() != null) {
+          return otherCause != null && this.getCause().equals(otherCause);
+        } else {
+          return otherCause == null;
+        }
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = super.hashCode();
+      result = 31 * result + (cause != null ? cause.hashCode() : 0);
+      return result;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/e4b6b4af/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java
----------------------------------------------------------------------
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java
index 4c9a35b..7f85152 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/ThriftHttpServlet.java
@@ -18,8 +18,8 @@
 
 package org.apache.hadoop.hbase.thrift;
 
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
-import static org.apache.hadoop.hbase.thrift.ThriftServerRunner.THRIFT_SPNEGO_PRINCIPAL_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_KEYTAB_FILE_KEY;
+import static org.apache.hadoop.hbase.thrift.Constants.THRIFT_SPNEGO_PRINCIPAL_KEY;
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
@@ -58,7 +58,7 @@ public class ThriftHttpServlet extends TServlet {
   private static final Logger LOG = LoggerFactory.getLogger(ThriftHttpServlet.class.getName());
   private final transient UserGroupInformation serviceUGI;
   private final transient UserGroupInformation httpUGI;
-  private final transient ThriftServerRunner.HBaseHandler hbaseHandler;
+  private final transient HBaseServiceHandler handler;
   private final boolean doAsEnabled;
   private final boolean securityEnabled;
 
@@ -67,11 +67,11 @@ public class ThriftHttpServlet extends TServlet {
 
   public ThriftHttpServlet(TProcessor processor, TProtocolFactory protocolFactory,
       UserGroupInformation serviceUGI, Configuration conf,
-      ThriftServerRunner.HBaseHandler hbaseHandler, boolean securityEnabled, boolean doAsEnabled)
+      HBaseServiceHandler handler, boolean securityEnabled, boolean doAsEnabled)
       throws IOException {
     super(processor, protocolFactory);
     this.serviceUGI = serviceUGI;
-    this.hbaseHandler = hbaseHandler;
+    this.handler = handler;
     this.securityEnabled = securityEnabled;
     this.doAsEnabled = doAsEnabled;
 
@@ -146,7 +146,7 @@ public class ThriftHttpServlet extends TServlet {
       }
       effectiveUser = doAsUserFromQuery;
     }
-    hbaseHandler.setEffectiveUser(effectiveUser);
+    handler.setEffectiveUser(effectiveUser);
     super.doPost(request, response);
   }