Posted to commits@hbase.apache.org by te...@apache.org on 2012/01/17 18:49:34 UTC

svn commit: r1232506 - in /hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift: HThreadedSelectorServerArgs.java ThriftServerRunner.java

Author: tedyu
Date: Tue Jan 17 17:49:34 2012
New Revision: 1232506

URL: http://svn.apache.org/viewvc?rev=1232506&view=rev
Log:
HBASE-5201 Add new files

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java?rev=1232506&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/HThreadedSelectorServerArgs.java Tue Jan 17 17:49:34 2012
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A TThreadedSelectorServer.Args that reads its settings from a Hadoop Configuration.
+ */
+public class HThreadedSelectorServerArgs extends TThreadedSelectorServer.Args {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(HThreadedSelectorServerArgs.class);
+
+  /**
+   * Number of selector threads handling reads and writes on the sockets
+   */
+  public static final String SELECTOR_THREADS_CONF_KEY =
+      "hbase.thrift.selector.threads";
+
+  /**
+   * Number of worker threads for processing the Thrift calls
+   */
+  public static final String WORKER_THREADS_CONF_KEY =
+      "hbase.thrift.worker.threads";
+
+  /**
+   * Time to wait for the server to stop gracefully, in seconds
+   */
+  public static final String STOP_TIMEOUT_CONF_KEY =
+      "hbase.thrift.stop.timeout.seconds";
+
+  /**
+   * Maximum number of accepted connections queued for each selector thread
+   */
+  public static final String ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY =
+      "hbase.thrift.accept.queue.size.per.selector";
+
+  /**
+   * The strategy for handling new accepted connections.
+   */
+  public static final String ACCEPT_POLICY_CONF_KEY =
+      "hbase.thrift.accept.policy";
+
+  public HThreadedSelectorServerArgs(
+      TNonblockingServerTransport transport, Configuration conf) {
+    super(transport);
+    readConf(conf);
+  }
+
+  private void readConf(Configuration conf) {
+    int selectorThreads = conf.getInt(
+        SELECTOR_THREADS_CONF_KEY, getSelectorThreads());
+    int workerThreads = conf.getInt(
+        WORKER_THREADS_CONF_KEY, getWorkerThreads());
+    int stopTimeoutVal = conf.getInt(
+        STOP_TIMEOUT_CONF_KEY, getStopTimeoutVal());
+    int acceptQueueSizePerThread = conf.getInt(
+        ACCEPT_QUEUE_SIZE_PER_THREAD_CONF_KEY, getAcceptQueueSizePerThread());
+    AcceptPolicy acceptPolicy = AcceptPolicy.valueOf(conf.get(
+        ACCEPT_POLICY_CONF_KEY, getAcceptPolicy().toString()).toUpperCase());
+
+    super.selectorThreads(selectorThreads)
+         .workerThreads(workerThreads)
+         .stopTimeoutVal(stopTimeoutVal)
+         .acceptQueueSizePerThread(acceptQueueSizePerThread)
+         .acceptPolicy(acceptPolicy);
+
+    LOG.info("Read configuration selectorThreads:" + selectorThreads +
+             " workerThreads:" + workerThreads +
+             " stopTimeoutVal:" + stopTimeoutVal + "sec" +
+             " acceptQueueSizePerThread:" + acceptQueueSizePerThread +
+             " acceptPolicy:" + acceptPolicy);
+  }
+}
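
For context, a minimal sketch of wiring these Args into a Thrift server (not
part of this commit; the port, thread counts, and accept policy below are
illustrative, and a real Hbase processor must be installed before serving):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.thrift.HThreadedSelectorServerArgs;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadedSelectorServer;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.apache.thrift.transport.TNonblockingServerTransport;
    import org.apache.thrift.transport.TTransportException;

    public class SelectorArgsSketch {
      public static void main(String[] argv) throws TTransportException {
        Configuration conf = HBaseConfiguration.create();
        // Illustrative values; unset keys keep the TThreadedSelectorServer
        // defaults, since readConf() falls back to the current getters.
        conf.setInt(HThreadedSelectorServerArgs.SELECTOR_THREADS_CONF_KEY, 2);
        conf.setInt(HThreadedSelectorServerArgs.WORKER_THREADS_CONF_KEY, 16);
        // Parsed case-insensitively into the Thrift AcceptPolicy enum.
        conf.set(HThreadedSelectorServerArgs.ACCEPT_POLICY_CONF_KEY, "fair_accept");

        TNonblockingServerTransport transport = new TNonblockingServerSocket(9090);
        TThreadedSelectorServer.Args args =
            new HThreadedSelectorServerArgs(transport, conf);
        // A processor must be set before serving, e.g.
        //   args.processor(new Hbase.Processor<Hbase.Iface>(handler));
        // with a handler implementing Hbase.Iface (see ThriftServerRunner below).
        TServer server = new TThreadedSelectorServer(args);
        // server.serve();  // blocks until server.stop() is called
      }
    }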

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java?rev=1232506&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/thrift/ThriftServerRunner.java Tue Jan 17 17:49:34 2012
@@ -0,0 +1,1237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.thrift;
+
+import static org.apache.hadoop.hbase.util.Bytes.getBytes;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.UnknownHostException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionGroup;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.ParseFilter;
+import org.apache.hadoop.hbase.filter.PrefixFilter;
+import org.apache.hadoop.hbase.filter.WhileMatchFilter;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.thrift.generated.AlreadyExists;
+import org.apache.hadoop.hbase.thrift.generated.BatchMutation;
+import org.apache.hadoop.hbase.thrift.generated.ColumnDescriptor;
+import org.apache.hadoop.hbase.thrift.generated.Hbase;
+import org.apache.hadoop.hbase.thrift.generated.IOError;
+import org.apache.hadoop.hbase.thrift.generated.IllegalArgument;
+import org.apache.hadoop.hbase.thrift.generated.Mutation;
+import org.apache.hadoop.hbase.thrift.generated.TCell;
+import org.apache.hadoop.hbase.thrift.generated.TRegionInfo;
+import org.apache.hadoop.hbase.thrift.generated.TRowResult;
+import org.apache.hadoop.hbase.thrift.generated.TScan;
+import org.apache.hadoop.hbase.util.Addressing;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.net.DNS;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.protocol.TCompactProtocol;
+import org.apache.thrift.protocol.TProtocolFactory;
+import org.apache.thrift.server.THsHaServer;
+import org.apache.thrift.server.TNonblockingServer;
+import org.apache.thrift.server.TServer;
+import org.apache.thrift.server.TThreadedSelectorServer;
+import org.apache.thrift.transport.TFramedTransport;
+import org.apache.thrift.transport.TNonblockingServerSocket;
+import org.apache.thrift.transport.TNonblockingServerTransport;
+import org.apache.thrift.transport.TServerSocket;
+import org.apache.thrift.transport.TServerTransport;
+import org.apache.thrift.transport.TTransportFactory;
+
+import com.google.common.base.Joiner;
+
+/**
+ * ThriftServerRunner - this class starts up a Thrift server that implements
+ * the HBase API specified in the Hbase.thrift IDL file.
+ */
+public class ThriftServerRunner implements Runnable {
+
+  private static final Log LOG = LogFactory.getLog(ThriftServerRunner.class);
+
+  static final String SERVER_TYPE_CONF_KEY =
+      "hbase.regionserver.thrift.server.type";
+
+  static final String BIND_CONF_KEY = "hbase.regionserver.thrift.ipaddress";
+  static final String COMPACT_CONF_KEY = "hbase.regionserver.thrift.compact";
+  static final String FRAMED_CONF_KEY = "hbase.regionserver.thrift.framed";
+  static final String PORT_CONF_KEY = "hbase.regionserver.thrift.port";
+
+  private static final String DEFAULT_BIND_ADDR = "0.0.0.0";
+  private static final int DEFAULT_LISTEN_PORT = 9090;
+
+  private Configuration conf;
+  volatile TServer tserver;
+  private final HBaseHandler handler;
+
+  /** An enum of server implementation selections */
+  enum ImplType {
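+    // Each constant carries its command-line option name, whether the framed
+    // transport is mandatory, the backing Thrift server class, and whether a
+    // bind address can be specified for it.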
+    HS_HA("hsha", true, THsHaServer.class, false),
+    NONBLOCKING("nonblocking", true, TNonblockingServer.class, false),
+    THREAD_POOL("threadpool", false, TBoundedThreadPoolServer.class, true),
+    THREADED_SELECTOR(
+        "threadedselector", true, TThreadedSelectorServer.class, false);
+
+    public static final ImplType DEFAULT = THREAD_POOL;
+
+    final String option;
+    final boolean isAlwaysFramed;
+    final Class<? extends TServer> serverClass;
+    final boolean canSpecifyBindIP;
+
+    ImplType(String option, boolean isAlwaysFramed,
+        Class<? extends TServer> serverClass, boolean canSpecifyBindIP) {
+      this.option = option;
+      this.isAlwaysFramed = isAlwaysFramed;
+      this.serverClass = serverClass;
+      this.canSpecifyBindIP = canSpecifyBindIP;
+    }
+
+    /**
+     * @return <code>-option</code> so we can get the list of options from
+     *         {@link #values()}
+     */
+    @Override
+    public String toString() {
+      return "-" + option;
+    }
+
+    String getDescription() {
+      StringBuilder sb = new StringBuilder("Use the " +
+          serverClass.getSimpleName() + ".");
+      if (isAlwaysFramed) {
+        sb.append(" This implies the framed transport.");
+      }
+      if (this == DEFAULT) {
+        sb.append(" This is the default.");
+      }
+      return sb.toString();
+    }
+
+    static OptionGroup createOptionGroup() {
+      OptionGroup group = new OptionGroup();
+      for (ImplType t : values()) {
+        group.addOption(new Option(t.option, t.getDescription()));
+      }
+      return group;
+    }
+
+    static ImplType getServerImpl(Configuration conf) {
+      String confType = conf.get(SERVER_TYPE_CONF_KEY, THREAD_POOL.option);
+      for (ImplType t : values()) {
+        if (confType.equals(t.option)) {
+          return t;
+        }
+      }
+      throw new AssertionError("Unknown server ImplType.option:" + confType);
+    }
+
+    static void setServerImpl(CommandLine cmd, Configuration conf) {
+      ImplType chosenType = null;
+      int numChosen = 0;
+      for (ImplType t : values()) {
+        if (cmd.hasOption(t.option)) {
+          chosenType = t;
+          ++numChosen;
+        }
+      }
+      if (numChosen != 1) {
+        throw new AssertionError("Exactly one option out of " +
+            Arrays.toString(values()) + " has to be specified");
+      }
+      LOG.info("Setting thrift server to " + chosenType.option);
+      conf.set(SERVER_TYPE_CONF_KEY, chosenType.option);
+    }
+
+    public String simpleClassName() {
+      return serverClass.getSimpleName();
+    }
+
+    public static List<String> serversThatCannotSpecifyBindIP() {
+      List<String> l = new ArrayList<String>();
+      for (ImplType t : values()) {
+        if (!t.canSpecifyBindIP) {
+          l.add(t.simpleClassName());
+        }
+      }
+      return l;
+    }
+
+  }
+
+  public ThriftServerRunner(Configuration conf) throws IOException {
+    this(conf, new ThriftServerRunner.HBaseHandler(conf));
+  }
+
+  public ThriftServerRunner(Configuration conf, HBaseHandler handler) {
+    this.conf = HBaseConfiguration.create(conf);
+    this.handler = handler;
+  }
+
+  /*
+   * Runs the Thrift server
+   */
+  @Override
+  public void run() {
+    try {
+      setupServer();
+      tserver.serve();
+    } catch (Exception e) {
+      LOG.fatal("Cannot run ThriftServer", e);
+      // Crash the process if the ThriftServer is not running
+      System.exit(-1);
+    }
+  }
+
+  public void shutdown() {
+    if (tserver != null) {
+      tserver.stop();
+      tserver = null;
+    }
+  }
+
+  /**
+   * Sets up the Thrift TServer according to the configured implementation type.
+   */
+  private void setupServer() throws Exception {
+    // Get port to bind to
+    int listenPort = conf.getInt(PORT_CONF_KEY, DEFAULT_LISTEN_PORT);
+
+    // Construct correct ProtocolFactory
+    TProtocolFactory protocolFactory;
+    if (conf.getBoolean(COMPACT_CONF_KEY, false)) {
+      LOG.debug("Using compact protocol");
+      protocolFactory = new TCompactProtocol.Factory();
+    } else {
+      LOG.debug("Using binary protocol");
+      protocolFactory = new TBinaryProtocol.Factory();
+    }
+
+    Hbase.Processor<Hbase.Iface> processor =
+        new Hbase.Processor<Hbase.Iface>(handler);
+    ImplType implType = ImplType.getServerImpl(conf);
+
+    // Construct correct TransportFactory
+    TTransportFactory transportFactory;
+    if (conf.getBoolean(FRAMED_CONF_KEY, false) || implType.isAlwaysFramed) {
+      transportFactory = new TFramedTransport.Factory();
+      LOG.debug("Using framed transport");
+    } else {
+      transportFactory = new TTransportFactory();
+    }
+
+    if (conf.get(BIND_CONF_KEY) != null && !implType.canSpecifyBindIP) {
+      LOG.error("Server types " + Joiner.on(", ").join(
+          ImplType.serversThatCannotSpecifyBindIP()) + " don't support IP " +
+          "address binding at the moment. See " +
+          "https://issues.apache.org/jira/browse/HBASE-2155 for details.");
+      throw new RuntimeException(
+          "-" + BIND_CONF_KEY + " not supported with " + implType);
+    }
+
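+    // The three nonblocking implementations share a TNonblockingServerSocket
+    // and differ only in their threading model; the thread pool server below
+    // uses a plain blocking TServerSocket instead.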
+    if (implType == ImplType.HS_HA || implType == ImplType.NONBLOCKING ||
+        implType == ImplType.THREADED_SELECTOR) {
+
+      TNonblockingServerTransport serverTransport =
+          new TNonblockingServerSocket(listenPort);
+
+      if (implType == ImplType.NONBLOCKING) {
+        TNonblockingServer.Args serverArgs =
+            new TNonblockingServer.Args(serverTransport);
+        serverArgs.processor(processor)
+                  .transportFactory(transportFactory)
+                  .protocolFactory(protocolFactory);
+        tserver = new TNonblockingServer(serverArgs);
+      } else if (implType == ImplType.HS_HA) {
+        THsHaServer.Args serverArgs = new THsHaServer.Args(serverTransport);
+        serverArgs.processor(processor)
+                  .transportFactory(transportFactory)
+                  .protocolFactory(protocolFactory);
+        tserver = new THsHaServer(serverArgs);
+      } else { // THREADED_SELECTOR
+        TThreadedSelectorServer.Args serverArgs =
+            new HThreadedSelectorServerArgs(serverTransport, conf);
+        serverArgs.processor(processor)
+                  .transportFactory(transportFactory)
+                  .protocolFactory(protocolFactory);
+        tserver = new TThreadedSelectorServer(serverArgs);
+      }
+      LOG.info("starting HBase " + implType.simpleClassName() +
+          " server on " + Integer.toString(listenPort));
+    } else if (implType == ImplType.THREAD_POOL) {
+      // Thread pool server. Get the IP address to bind to.
+      InetAddress listenAddress = getBindAddress(conf);
+
+      TServerTransport serverTransport = new TServerSocket(
+          new InetSocketAddress(listenAddress, listenPort));
+
+      TBoundedThreadPoolServer.Args serverArgs =
+          new TBoundedThreadPoolServer.Args(serverTransport, conf);
+      serverArgs.processor(processor)
+                .transportFactory(transportFactory)
+                .protocolFactory(protocolFactory);
+      LOG.info("starting " + ImplType.THREAD_POOL.simpleClassName() + " on "
+          + listenAddress + ":" + Integer.toString(listenPort)
+          + "; " + serverArgs);
+      tserver = new TBoundedThreadPoolServer(serverArgs);
+    } else {
+      throw new AssertionError("Unsupported Thrift server implementation: " +
+          implType.simpleClassName());
+    }
+
+    // A sanity check that we instantiated the right type of server.
+    if (tserver.getClass() != implType.serverClass) {
+      throw new AssertionError("Expected to create Thrift server class " +
+          implType.serverClass.getName() + " but got " +
+          tserver.getClass().getName());
+    }
+
+    // Log in the server principal (if using secure Hadoop)
+    Configuration conf = handler.conf;
+    if (User.isSecurityEnabled() && User.isHBaseSecurityEnabled(conf)) {
+      String machineName = Strings.domainNamePointerToHostName(
+        DNS.getDefaultHost(conf.get("hbase.thrift.dns.interface", "default"),
+          conf.get("hbase.thrift.dns.nameserver", "default")));
+      User.login(conf, "hbase.thrift.keytab.file",
+          "hbase.thrift.kerberos.principal", machineName);
+    }
+  }
+
+  private InetAddress getBindAddress(Configuration conf)
+      throws UnknownHostException {
+    String bindAddressStr = conf.get(BIND_CONF_KEY, DEFAULT_BIND_ADDR);
+    return InetAddress.getByName(bindAddressStr);
+  }
+
+  /**
+   * The HBaseHandler is a glue object that connects Thrift RPC calls to the
+   * HBase client API primarily defined in the HBaseAdmin and HTable objects.
+   */
+  public static class HBaseHandler implements Hbase.Iface {
+    protected Configuration conf;
+    protected HBaseAdmin admin = null;
+    protected final Log LOG = LogFactory.getLog(this.getClass().getName());
+
+    // nextScannerId and scannerMap are used to manage scanner state
+    protected int nextScannerId = 0;
+    protected HashMap<Integer, ResultScanner> scannerMap = null;
+
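+    // HTable is not thread-safe, so each Thrift worker thread keeps its own
+    // cache of open tables, keyed by table name.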
+    private static ThreadLocal<Map<String, HTable>> threadLocalTables =
+        new ThreadLocal<Map<String, HTable>>() {
+      @Override
+      protected Map<String, HTable> initialValue() {
+        return new TreeMap<String, HTable>();
+      }
+    };
+
+    /**
+     * Returns the names of all column families of the given table, each
+     * suffixed with the column family delimiter.
+     *
+     * @param table the table to inspect
+     * @return delimiter-terminated column family names
+     * @throws IOException if the table descriptor cannot be retrieved
+     */
+    byte[][] getAllColumns(HTable table) throws IOException {
+      HColumnDescriptor[] cds = table.getTableDescriptor().getColumnFamilies();
+      byte[][] columns = new byte[cds.length][];
+      for (int i = 0; i < cds.length; i++) {
+        columns[i] = Bytes.add(cds[i].getName(),
+            KeyValue.COLUMN_FAMILY_DELIM_ARRAY);
+      }
+      return columns;
+    }
+
+    /**
+     * Returns the per-thread cached HTable for the given table name,
+     * creating and caching a new instance on first use.
+     *
+     * @param tableName
+     *          name of table
+     * @return HTable object
+     * @throws IOException if the table cannot be opened
+     */
+    protected HTable getTable(final byte[] tableName) throws
+        IOException {
+      String table = new String(tableName);
+      Map<String, HTable> tables = threadLocalTables.get();
+      if (!tables.containsKey(table)) {
+        tables.put(table, new HTable(conf, tableName));
+      }
+      return tables.get(table);
+    }
+
+    protected HTable getTable(final ByteBuffer tableName) throws IOException {
+      return getTable(getBytes(tableName));
+    }
+
+    /**
+     * Assigns a unique ID to the scanner and adds the mapping to an internal
+     * hash-map.
+     *
+     * @param scanner
+     * @return integer scanner id
+     */
+    protected synchronized int addScanner(ResultScanner scanner) {
+      int id = nextScannerId++;
+      scannerMap.put(id, scanner);
+      return id;
+    }
+
+    /**
+     * Returns the scanner associated with the specified ID.
+     *
+     * @param id
+     * @return a Scanner, or null if ID was invalid.
+     */
+    protected synchronized ResultScanner getScanner(int id) {
+      return scannerMap.get(id);
+    }
+
+    /**
+     * Removes the scanner associated with the specified ID from the internal
+     * id->scanner hash-map.
+     *
+     * @param id
+     * @return a Scanner, or null if ID was invalid.
+     */
+    protected synchronized ResultScanner removeScanner(int id) {
+      return scannerMap.remove(id);
+    }
+
+    /**
+     * Constructs an HBaseHandler object.
+     * @throws IOException
+     */
+    protected HBaseHandler()
+    throws IOException {
+      this(HBaseConfiguration.create());
+    }
+
+    protected HBaseHandler(final Configuration c)
+    throws IOException {
+      this.conf = c;
+      admin = new HBaseAdmin(conf);
+      scannerMap = new HashMap<Integer, ResultScanner>();
+    }
+
+    @Override
+    public void enableTable(ByteBuffer tableName) throws IOError {
+      try {
+        admin.enableTable(getBytes(tableName));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void disableTable(ByteBuffer tableName) throws IOError {
+      try {
+        admin.disableTable(getBytes(tableName));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public boolean isTableEnabled(ByteBuffer tableName) throws IOError {
+      try {
+        return HTable.isTableEnabled(this.conf, getBytes(tableName));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void compact(ByteBuffer tableNameOrRegionName) throws IOError {
+      try {
+        admin.compact(getBytes(tableNameOrRegionName));
+      } catch (InterruptedException e) {
+        throw new IOError(e.getMessage());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void majorCompact(ByteBuffer tableNameOrRegionName) throws IOError {
+      try {
+        admin.majorCompact(getBytes(tableNameOrRegionName));
+      } catch (InterruptedException e) {
+        throw new IOError(e.getMessage());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public List<ByteBuffer> getTableNames() throws IOError {
+      try {
+        HTableDescriptor[] tables = this.admin.listTables();
+        ArrayList<ByteBuffer> list = new ArrayList<ByteBuffer>(tables.length);
+        for (int i = 0; i < tables.length; i++) {
+          list.add(ByteBuffer.wrap(tables[i].getName()));
+        }
+        return list;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public List<TRegionInfo> getTableRegions(ByteBuffer tableName)
+    throws IOError {
+      try {
+        List<HRegionInfo> hris = this.admin.getTableRegions(tableName.array());
+        List<TRegionInfo> regions = new ArrayList<TRegionInfo>();
+
+        if (hris != null) {
+          for (HRegionInfo regionInfo : hris) {
+            TRegionInfo region = new TRegionInfo();
+            region.startKey = ByteBuffer.wrap(regionInfo.getStartKey());
+            region.endKey = ByteBuffer.wrap(regionInfo.getEndKey());
+            region.id = regionInfo.getRegionId();
+            region.name = ByteBuffer.wrap(regionInfo.getRegionName());
+            region.version = regionInfo.getVersion();
+            regions.add(region);
+          }
+        }
+        return regions;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public List<TCell> get(
+        ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
+        throws IOError {
+      byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
+      if(famAndQf.length == 1) {
+        return get(tableName, row, famAndQf[0], new byte[0]);
+      }
+      return get(tableName, row, famAndQf[0], famAndQf[1]);
+    }
+
+    protected List<TCell> get(ByteBuffer tableName,
+                              ByteBuffer row,
+                              byte[] family,
+                              byte[] qualifier) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Get get = new Get(getBytes(row));
+        if (qualifier == null || qualifier.length == 0) {
+          get.addFamily(family);
+        } else {
+          get.addColumn(family, qualifier);
+        }
+        Result result = table.get(get);
+        return ThriftUtilities.cellFromHBase(result.raw());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer column, int numVersions) throws IOError {
+      byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
+      if(famAndQf.length == 1) {
+        return getVer(tableName, row, famAndQf[0],
+            new byte[0], numVersions);
+      }
+      return getVer(tableName, row,
+          famAndQf[0], famAndQf[1], numVersions);
+    }
+
+    public List<TCell> getVer(ByteBuffer tableName, ByteBuffer row,
+                              byte[] family,
+        byte[] qualifier, int numVersions) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Get get = new Get(getBytes(row));
+        get.addColumn(family, qualifier);
+        get.setMaxVersions(numVersions);
+        Result result = table.get(get);
+        return ThriftUtilities.cellFromHBase(result.raw());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public List<TCell> getVerTs(ByteBuffer tableName,
+                                   ByteBuffer row,
+        ByteBuffer column,
+        long timestamp,
+        int numVersions) throws IOError {
+      byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
+      if(famAndQf.length == 1) {
+        return getVerTs(tableName, row, famAndQf[0], new byte[0], timestamp,
+            numVersions);
+      }
+      return getVerTs(tableName, row, famAndQf[0], famAndQf[1], timestamp,
+          numVersions);
+    }
+
+    protected List<TCell> getVerTs(ByteBuffer tableName,
+                                   ByteBuffer row, byte [] family,
+        byte [] qualifier, long timestamp, int numVersions) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Get get = new Get(getBytes(row));
+        get.addColumn(family, qualifier);
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        get.setMaxVersions(numVersions);
+        Result result = table.get(get);
+        return ThriftUtilities.cellFromHBase(result.raw());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public List<TRowResult> getRow(ByteBuffer tableName, ByteBuffer row)
+        throws IOError {
+      return getRowWithColumnsTs(tableName, row, null,
+                                 HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public List<TRowResult> getRowWithColumns(ByteBuffer tableName,
+                                              ByteBuffer row,
+        List<ByteBuffer> columns) throws IOError {
+      return getRowWithColumnsTs(tableName, row, columns,
+                                 HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public List<TRowResult> getRowTs(ByteBuffer tableName, ByteBuffer row,
+        long timestamp) throws IOError {
+      return getRowWithColumnsTs(tableName, row, null,
+                                 timestamp);
+    }
+
+    @Override
+    public List<TRowResult> getRowWithColumnsTs(
+        ByteBuffer tableName, ByteBuffer row, List<ByteBuffer> columns,
+        long timestamp) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        if (columns == null) {
+          Get get = new Get(getBytes(row));
+          get.setTimeRange(Long.MIN_VALUE, timestamp);
+          Result result = table.get(get);
+          return ThriftUtilities.rowResultFromHBase(result);
+        }
+        Get get = new Get(getBytes(row));
+        for(ByteBuffer column : columns) {
+          byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
+          if (famAndQf.length == 1) {
+              get.addFamily(famAndQf[0]);
+          } else {
+              get.addColumn(famAndQf[0], famAndQf[1]);
+          }
+        }
+        get.setTimeRange(Long.MIN_VALUE, timestamp);
+        Result result = table.get(get);
+        return ThriftUtilities.rowResultFromHBase(result);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public List<TRowResult> getRows(ByteBuffer tableName,
+                                    List<ByteBuffer> rows)
+        throws IOError {
+      return getRowsWithColumnsTs(tableName, rows, null,
+                                  HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public List<TRowResult> getRowsWithColumns(ByteBuffer tableName,
+                                               List<ByteBuffer> rows,
+        List<ByteBuffer> columns) throws IOError {
+      return getRowsWithColumnsTs(tableName, rows, columns,
+                                  HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public List<TRowResult> getRowsTs(ByteBuffer tableName,
+                                      List<ByteBuffer> rows,
+        long timestamp) throws IOError {
+      return getRowsWithColumnsTs(tableName, rows, null,
+                                  timestamp);
+    }
+
+    @Override
+    public List<TRowResult> getRowsWithColumnsTs(ByteBuffer tableName,
+                                                 List<ByteBuffer> rows,
+        List<ByteBuffer> columns, long timestamp) throws IOError {
+      try {
+        List<Get> gets = new ArrayList<Get>(rows.size());
+        HTable table = getTable(tableName);
+        for (ByteBuffer row : rows) {
+          Get get = new Get(getBytes(row));
+          if (columns != null) {
+
+            for(ByteBuffer column : columns) {
+              byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
+              if (famAndQf.length == 1) {
+                get.addFamily(famAndQf[0]);
+              } else {
+                get.addColumn(famAndQf[0], famAndQf[1]);
+              }
+            }
+            get.setTimeRange(Long.MIN_VALUE, timestamp);
+          }
+          gets.add(get);
+        }
+        Result[] result = table.get(gets);
+        return ThriftUtilities.rowResultFromHBase(result);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void deleteAll(
+        ByteBuffer tableName, ByteBuffer row, ByteBuffer column)
+        throws IOError {
+      deleteAllTs(tableName, row, column, HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public void deleteAllTs(ByteBuffer tableName,
+                            ByteBuffer row,
+                            ByteBuffer column,
+        long timestamp) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Delete delete  = new Delete(getBytes(row));
+        byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
+        if (famAndQf.length == 1) {
+          delete.deleteFamily(famAndQf[0], timestamp);
+        } else {
+          delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+        }
+        table.delete(delete);
+
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void deleteAllRow(
+        ByteBuffer tableName, ByteBuffer row) throws IOError {
+      deleteAllRowTs(tableName, row, HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public void deleteAllRowTs(
+        ByteBuffer tableName, ByteBuffer row, long timestamp) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Delete delete  = new Delete(getBytes(row), timestamp, null);
+        table.delete(delete);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void createTable(ByteBuffer in_tableName,
+        List<ColumnDescriptor> columnFamilies) throws IOError,
+        IllegalArgument, AlreadyExists {
+      byte [] tableName = getBytes(in_tableName);
+      try {
+        if (admin.tableExists(tableName)) {
+          throw new AlreadyExists("table name already in use");
+        }
+        HTableDescriptor desc = new HTableDescriptor(tableName);
+        for (ColumnDescriptor col : columnFamilies) {
+          HColumnDescriptor colDesc = ThriftUtilities.colDescFromThrift(col);
+          desc.addFamily(colDesc);
+        }
+        admin.createTable(desc);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgument(e.getMessage());
+      }
+    }
+
+    @Override
+    public void deleteTable(ByteBuffer in_tableName) throws IOError {
+      byte [] tableName = getBytes(in_tableName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("deleteTable: table=" + Bytes.toString(tableName));
+      }
+      try {
+        if (!admin.tableExists(tableName)) {
+          throw new IOError("table does not exist");
+        }
+        admin.deleteTable(tableName);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void mutateRow(ByteBuffer tableName, ByteBuffer row,
+        List<Mutation> mutations) throws IOError, IllegalArgument {
+      mutateRowTs(tableName, row, mutations, HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public void mutateRowTs(ByteBuffer tableName, ByteBuffer row,
+        List<Mutation> mutations, long timestamp)
+        throws IOError, IllegalArgument {
+      HTable table = null;
+      try {
+        table = getTable(tableName);
+        Put put = new Put(getBytes(row), timestamp, null);
+
+        Delete delete = new Delete(getBytes(row));
+
+        // I apologize for all this mess :)
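+        // A single-element famAndQf means the column spec named only a
+        // family: deletes then target the whole family, and puts write to
+        // the empty qualifier within it.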
+        for (Mutation m : mutations) {
+          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
+          if (m.isDelete) {
+            if (famAndQf.length == 1) {
+              delete.deleteFamily(famAndQf[0], timestamp);
+            } else {
+              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+            }
+          } else {
+            if(famAndQf.length == 1) {
+              put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
+                  m.value != null ? m.value.array()
+                      : HConstants.EMPTY_BYTE_ARRAY);
+            } else {
+              put.add(famAndQf[0], famAndQf[1],
+                  m.value != null ? m.value.array()
+                      : HConstants.EMPTY_BYTE_ARRAY);
+            }
+          }
+        }
+        if (!delete.isEmpty())
+          table.delete(delete);
+        if (!put.isEmpty())
+          table.put(put);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgument(e.getMessage());
+      }
+    }
+
+    @Override
+    public void mutateRows(ByteBuffer tableName, List<BatchMutation> rowBatches)
+        throws IOError, IllegalArgument, TException {
+      mutateRowsTs(tableName, rowBatches, HConstants.LATEST_TIMESTAMP);
+    }
+
+    @Override
+    public void mutateRowsTs(
+        ByteBuffer tableName, List<BatchMutation> rowBatches, long timestamp)
+        throws IOError, IllegalArgument, TException {
+      List<Put> puts = new ArrayList<Put>();
+      List<Delete> deletes = new ArrayList<Delete>();
+
+      for (BatchMutation batch : rowBatches) {
+        byte[] row = getBytes(batch.row);
+        List<Mutation> mutations = batch.mutations;
+        Delete delete = new Delete(row);
+        Put put = new Put(row, timestamp, null);
+        for (Mutation m : mutations) {
+          byte[][] famAndQf = KeyValue.parseColumn(getBytes(m.column));
+          if (m.isDelete) {
+            // no qualifier, family only.
+            if (famAndQf.length == 1) {
+              delete.deleteFamily(famAndQf[0], timestamp);
+            } else {
+              delete.deleteColumns(famAndQf[0], famAndQf[1], timestamp);
+            }
+          } else {
+            if(famAndQf.length == 1) {
+              put.add(famAndQf[0], HConstants.EMPTY_BYTE_ARRAY,
+                  m.value != null ? m.value.array()
+                      : HConstants.EMPTY_BYTE_ARRAY);
+            } else {
+              put.add(famAndQf[0], famAndQf[1],
+                  m.value != null ? m.value.array()
+                      : HConstants.EMPTY_BYTE_ARRAY);
+            }
+          }
+        }
+        if (!delete.isEmpty())
+          deletes.add(delete);
+        if (!put.isEmpty())
+          puts.add(put);
+      }
+
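+      // Puts are applied as a single batch; deletes are issued one at a time.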
+      HTable table = null;
+      try {
+        table = getTable(tableName);
+        if (!puts.isEmpty())
+          table.put(puts);
+        for (Delete del : deletes) {
+          table.delete(del);
+        }
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgument(e.getMessage());
+      }
+    }
+
+    @Deprecated
+    @Override
+    public long atomicIncrement(
+        ByteBuffer tableName, ByteBuffer row, ByteBuffer column, long amount)
+            throws IOError, IllegalArgument, TException {
+      byte [][] famAndQf = KeyValue.parseColumn(getBytes(column));
+      if(famAndQf.length == 1) {
+        return atomicIncrement(tableName, row, famAndQf[0], new byte[0],
+            amount);
+      }
+      return atomicIncrement(tableName, row, famAndQf[0], famAndQf[1], amount);
+    }
+
+    protected long atomicIncrement(ByteBuffer tableName, ByteBuffer row,
+        byte [] family, byte [] qualifier, long amount)
+        throws IOError, IllegalArgument, TException {
+      HTable table;
+      try {
+        table = getTable(tableName);
+        return table.incrementColumnValue(
+            getBytes(row), family, qualifier, amount);
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public void scannerClose(int id) throws IOError, IllegalArgument {
+      LOG.debug("scannerClose: id=" + id);
+      ResultScanner scanner = getScanner(id);
+      if (scanner == null) {
+        throw new IllegalArgument("scanner ID is invalid");
+      }
+      scanner.close();
+      removeScanner(id);
+    }
+
+    @Override
+    public List<TRowResult> scannerGetList(int id, int nbRows)
+        throws IllegalArgument, IOError {
+      LOG.debug("scannerGetList: id=" + id);
+      ResultScanner scanner = getScanner(id);
+      if (null == scanner) {
+        throw new IllegalArgument("scanner ID is invalid");
+      }
+
+      Result [] results = null;
+      try {
+        results = scanner.next(nbRows);
+        if (null == results) {
+          return new ArrayList<TRowResult>();
+        }
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+      return ThriftUtilities.rowResultFromHBase(results);
+    }
+
+    @Override
+    public List<TRowResult> scannerGet(int id) throws IllegalArgument, IOError {
+      return scannerGetList(id, 1);
+    }
+
+    @Override
+    public int scannerOpenWithScan(ByteBuffer tableName, TScan tScan)
+        throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan();
+        if (tScan.isSetStartRow()) {
+          scan.setStartRow(tScan.getStartRow());
+        }
+        if (tScan.isSetStopRow()) {
+          scan.setStopRow(tScan.getStopRow());
+        }
+        if (tScan.isSetTimestamp()) {
+          scan.setTimeRange(Long.MIN_VALUE, tScan.getTimestamp());
+        }
+        if (tScan.isSetCaching()) {
+          scan.setCaching(tScan.getCaching());
+        }
+        if (tScan.isSetColumns() && tScan.getColumns().size() != 0) {
+          for(ByteBuffer column : tScan.getColumns()) {
+            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
+            if(famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        if (tScan.isSetFilterString()) {
+          ParseFilter parseFilter = new ParseFilter();
+          scan.setFilter(
+              parseFilter.parseFilterString(tScan.getFilterString()));
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public int scannerOpen(ByteBuffer tableName, ByteBuffer startRow,
+        List<ByteBuffer> columns) throws IOError {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startRow));
+        if(columns != null && columns.size() != 0) {
+          for(ByteBuffer column : columns) {
+            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
+            if(famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public int scannerOpenWithStop(ByteBuffer tableName, ByteBuffer startRow,
+        ByteBuffer stopRow, List<ByteBuffer> columns)
+        throws IOError, TException {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+        if(columns != null && columns.size() != 0) {
+          for(ByteBuffer column : columns) {
+            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
+            if(famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public int scannerOpenWithPrefix(ByteBuffer tableName,
+                                     ByteBuffer startAndPrefix,
+                                     List<ByteBuffer> columns)
+        throws IOError, TException {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startAndPrefix));
+        Filter f = new WhileMatchFilter(
+            new PrefixFilter(getBytes(startAndPrefix)));
+        scan.setFilter(f);
+        if (columns != null && columns.size() != 0) {
+          for(ByteBuffer column : columns) {
+            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
+            if(famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public int scannerOpenTs(ByteBuffer tableName, ByteBuffer startRow,
+        List<ByteBuffer> columns, long timestamp) throws IOError, TException {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startRow));
+        scan.setTimeRange(Long.MIN_VALUE, timestamp);
+        if (columns != null && columns.size() != 0) {
+          for (ByteBuffer column : columns) {
+            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
+            if(famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public int scannerOpenWithStopTs(ByteBuffer tableName, ByteBuffer startRow,
+        ByteBuffer stopRow, List<ByteBuffer> columns, long timestamp)
+        throws IOError, TException {
+      try {
+        HTable table = getTable(tableName);
+        Scan scan = new Scan(getBytes(startRow), getBytes(stopRow));
+        scan.setTimeRange(Long.MIN_VALUE, timestamp);
+        if (columns != null && columns.size() != 0) {
+          for (ByteBuffer column : columns) {
+            byte [][] famQf = KeyValue.parseColumn(getBytes(column));
+            if(famQf.length == 1) {
+              scan.addFamily(famQf[0]);
+            } else {
+              scan.addColumn(famQf[0], famQf[1]);
+            }
+          }
+        }
+        return addScanner(table.getScanner(scan));
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public Map<ByteBuffer, ColumnDescriptor> getColumnDescriptors(
+        ByteBuffer tableName) throws IOError, TException {
+      try {
+        TreeMap<ByteBuffer, ColumnDescriptor> columns =
+          new TreeMap<ByteBuffer, ColumnDescriptor>();
+
+        HTable table = getTable(tableName);
+        HTableDescriptor desc = table.getTableDescriptor();
+
+        for (HColumnDescriptor e : desc.getFamilies()) {
+          ColumnDescriptor col = ThriftUtilities.colDescFromHbase(e);
+          columns.put(col.name, col);
+        }
+        return columns;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public List<TCell> getRowOrBefore(ByteBuffer tableName, ByteBuffer row,
+        ByteBuffer family) throws IOError {
+      try {
+        HTable table = getTable(getBytes(tableName));
+        Result result = table.getRowOrBefore(getBytes(row), getBytes(family));
+        return ThriftUtilities.cellFromHBase(result.raw());
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+
+    @Override
+    public TRegionInfo getRegionInfo(ByteBuffer searchRow) throws IOError {
+      try {
+        HTable table = getTable(HConstants.META_TABLE_NAME);
+        Result startRowResult = table.getRowOrBefore(
+          searchRow.array(), HConstants.CATALOG_FAMILY);
+
+        if (startRowResult == null) {
+          throw new IOException("Cannot find row in .META., row="
+                                + Bytes.toString(searchRow.array()));
+        }
+
+        // find region start and end keys
+        byte[] value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+                                               HConstants.REGIONINFO_QUALIFIER);
+        if (value == null || value.length == 0) {
+          throw new IOException("HRegionInfo REGIONINFO was null or " +
+                                " empty in Meta for row="
+                                + Bytes.toString(searchRow.array()));
+        }
+        HRegionInfo regionInfo = Writables.getHRegionInfo(value);
+        TRegionInfo region = new TRegionInfo();
+        region.setStartKey(regionInfo.getStartKey());
+        region.setEndKey(regionInfo.getEndKey());
+        region.id = regionInfo.getRegionId();
+        region.setName(regionInfo.getRegionName());
+        region.version = regionInfo.getVersion();
+
+        // find region assignment to server
+        value = startRowResult.getValue(HConstants.CATALOG_FAMILY,
+                                        HConstants.SERVER_QUALIFIER);
+        if (value != null && value.length > 0) {
+          String hostAndPort = Bytes.toString(value);
+          region.setServerName(Bytes.toBytes(
+              Addressing.parseHostname(hostAndPort)));
+          region.port = Addressing.parsePort(hostAndPort);
+        }
+        return region;
+      } catch (IOException e) {
+        throw new IOError(e.getMessage());
+      }
+    }
+  }
+}
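
A short, hypothetical launcher for the class above (not part of this commit;
it assumes a reachable HBase cluster, since HBaseHandler opens an HBaseAdmin
on construction):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.thrift.ThriftServerRunner;

    public class ThriftLauncherSketch {
      public static void main(String[] argv) throws IOException {
        Configuration conf = HBaseConfiguration.create();
        // Keys match SERVER_TYPE_CONF_KEY and PORT_CONF_KEY above.
        conf.set("hbase.regionserver.thrift.server.type", "threadedselector");
        conf.setInt("hbase.regionserver.thrift.port", 9090);

        // ThriftServerRunner is a Runnable: serve on a dedicated thread so the
        // caller keeps control; run() exits the process if startup fails.
        ThriftServerRunner runner = new ThriftServerRunner(conf);
        Thread serverThread = new Thread(runner, "thrift-server");
        serverThread.setDaemon(true);
        serverThread.start();

        // ... and on shutdown:
        runner.shutdown();
      }
    }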