You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ec...@apache.org on 2013/02/25 23:50:29 UTC

svn commit: r1449950 [10/35] - in /hbase/trunk: ./ hbase-client/ hbase-client/src/ hbase-client/src/main/ hbase-client/src/main/java/ hbase-client/src/main/java/org/ hbase-client/src/main/java/org/apache/ hbase-client/src/main/java/org/apache/hadoop/ h...

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,1397 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HConnectionManager.HConnectable;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.filter.BinaryComparator;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+import org.apache.hadoop.hbase.ipc.RegionCoprocessorRpcChannel;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.RequestConverter;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiGetResponse;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateResponse;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.CompareType;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * <p>Used to communicate with a single HBase table.
+ *
+ * <p>This class is not thread safe for reads or writes.
+ *
+ * <p>In case of writes (Put, Delete), the underlying write buffer can
+ * be corrupted if multiple threads contend over a single HTable instance.
+ *
+ * <p>In case of reads, some fields used by a Scan are shared among all threads.
+ * The HTable implementation does not guarantee thread safety in case of a Get either.
+ *
+ * <p>To access a table in a multi threaded environment, please consider
+ * using the {@link HTablePool} class to create your HTable instances.
+ *
+ * <p>Instances of HTable passed the same {@link Configuration} instance will
+ * share connections to servers out on the cluster and to the zookeeper ensemble
+ * as well as caches of region locations.  This is usually a *good* thing and it
+ * is recommended to reuse the same configuration object for all your tables.
+ * This happens because they will all share the same underlying
+ * {@link HConnection} instance. See {@link HConnectionManager} for more on
+ * how this mechanism works.
+ *
+ * <p>{@link HConnection} will read most of the
+ * configuration it needs from the passed {@link Configuration} on initial
+ * construction.  Thereafter, for settings such as
+ * <code>hbase.client.pause</code>, <code>hbase.client.retries.number</code>,
+ * and <code>hbase.client.rpc.maxattempts</code> updating their values in the
+ * passed {@link Configuration} subsequent to {@link HConnection} construction
+ * will go unnoticed.  To run with changed values, make a new
+ * {@link HTable} passing a new {@link Configuration} instance that has the
+ * new configuration.
+ *
+ * <p>Note that this class implements the {@link Closeable} interface. When a
+ * HTable instance is no longer required, it *should* be closed in order to ensure
+ * that the underlying resources are promptly released. Please note that the close
+ * method can throw java.io.IOException that must be handled.
+ *
+ * @see HBaseAdmin for create, drop, list, enable and disable of tables.
+ * @see HConnection
+ * @see HConnectionManager
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class HTable implements HTableInterface {
+  private static final Log LOG = LogFactory.getLog(HTable.class);
+  // Connection used for region lookups and RPCs; left null when the table is built with a null conf.
+  private HConnection connection;
+  // Name of the table this instance talks to.
+  private final byte [] tableName;
+  // Configuration the client-side settings are read from in finishSetup().
+  private volatile Configuration configuration;
+  // Puts queued by doPut() until flushCommits() is called.
+  private final ArrayList<Put> writeBuffer = new ArrayList<Put>();
+  // Size threshold read from "hbase.client.write.buffer"; doPut() flushes once it is exceeded.
+  private long writeBufferSize;
+  // Initialised to true in finishSetup(); NOTE(review): consumed outside this view, presumably by flushCommits().
+  private boolean clearBufferOnFail;
+  // When true, doPut() calls flushCommits() at the end of every invocation.
+  private boolean autoFlush;
+  // Sum of Put.heapSize() for the Puts added by doPut(); presumably reset on flush (not visible here).
+  private long currentWriteBufferSize;
+  // Caching value applied by getScanner(Scan) when the Scan's own caching is <= 0.
+  protected int scannerCaching;
+  // Read from "hbase.client.keyvalue.maxsize" (default -1); NOTE(review): enforcement is outside this view.
+  private int maxKeyValueSize;
+  private ExecutorService pool;  // For Multi
+  // Set to false by finishSetup().
+  private boolean closed;
+  // Timeout handed to every ServerCallable created by this table.
+  private int operationTimeout;
+  private static final int DOPUT_WB_CHECK = 10;    // i.e., doPut checks the writebuffer every X Puts.
+  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
+  private final boolean cleanupConnectionOnClose; // close the connection in close()
+
+  /**
+   * Creates an object to access an HBase table identified by a String name.
+   * The name is converted with {@link Bytes#toBytes(String)} and construction is
+   * delegated to {@link #HTable(Configuration, byte[])}, so instances created with
+   * the same <code>conf</code> share the zookeeper connection, other resources and
+   * any already-populated region cache.  Recommended.
+   * @param conf Configuration object to use.
+   * @param tableName Name of the table.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HTable(Configuration conf, final String tableName) throws IOException {
+    this(conf, Bytes.toBytes(tableName));
+  }
+
+
+  /**
+   * Creates an object to access an HBase table.
+   * Shares zookeeper connection and other resources with other HTable instances
+   * created with the same <code>conf</code> instance, and reuses the region cache
+   * populated by them if one is available.  Recommended.
+   * <p>
+   * This instance owns both its thread pool and its connection (both cleanup
+   * flags are set).  When <code>conf</code> is null the constructor returns early
+   * with a null connection and without creating a pool or reading any settings.
+   * @param conf Configuration object to use.
+   * @param tableName Name of the table.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HTable(Configuration conf, final byte [] tableName) throws IOException {
+    this.tableName = tableName;
+    this.cleanupConnectionOnClose = true;
+    this.cleanupPoolOnClose = true;
+    if (conf == null) {
+      this.connection = null;
+      return;
+    }
+    this.connection = HConnectionManager.getConnection(conf);
+    this.configuration = conf;
+
+    final int configuredMax = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE);
+    // A configured value of 0 is bumped to 1 (is there a better default?).
+    final int maxThreads = (configuredMax == 0) ? 1 : configuredMax;
+    final long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60);
+
+    // "Direct handoff" via a SynchronousQueue: threads are created only when
+    // needed and may grow unbounded. This could be bad, but HCM only creates as
+    // many Runnables as there are region servers, so it also scales when new
+    // region servers are added.
+    final ThreadPoolExecutor executor = new ThreadPoolExecutor(
+        1, maxThreads, keepAliveTime, TimeUnit.SECONDS,
+        new SynchronousQueue<Runnable>(), Threads.newDaemonThreadFactory("hbase-table"));
+    executor.allowCoreThreadTimeOut(true);
+    this.pool = executor;
+
+    this.finishSetup();
+  }
+
+  /**
+   * Creates an object to access an HBase table using a caller-supplied
+   * ExecutorService.
+   * Shares zookeeper connection and other resources with other HTable instances
+   * created with the same <code>conf</code> instance, and reuses the region cache
+   * populated by them if one is available.
+   * Use this constructor when the ExecutorService is externally managed: the pool
+   * is flagged as not to be cleaned up on close, while the connection is.
+   * @param conf Configuration object to use.
+   * @param tableName Name of the table.
+   * @param pool ExecutorService to be used.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HTable(Configuration conf, final byte[] tableName, final ExecutorService pool)
+      throws IOException {
+    this.tableName = tableName;
+    this.pool = pool;
+    this.cleanupPoolOnClose = false;
+    this.cleanupConnectionOnClose = true;
+    this.configuration = conf;
+    this.connection = HConnectionManager.getConnection(conf);
+
+    this.finishSetup();
+  }
+
+  /**
+   * Creates an object to access an HBase table on top of an existing connection
+   * and thread pool.
+   * Shares zookeeper connection and other resources with other HTable instances
+   * created with the same <code>connection</code> instance.
+   * Use this constructor when the ExecutorService and HConnection instance are
+   * externally managed; neither is flagged for cleanup on close.  The
+   * configuration is taken from the connection.
+   * @param tableName Name of the table.
+   * @param connection HConnection to be used.
+   * @param pool ExecutorService to be used.
+   * @throws IOException if a remote or network exception occurs
+   * @throws IllegalArgumentException if the pool is null or shut down, or the
+   *   connection is null or closed
+   */
+  public HTable(final byte[] tableName, final HConnection connection,
+      final ExecutorService pool) throws IOException {
+    final boolean poolUsable = pool != null && !pool.isShutdown();
+    if (!poolUsable) {
+      throw new IllegalArgumentException("Pool is null or shut down.");
+    }
+    final boolean connectionUsable = connection != null && !connection.isClosed();
+    if (!connectionUsable) {
+      throw new IllegalArgumentException("Connection is null or closed.");
+    }
+    this.tableName = tableName;
+    this.cleanupConnectionOnClose = false;
+    this.cleanupPoolOnClose = false;
+    this.connection = connection;
+    this.configuration = connection.getConfiguration();
+    this.pool = pool;
+
+    this.finishSetup();
+  }
+
+  /**
+   * Completes construction: locates the region holding the table's first row and
+   * initialises this HTable's parameters from the configuration.
+   * @throws IOException if locating the first region fails
+   */
+  private void finishSetup() throws IOException {
+    this.connection.locateRegion(tableName, HConstants.EMPTY_START_ROW);
+
+    // Meta tables always use the default operation timeout; other tables honour
+    // the configured value.
+    if (HTableDescriptor.isMetaTable(tableName)) {
+      this.operationTimeout = HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT;
+    } else {
+      this.operationTimeout = this.configuration.getInt(
+          HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+          HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+    }
+
+    // Write-buffer state: starts empty, auto-flushing, cleared on failure.
+    this.writeBufferSize = this.configuration.getLong("hbase.client.write.buffer", 2097152);
+    this.currentWriteBufferSize = 0;
+    this.autoFlush = true;
+    this.clearBufferOnFail = true;
+
+    this.scannerCaching = this.configuration.getInt(
+        HConstants.HBASE_CLIENT_SCANNER_CACHING,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_CACHING);
+    this.maxKeyValueSize = this.configuration.getInt("hbase.client.keyvalue.maxsize", -1);
+
+    this.closed = false;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Returns the configuration reference held by this instance.
+   */
+  @Override
+  public Configuration getConfiguration() {
+    return this.configuration;
+  }
+
+  /**
+   * Tells whether or not a table is enabled. This overload ends up creating a
+   * new HBase configuration, so it might make your unit tests fail due to
+   * incorrect ZK client port.
+   * @param tableName Name of table to check.
+   * @return {@code true} if table is online.
+   * @throws IOException if a remote or network exception occurs
+   * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
+   */
+  @Deprecated
+  public static boolean isTableEnabled(String tableName) throws IOException {
+    final byte[] tableNameBytes = Bytes.toBytes(tableName);
+    return isTableEnabled(tableNameBytes);
+  }
+
+  /**
+   * Tells whether or not a table is enabled. This method creates a new HBase
+   * configuration via {@link HBaseConfiguration#create()}, so it might make your
+   * unit tests fail due to incorrect ZK client port.
+   * @param tableName Name of table to check.
+   * @return {@code true} if table is online.
+   * @throws IOException if a remote or network exception occurs
+   * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
+   */
+  @Deprecated
+  public static boolean isTableEnabled(byte[] tableName) throws IOException {
+    final Configuration freshConf = HBaseConfiguration.create();
+    return isTableEnabled(freshConf, tableName);
+  }
+
+  /**
+   * Tells whether or not a table is enabled, using the supplied configuration.
+   * @param conf The Configuration object to use.
+   * @param tableName Name of table to check.
+   * @return {@code true} if table is online.
+   * @throws IOException if a remote or network exception occurs
+   * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[])}
+   */
+  @Deprecated
+  public static boolean isTableEnabled(Configuration conf, String tableName)
+      throws IOException {
+    final byte[] tableNameBytes = Bytes.toBytes(tableName);
+    return isTableEnabled(conf, tableNameBytes);
+  }
+
+  /**
+   * Tells whether or not a table is enabled. The check is run through
+   * {@link HConnectionManager#execute} with a connectable built from
+   * <code>conf</code>, which asks the connection for the table's state.
+   * @param conf The Configuration object to use.
+   * @param tableName Name of table to check.
+   * @return {@code true} if table is online.
+   * @throws IOException if a remote or network exception occurs
+   * @deprecated use {@link HBaseAdmin#isTableEnabled(byte[] tableName)}
+   */
+  @Deprecated
+  public static boolean isTableEnabled(Configuration conf,
+      final byte[] tableName) throws IOException {
+    final HConnectable<Boolean> enabledCheck = new HConnectable<Boolean>(conf) {
+      @Override
+      public Boolean connect(HConnection connection) throws IOException {
+        return connection.isTableEnabled(tableName);
+      }
+    };
+    return HConnectionManager.execute(enabledCheck);
+  }
+
+  /**
+   * Finds the region location hosting the passed row. The row is converted to
+   * bytes and looked up without requesting a reload of cached information.
+   * @param row Row to find.
+   * @return The location of the given row.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HRegionLocation getRegionLocation(final String row) throws IOException {
+    final byte[] rowBytes = Bytes.toBytes(row);
+    return this.connection.getRegionLocation(this.tableName, rowBytes, false);
+  }
+
+  /**
+   * Finds the region on which the given row is being served. The lookup is made
+   * with the reload flag off, i.e. it does not reload the cache.
+   * @param row Row to find.
+   * @return Location of the row.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HRegionLocation getRegionLocation(final byte [] row) throws IOException {
+    final boolean reload = false;
+    return this.connection.getRegionLocation(this.tableName, row, reload);
+  }
+
+  /**
+   * Finds the region on which the given row is being served, optionally asking
+   * the connection to reload its information.
+   * @param row Row to find.
+   * @param reload true to reload information or false to use cached information
+   * @return Location of the row.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HRegionLocation getRegionLocation(final byte [] row, boolean reload)
+      throws IOException {
+    return this.connection.getRegionLocation(this.tableName, row, reload);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>The internal array is returned as-is, not a copy.
+   */
+  @Override
+  public byte [] getTableName() {
+    return tableName;
+  }
+
+  /**
+   * <em>INTERNAL</em> Exposes the underlying connection so that unit tests and
+   * tools can do low-level manipulations.
+   * @return An HConnection instance.
+   * @deprecated This method will be changed from public to package protected.
+   */
+  // TODO(tsuna): Remove this.  Unit tests shouldn't require public helpers.
+  @Deprecated
+  public HConnection getConnection() {
+    return connection;
+  }
+
+  /**
+   * Gets the number of rows that a scanner will fetch at once.
+   * <p>
+   * Unless overridden through {@link #setScannerCaching(int)}, the value comes
+   * from {@code hbase.client.scanner.caching}.
+   * @return the scanner caching value currently held by this table
+   * @deprecated Use {@link Scan#setCaching(int)} and {@link Scan#getCaching()}
+   */
+  @Deprecated
+  public int getScannerCaching() {
+    return this.scannerCaching;
+  }
+
+  /**
+   * Sets the number of rows that a scanner will fetch at once.
+   * <p>
+   * This overrides the value specified by {@code hbase.client.scanner.caching}.
+   * A larger value reduces the amount of work needed each time {@code next()}
+   * is called on a scanner, at the expense of memory use (more rows have to be
+   * kept in memory by the scanners).
+   * @param scannerCaching the number of rows a scanner will fetch at once.
+   * @deprecated Use {@link Scan#setCaching(int)}
+   */
+  @Deprecated
+  public void setScannerCaching(int scannerCaching) {
+    // No validation is performed; the value is stored as given.
+    this.scannerCaching = scannerCaching;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>The descriptor obtained from the connection is wrapped in an
+   * {@link UnmodifyableHTableDescriptor} before being returned.
+   */
+  @Override
+  public HTableDescriptor getTableDescriptor() throws IOException {
+    final HTableDescriptor fetched = this.connection.getHTableDescriptor(this.tableName);
+    return new UnmodifyableHTableDescriptor(fetched);
+  }
+
+  /**
+   * Gets the starting row key for every region in the currently open table,
+   * i.e. the first half of {@link #getStartEndKeys()}.
+   * <p>
+   * This is mainly useful for the MapReduce integration.
+   * @return Array of region starting row keys
+   * @throws IOException if a remote or network exception occurs
+   */
+  public byte [][] getStartKeys() throws IOException {
+    final Pair<byte[][], byte[][]> startAndEndKeys = getStartEndKeys();
+    return startAndEndKeys.getFirst();
+  }
+
+  /**
+   * Gets the ending row key for every region in the currently open table,
+   * i.e. the second half of {@link #getStartEndKeys()}.
+   * <p>
+   * This is mainly useful for the MapReduce integration.
+   * @return Array of region ending row keys
+   * @throws IOException if a remote or network exception occurs
+   */
+  public byte[][] getEndKeys() throws IOException {
+    final Pair<byte[][], byte[][]> startAndEndKeys = getStartEndKeys();
+    return startAndEndKeys.getSecond();
+  }
+
+  /**
+   * Gets the starting and ending row keys for every region in the currently
+   * open table.  Both arrays follow the iteration order of the map returned by
+   * {@link #getRegionLocations()}, so index i of each array describes the same
+   * region.
+   * <p>
+   * This is mainly useful for the MapReduce integration.
+   * @return Pair of arrays of region starting and ending row keys
+   * @throws IOException if a remote or network exception occurs
+   */
+  public Pair<byte[][],byte[][]> getStartEndKeys() throws IOException {
+    final NavigableMap<HRegionInfo, ServerName> regionMap = getRegionLocations();
+    final int regionCount = regionMap.size();
+    final byte[][] startKeys = new byte[regionCount][];
+    final byte[][] endKeys = new byte[regionCount][];
+
+    int idx = 0;
+    for (HRegionInfo info : regionMap.keySet()) {
+      startKeys[idx] = info.getStartKey();
+      endKeys[idx] = info.getEndKey();
+      idx++;
+    }
+
+    return new Pair<byte [][], byte [][]>(startKeys, endKeys);
+  }
+
+  /**
+   * Gets all the regions and their address for this table, as reported by
+   * {@link MetaScanner#allTableRegions}.
+   * <p>
+   * This is mainly useful for the MapReduce integration.
+   * @return A map of HRegionInfo with its server address
+   * @throws IOException if a remote or network exception occurs
+   */
+  public NavigableMap<HRegionInfo, ServerName> getRegionLocations() throws IOException {
+    // TODO: Odd that this returns a Map of HRI to SN whereas getRegionLocation, singular, returns an HRegionLocation.
+    final Configuration conf = getConfiguration();
+    final byte[] table = getTableName();
+    return MetaScanner.allTableRegions(conf, table, false);
+  }
+
+  /**
+   * Get the corresponding regions for an arbitrary range of keys.
+   * <p>
+   * Starting at <code>startKey</code>, regions are looked up one after another,
+   * each lookup continuing from the previous region's end key, until the end
+   * of the table or <code>endKey</code> is reached.  An empty
+   * <code>endKey</code> means "up to the end of the table".
+   * @param startKey Starting row in range, inclusive
+   * @param endKey Ending row in range, exclusive
+   * @return A list of HRegionLocations corresponding to the regions that
+   * contain the specified range
+   * @throws IOException if a remote or network exception occurs
+   * @throws IllegalArgumentException if startKey sorts after a non-empty endKey
+   */
+  public List<HRegionLocation> getRegionsInRange(final byte [] startKey,
+    final byte [] endKey) throws IOException {
+    final boolean endKeyIsEndOfTable = Bytes.equals(endKey, HConstants.EMPTY_END_ROW);
+    if ((Bytes.compareTo(startKey, endKey) > 0) && !endKeyIsEndOfTable) {
+      final String badRange =
+          "Invalid range: " + Bytes.toStringBinary(startKey) +
+          " > " + Bytes.toStringBinary(endKey);
+      throw new IllegalArgumentException(badRange);
+    }
+
+    final List<HRegionLocation> locations = new ArrayList<HRegionLocation>();
+    byte [] cursor = startKey;
+    while (true) {
+      final HRegionLocation hit = getRegionLocation(cursor, false);
+      locations.add(hit);
+      cursor = hit.getRegionInfo().getEndKey();
+      if (Bytes.equals(cursor, HConstants.EMPTY_END_ROW)) {
+        // Just collected the last region of the table.
+        break;
+      }
+      if (!endKeyIsEndOfTable && Bytes.compareTo(cursor, endKey) >= 0) {
+        // The next region would start at or past the requested end key.
+        break;
+      }
+    }
+    return locations;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Issued as a retried {@link ServerCallable} against the region located
+   * for <code>row</code>.
+   */
+  @Override
+  public Result getRowOrBefore(final byte[] row, final byte[] family)
+      throws IOException {
+    final ServerCallable<Result> lookup =
+        new ServerCallable<Result>(connection, tableName, row, operationTimeout) {
+          @Override
+          public Result call() throws IOException {
+            final byte[] regionName = location.getRegionInfo().getRegionName();
+            return ProtobufUtil.getRowOrBefore(server, regionName, row, family);
+          }
+        };
+    return lookup.withRetries();
+  }
+
+   /**
+    * {@inheritDoc}
+    */
+  @Override
+  public ResultScanner getScanner(final Scan scan) throws IOException {
+    // A Scan without a positive caching value inherits this table's setting;
+    // note that this mutates the caller's Scan.
+    final boolean cachingUnset = scan.getCaching() <= 0;
+    if (cachingUnset) {
+      scan.setCaching(getScannerCaching());
+    }
+    final ResultScanner scanner =
+        new ClientScanner(getConfiguration(), scan, getTableName(), this.connection);
+    return scanner;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Builds a fresh {@link Scan} restricted to <code>family</code> and
+   * delegates to {@link #getScanner(Scan)}.
+   */
+  @Override
+  public ResultScanner getScanner(byte [] family) throws IOException {
+    final Scan familyScan = new Scan();
+    familyScan.addFamily(family);
+    return getScanner(familyScan);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Builds a fresh {@link Scan} restricted to the given column and delegates
+   * to {@link #getScanner(Scan)}.
+   */
+  @Override
+  public ResultScanner getScanner(byte [] family, byte [] qualifier)
+      throws IOException {
+    final Scan columnScan = new Scan();
+    columnScan.addColumn(family, qualifier);
+    return getScanner(columnScan);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Issued as a retried {@link ServerCallable} against the region located
+   * for the Get's row.
+   */
+  @Override
+  public Result get(final Get get) throws IOException {
+    final ServerCallable<Result> fetch =
+        new ServerCallable<Result>(connection, tableName, get.getRow(), operationTimeout) {
+          @Override
+          public Result call() throws IOException {
+            final byte[] regionName = location.getRegionInfo().getRegionName();
+            return ProtobufUtil.get(server, regionName, get);
+          }
+        };
+    return fetch.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>The Gets are executed through {@link #batch(List)} and each returned
+   * object is cast to {@link Result}; results are in the same order as
+   * <code>gets</code>.
+   * @throws InterruptedIOException if the calling thread is interrupted while
+   *   the batch is running; the thread's interrupt status is restored first
+   */
+  @Override
+  public Result[] get(List<Get> gets) throws IOException {
+    final Object[] batchResults;
+    try {
+      batchResults = batch((List) gets);
+    } catch (InterruptedException e) {
+      // Keep the interruption visible: restore the flag and surface it as an
+      // InterruptedIOException (still an IOException for existing callers).
+      Thread.currentThread().interrupt();
+      throw (InterruptedIOException)
+          new InterruptedIOException("Interrupted while executing batch get").initCause(e);
+    }
+
+    // translate.
+    final Result[] results = new Result[batchResults.length];
+    for (int i = 0; i < batchResults.length; i++) {
+      // batch ensures if there is a failure we get an exception instead
+      results[i] = (Result) batchResults[i];
+    }
+    return results;
+  }
+
+  /**
+   * Hands the actions to the connection for batch processing without a
+   * callback; outcomes are written into the caller-supplied
+   * <code>results</code> array.
+   */
+  @Override
+  public void batch(final List<?extends Row> actions, final Object[] results)
+      throws InterruptedException, IOException {
+    this.connection.processBatchCallback(actions, this.tableName, this.pool, results, null);
+  }
+
+  /**
+   * Hands the actions to the connection for batch processing without a
+   * callback and returns a newly allocated array with one slot per action.
+   */
+  @Override
+  public Object[] batch(final List<? extends Row> actions)
+      throws InterruptedException, IOException {
+    final Object[] outcomes = new Object[actions.size()];
+    this.connection.processBatchCallback(actions, this.tableName, this.pool, outcomes, null);
+    return outcomes;
+  }
+
+  /**
+   * Hands the actions and the callback to the connection for batch processing;
+   * outcomes are written into the caller-supplied <code>results</code> array.
+   */
+  @Override
+  public <R> void batchCallback(final List<? extends Row> actions, final Object[] results,
+      final Batch.Callback<R> callback) throws IOException, InterruptedException {
+    this.connection.processBatchCallback(actions, this.tableName, this.pool, results, callback);
+  }
+
+  /**
+   * Hands the actions and the callback to the connection for batch processing
+   * and returns a newly allocated array with one slot per action.
+   */
+  @Override
+  public <R> Object[] batchCallback(final List<? extends Row> actions,
+      final Batch.Callback<R> callback) throws IOException, InterruptedException {
+    final Object[] outcomes = new Object[actions.size()];
+    this.connection.processBatchCallback(actions, this.tableName, this.pool, outcomes, callback);
+    return outcomes;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Sent as a retried mutate RPC to the region located for the Delete's row.
+   * The "processed" flag of the response is not reported to the caller.
+   */
+  @Override
+  public void delete(final Delete delete) throws IOException {
+    final ServerCallable<Boolean> deleteCall =
+        new ServerCallable<Boolean>(connection, tableName, delete.getRow(), operationTimeout) {
+          @Override
+          public Boolean call() throws IOException {
+            try {
+              final byte[] regionName = location.getRegionInfo().getRegionName();
+              final MutateRequest request =
+                  RequestConverter.buildMutateRequest(regionName, delete);
+              final MutateResponse response = server.mutate(null, request);
+              return Boolean.valueOf(response.getProcessed());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
+          }
+        };
+    deleteCall.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>The passed list is mutated: every Delete whose batch outcome is a
+   * {@link Result} is removed, so on return the list is empty after complete
+   * success or holds only the entries that did not succeed.  This happens even
+   * when the batch throws.
+   * @throws InterruptedIOException if the calling thread is interrupted while
+   *   the batch is running; the thread's interrupt status is restored first
+   */
+  @Override
+  public void delete(final List<Delete> deletes)
+  throws IOException {
+    final Object[] results = new Object[deletes.size()];
+    try {
+      connection.processBatch((List) deletes, tableName, pool, results);
+    } catch (InterruptedException e) {
+      // Keep the interruption visible: restore the flag and surface it as an
+      // InterruptedIOException (still an IOException for existing callers).
+      Thread.currentThread().interrupt();
+      throw (InterruptedIOException)
+          new InterruptedIOException("Interrupted while executing batch delete").initCause(e);
+    } finally {
+      // mutate list so that it is empty for complete success, or contains only failed records
+      // results are returned in the same order as the requests in list
+      // walk the list backwards, so we can remove from list without impacting the indexes of earlier members
+      for (int i = results.length - 1; i >= 0; i--) {
+        // a Result instance marks success; null or any other object is treated as not succeeded
+        if (results[i] instanceof Result) {
+          deletes.remove(i);
+        }
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>The Put is wrapped in a one-element list and handed to the common
+   * buffering path.
+   */
+  @Override
+  public void put(final Put put) throws IOException {
+    final List<Put> single = Collections.singletonList(put);
+    doPut(single);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>All Puts go through the common buffering path.
+   */
+  @Override
+  public void put(final List<Put> puts) throws IOException {
+    this.doPut(puts);
+  }
+
+  /**
+   * Validates each Put, appends it to the write buffer and accounts for its
+   * heap size.  The buffer is flushed when it has outgrown
+   * <code>writeBufferSize</code> (checked every {@link #DOPUT_WB_CHECK} Puts
+   * and once more at the end) or, at the end, whenever autoFlush is on.
+   * @param puts the Puts to buffer, in order
+   * @throws IOException if validation or a flush fails
+   */
+  private void doPut(final List<Put> puts) throws IOException {
+    int buffered = 0;
+    for (final Put current : puts) {
+      validatePut(current);
+      writeBuffer.add(current);
+      currentWriteBufferSize += current.heapSize();
+      buffered++;
+
+      // we need to periodically see if the writebuffer is full instead of waiting until the end of the List
+      final boolean atCheckpoint = (buffered % DOPUT_WB_CHECK) == 0;
+      if (atCheckpoint && currentWriteBufferSize > writeBufferSize) {
+        flushCommits();
+      }
+    }
+
+    final boolean bufferFull = currentWriteBufferSize > writeBufferSize;
+    if (autoFlush || bufferFull) {
+      flushCommits();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Sent as a retried multi RPC to the region located for the mutations'
+   * row; the response is ignored.
+   */
+  @Override
+  public void mutateRow(final RowMutations rm) throws IOException {
+    final ServerCallable<Void> mutateCall =
+        new ServerCallable<Void>(connection, tableName, rm.getRow(), operationTimeout) {
+          @Override
+          public Void call() throws IOException {
+            try {
+              final byte[] regionName = location.getRegionInfo().getRegionName();
+              final MultiRequest request = RequestConverter.buildMultiRequest(regionName, rm);
+              server.multi(null, request);
+              return null;
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
+          }
+        };
+    mutateCall.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Rejects an Append without any family, then sends a retried mutate RPC to
+   * the region located for the Append's row.  Returns null when the response
+   * carries no result.
+   */
+  @Override
+  public Result append(final Append append) throws IOException {
+    final boolean nothingToAppend = append.numFamilies() == 0;
+    if (nothingToAppend) {
+      throw new IOException(
+          "Invalid arguments to append, no columns specified");
+    }
+    final ServerCallable<Result> appendCall =
+        new ServerCallable<Result>(connection, tableName, append.getRow(), operationTimeout) {
+          @Override
+          public Result call() throws IOException {
+            try {
+              final byte[] regionName = location.getRegionInfo().getRegionName();
+              final MutateRequest request =
+                  RequestConverter.buildMutateRequest(regionName, append);
+              final MutateResponse response = server.mutate(null, request);
+              if (response.hasResult()) {
+                return ProtobufUtil.toResult(response.getResult());
+              }
+              return null;
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
+          }
+        };
+    return appendCall.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Rejects an Increment without any family, then sends a retried mutate RPC
+   * to the region located for the Increment's row and converts the response's
+   * result.
+   */
+  @Override
+  public Result increment(final Increment increment) throws IOException {
+    final boolean hasColumns = increment.hasFamilies();
+    if (!hasColumns) {
+      throw new IOException(
+          "Invalid arguments to increment, no columns specified");
+    }
+    final ServerCallable<Result> incrementCall =
+        new ServerCallable<Result>(connection, tableName, increment.getRow(), operationTimeout) {
+          @Override
+          public Result call() throws IOException {
+            try {
+              final byte[] regionName = location.getRegionInfo().getRegionName();
+              final MutateRequest request =
+                  RequestConverter.buildMutateRequest(regionName, increment);
+              final MutateResponse response = server.mutate(null, request);
+              return ProtobufUtil.toResult(response.getResult());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
+          }
+        };
+    return incrementCall.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>Equivalent to the five-argument overload with <code>writeToWAL</code>
+   * set to true.
+   */
+  @Override
+  public long incrementColumnValue(final byte [] row, final byte [] family,
+      final byte [] qualifier, final long amount) throws IOException {
+    final boolean writeToWAL = true;
+    return incrementColumnValue(row, family, qualifier, amount, writeToWAL);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>A null row, family or qualifier (checked in that order) is reported as
+   * an IOException whose cause is a NullPointerException naming the argument.
+   * Otherwise a retried mutate RPC is sent to the region located for
+   * <code>row</code> and the returned cell value is decoded as a long.
+   */
+  @Override
+  public long incrementColumnValue(final byte [] row, final byte [] family,
+      final byte [] qualifier, final long amount, final boolean writeToWAL)
+      throws IOException {
+    if (row == null) {
+      throw new IOException(
+          "Invalid arguments to incrementColumnValue", new NullPointerException("row is null"));
+    }
+    if (family == null) {
+      throw new IOException(
+          "Invalid arguments to incrementColumnValue", new NullPointerException("family is null"));
+    }
+    if (qualifier == null) {
+      throw new IOException(
+          "Invalid arguments to incrementColumnValue", new NullPointerException("qualifier is null"));
+    }
+    final ServerCallable<Long> incrementCall =
+        new ServerCallable<Long>(connection, tableName, row, operationTimeout) {
+          @Override
+          public Long call() throws IOException {
+            try {
+              final byte[] regionName = location.getRegionInfo().getRegionName();
+              final MutateRequest request = RequestConverter.buildMutateRequest(
+                  regionName, row, family, qualifier, amount, writeToWAL);
+              final MutateResponse response = server.mutate(null, request);
+              final Result result = ProtobufUtil.toResult(response.getResult());
+              final byte[] cell = result.getValue(family, qualifier);
+              return Long.valueOf(Bytes.toLong(cell));
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
+          }
+        };
+    return incrementCall.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>The check is an EQUAL comparison of the cell against <code>value</code>
+   * using a {@link BinaryComparator}; the request is sent as a retried mutate
+   * RPC and the response's "processed" flag is returned.
+   */
+  @Override
+  public boolean checkAndPut(final byte [] row,
+      final byte [] family, final byte [] qualifier, final byte [] value,
+      final Put put) throws IOException {
+    final ServerCallable<Boolean> conditionalPut =
+        new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
+          @Override
+          public Boolean call() throws IOException {
+            try {
+              final byte[] regionName = location.getRegionInfo().getRegionName();
+              final MutateRequest request = RequestConverter.buildMutateRequest(
+                  regionName, row, family, qualifier,
+                  new BinaryComparator(value), CompareType.EQUAL, put);
+              final MutateResponse response = server.mutate(null, request);
+              return Boolean.valueOf(response.getProcessed());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
+          }
+        };
+    return conditionalPut.withRetries();
+  }
+
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean checkAndDelete(final byte [] row,
+      final byte [] family, final byte [] qualifier, final byte [] value,
+      final Delete delete)
+  throws IOException {
+    return new ServerCallable<Boolean>(connection, tableName, row, operationTimeout) {
+          public Boolean call() throws IOException {
+            try {
+              MutateRequest request = RequestConverter.buildMutateRequest(
+                location.getRegionInfo().getRegionName(), row, family, qualifier,
+                new BinaryComparator(value), CompareType.EQUAL, delete);
+              MutateResponse response = server.mutate(null, request);
+              return Boolean.valueOf(response.getProcessed());
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
+          }
+        }.withRetries();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean exists(final Get get) throws IOException {
+    return new ServerCallable<Boolean>(connection, tableName, get.getRow(), operationTimeout) {
+          public Boolean call() throws IOException {
+            try {
+              GetRequest request = RequestConverter.buildGetRequest(
+                  location.getRegionInfo().getRegionName(), get, true);
+              GetResponse response = server.get(null, request);
+              return response.getExists();
+            } catch (ServiceException se) {
+              throw ProtobufUtil.getRemoteException(se);
+            }
+          }
+        }.withRetries();
+  }
+
+  /**
+   * Pairs a {@link Get} with the position it occupied in the caller's list. The list of
+   * wrappers can then be sorted (ordering, equality and hash code all delegate to the wrapped
+   * Get) while still allowing results to be handed back in the caller's original order.
+   */
+  private static class SortedGet implements Comparable<SortedGet> {
+    /** Position of the wrapped Get in the list it was taken from. */
+    protected int initialIndex;
+    /** The wrapped Get. */
+    protected Get get;
+
+    public SortedGet (Get get, int initialIndex) {
+      this.initialIndex = initialIndex;
+      this.get = get;
+    }
+
+    public Get getGet() {
+      return this.get;
+    }
+
+    public int getInitialIndex() {
+      return this.initialIndex;
+    }
+
+    @Override
+    public int compareTo(SortedGet o) {
+      return this.get.compareTo(o.get);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      // Equal only to another SortedGet wrapping an equal Get; the index is ignored.
+      return (obj instanceof SortedGet) && this.get.equals(((SortedGet) obj).get);
+    }
+
+    @Override
+    public int hashCode() {
+      return this.get.hashCode();
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * The Gets are sorted, grouped by hosting region, and one bundled existence request per
+   * region is submitted to the pool. The answers are returned in the order of the supplied
+   * list.
+   *
+   * @throws IOException if the region boundaries cannot be fetched, if the request for any
+   *         region fails or returns an unexpected number of answers, or (as an
+   *         {@link InterruptedIOException}) if the calling thread is interrupted while
+   *         waiting for a response.
+   */
+  @Override
+  public Boolean[] exists(final List<Get> gets) throws IOException {
+    // Wrap every Get with its original position so that the answers can be put back in the
+    // caller's order after sorting.
+    List<SortedGet> sortedGetsList = new ArrayList<SortedGet>(gets.size());
+    for (Get get : gets) {
+      sortedGetsList.add(new SortedGet(get, sortedGetsList.size()));
+    }
+
+    // Sorting the list to get the Gets ordered based on the key.
+    Collections.sort(sortedGetsList); // O(n log n)
+
+    // step 1: sort the requests by regions to send them bundled.
+    // Map key is a region index. Map value is the list of Gets assigned to that region, in
+    // sorted order.
+    Map<Integer, List<Get>> getsByRegion = new HashMap<Integer, List<Get>>();
+
+    // Reference map to quickly find back in which region a get belongs.
+    Map<Get, Integer> getToRegionIndexMap = new HashMap<Get, Integer>();
+    byte[][] endKeys = getStartEndKeys().getSecond();
+
+    int regionIndex = 0;
+    for (final SortedGet sortedGet : sortedGetsList) {
+      Get get = sortedGet.getGet();
+      // Progress on the regions until we find the one the current get resides in. The cursor
+      // never moves backwards, which relies on the sort above ordering the Gets by row.
+      while (regionIndex < endKeys.length
+          && Bytes.compareTo(endKeys[regionIndex], get.getRow()) <= 0) {
+        regionIndex++;
+      }
+      List<Get> regionGets = getsByRegion.get(regionIndex);
+      if (regionGets == null) {
+        regionGets = new ArrayList<Get>();
+        getsByRegion.put(regionIndex, regionGets);
+      }
+      regionGets.add(get);
+      getToRegionIndexMap.put(get, regionIndex);
+    }
+
+    // step 2: make the requests, one per region, addressed through the first row of the group.
+    Map<Integer, Future<List<Boolean>>> futures =
+        new HashMap<Integer, Future<List<Boolean>>>(getsByRegion.size());
+    for (final Map.Entry<Integer, List<Get>> getsByRegionEntry : getsByRegion.entrySet()) {
+      final List<Get> regionGets = getsByRegionEntry.getValue();
+      Callable<List<Boolean>> callable = new Callable<List<Boolean>>() {
+        public List<Boolean> call() throws Exception {
+          return new ServerCallable<List<Boolean>>(connection, tableName,
+              regionGets.get(0).getRow(), operationTimeout) {
+            public List<Boolean> call() throws IOException {
+              try {
+                MultiGetRequest requests = RequestConverter.buildMultiGetRequest(location
+                    .getRegionInfo().getRegionName(), regionGets, true, false);
+                MultiGetResponse responses = server.multiGet(null, requests);
+                return responses.getExistsList();
+              } catch (ServiceException se) {
+                throw ProtobufUtil.getRemoteException(se);
+              }
+            }
+          }.withRetries();
+        }
+      };
+      futures.put(getsByRegionEntry.getKey(), pool.submit(callable));
+    }
+
+    // step 3: collect the responses. Any failure is reported to the caller as an IOException:
+    // there is no meaningful Boolean to return for a Get whose region could not be queried,
+    // and carrying on would only fail later with a NullPointerException while building the
+    // result array.
+    Map<Integer, List<Boolean>> responses = new HashMap<Integer, List<Boolean>>();
+    for (final Map.Entry<Integer, List<Get>> sortedGetEntry : getsByRegion.entrySet()) {
+      final Integer region = sortedGetEntry.getKey();
+      final List<Boolean> resp;
+      try {
+        resp = futures.get(region).get();
+      } catch (ExecutionException e) {
+        Throwable cause = e.getCause();
+        LOG.warn("Failed for gets on region: " + region, cause);
+        if (cause instanceof IOException) {
+          throw (IOException) cause;
+        }
+        throw new IOException("Failed for gets on region: " + region, cause);
+      } catch (InterruptedException e) {
+        LOG.warn("Failed for gets on region: " + region);
+        Thread.currentThread().interrupt();
+        throw (InterruptedIOException) new InterruptedIOException(
+            "Interrupted while waiting for gets on region: " + region).initCause(e);
+      }
+      int expected = sortedGetEntry.getValue().size();
+      if (resp == null || resp.size() != expected) {
+        LOG.warn("Failed for gets on region: " + region);
+        throw new IOException("Expected " + expected + " answers for gets on region: "
+            + region + " but got " + (resp == null ? "none" : String.valueOf(resp.size())));
+      }
+      responses.put(region, resp);
+    }
+    Boolean[] results = new Boolean[sortedGetsList.size()];
+
+    // step 4: build the response. Each region's answers are consumed in the same (sorted)
+    // order in which its Gets were added in step 1; "indexes" tracks the next unread answer
+    // per region.
+    Map<Integer, Integer> indexes = new HashMap<Integer, Integer>();
+    for (SortedGet sortedGet : sortedGetsList) {
+      Integer regionInfoIndex = getToRegionIndexMap.get(sortedGet.getGet());
+      Integer index = indexes.get(regionInfoIndex);
+      if (index == null) {
+        index = 0;
+      }
+      results[sortedGet.getInitialIndex()] = responses.get(regionInfoIndex).get(index);
+      indexes.put(regionInfoIndex, index + 1);
+    }
+
+    return results;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Sends the whole write buffer through the connection as one batch. Afterwards the buffer
+   * is emptied if the batch succeeded or {@code clearBufferOnFail} is set; otherwise only the
+   * entries whose result slot holds a {@link Result} are removed, and
+   * {@code currentWriteBufferSize} is recomputed from the entries that remain.
+   *
+   * @throws IOException if the batch fails; an {@link InterruptedIOException} (with the
+   *         original {@link InterruptedException} as its cause and the thread's interrupt
+   *         flag restored) if the calling thread is interrupted.
+   */
+  @Override
+  public void flushCommits() throws IOException {
+    Object[] results = new Object[writeBuffer.size()];
+    boolean success = false;
+    try {
+      this.connection.processBatch(writeBuffer, tableName, pool, results);
+      success = true;
+    } catch (InterruptedException e) {
+      // Keep the interrupt visible to callers further up the stack and keep the original
+      // exception as the cause, as coprocessorService() in this class does.
+      Thread.currentThread().interrupt();
+      throw (InterruptedIOException) new InterruptedIOException(e.getMessage()).initCause(e);
+    } finally {
+      currentWriteBufferSize = 0;
+      if (success || clearBufferOnFail) {
+        writeBuffer.clear();
+      } else {
+        // mutate list so that it is empty for complete success, or contains
+        // only failed records. Results are returned in the same order as the
+        // requests in list. Walk the list backwards, so we can remove from list
+        // without impacting the indexes of earlier members
+        for (int i = results.length - 1; i >= 0; i--) {
+          if (results[i] instanceof Result) {
+            writeBuffer.remove(i);
+          } else {
+            currentWriteBufferSize += writeBuffer.get(i).heapSize();
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Process a mixed batch of Get, Put and Delete actions. All actions for a
+   * RegionServer are forwarded in one RPC call. Queries are executed in parallel.
+   * <p>
+   * This method simply forwards to the connection's {@code processBatchCallback},
+   * supplying this table's name and thread pool.
+   *
+   * @param list The collection of actions.
+   * @param results An empty array, same size as list. If an exception is thrown,
+   * you can test here for partial results, and to determine which actions
+   * processed successfully.
+   * @param callback passed through unchanged to the connection; may be {@code null}
+   * (as {@link #processBatch} does).
+   * @throws IOException if there are problems talking to META. Per-item
+   * exceptions are stored in the results array.
+   * @throws InterruptedException propagated from the connection's batch processing.
+   */
+  public <R> void processBatchCallback(
+    final List<? extends Row> list, final Object[] results, final Batch.Callback<R> callback)
+    throws IOException, InterruptedException {
+    connection.processBatchCallback(list, tableName, pool, results, callback);
+  }
+
+
+  /**
+   * Parameterized batch processing, allowing varying return types for different
+   * {@link Row} implementations.
+   * <p>
+   * Equivalent to calling {@link #processBatchCallback} with a {@code null} callback.
+   *
+   * @param list The collection of actions.
+   * @param results An array, same size as list, that receives the per-action results.
+   * @throws IOException propagated from {@link #processBatchCallback}.
+   * @throws InterruptedException propagated from {@link #processBatchCallback}.
+   */
+  public void processBatch(final List<? extends Row> list, final Object[] results)
+    throws IOException, InterruptedException {
+
+    this.processBatchCallback(list, results, null);
+  }
+
+
+  /**
+   * Flushes any buffered writes and then releases the resources this table owns: the pool is
+   * shut down when {@code cleanupPoolOnClose} is set, and the connection is closed when
+   * {@code cleanupConnectionOnClose} is set and a connection is present. Calling this on an
+   * already closed table does nothing.
+   * <p>
+   * If the flush throws, none of the clean-up steps run and the table is not marked closed.
+   *
+   * @throws IOException if flushing the write buffer or closing the connection fails.
+   */
+  @Override
+  public void close() throws IOException {
+    if (!this.closed) {
+      flushCommits();
+      if (cleanupPoolOnClose) {
+        this.pool.shutdown();
+      }
+      if (cleanupConnectionOnClose && this.connection != null) {
+        this.connection.close();
+      }
+      this.closed = true;
+    }
+  }
+
+  /**
+   * Validates that a Put is well formed before it is accepted.
+   *
+   * @param put the Put to check.
+   * @throws IllegalArgumentException if the Put is empty, or if {@code maxKeyValueSize} is
+   *         positive and any KeyValue in the Put is longer than that limit.
+   */
+  public void validatePut(final Put put) throws IllegalArgumentException{
+    if (put.isEmpty()) {
+      throw new IllegalArgumentException("No columns to insert");
+    }
+    // A non-positive limit disables the size check altogether.
+    if (maxKeyValueSize <= 0) {
+      return;
+    }
+    for (List<KeyValue> familyCells : put.getFamilyMap().values()) {
+      for (KeyValue cell : familyCells) {
+        if (cell.getLength() > maxKeyValueSize) {
+          throw new IllegalArgumentException("KeyValue size too large");
+        }
+      }
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * @return the current value of the {@code autoFlush} flag, as last set through
+   *         {@link #setAutoFlush(boolean, boolean)}.
+   */
+  @Override
+  public boolean isAutoFlush() {
+    return autoFlush;
+  }
+
+  /**
+   * See {@link #setAutoFlush(boolean, boolean)}. This overload passes {@code autoFlush}
+   * for both arguments, so {@code clearBufferOnFail} always ends up equal to
+   * {@code autoFlush}.
+   *
+   * @param autoFlush
+   *          Whether or not to enable 'auto-flush'.
+   */
+  public void setAutoFlush(boolean autoFlush) {
+    setAutoFlush(autoFlush, autoFlush);
+  }
+
+  /**
+   * Turns 'auto-flush' on or off.
+   * <p>
+   * When enabled (default), {@link Put} operations don't get buffered/delayed
+   * and are immediately executed. Failed operations are not retried. This is
+   * slower but safer.
+   * <p>
+   * Turning off {@link #autoFlush} means that multiple {@link Put}s will be
+   * accepted before any RPC is actually sent to do the write operations. If the
+   * application dies before pending writes get flushed to HBase, data will be
+   * lost.
+   * <p>
+   * When you turn {@link #autoFlush} off, you should also consider the
+   * {@link #clearBufferOnFail} option. By default, asynchronous {@link Put}
+   * requests will be retried on failure until successful. However, this can
+   * pollute the writeBuffer and slow down batching performance. Additionally,
+   * you may want to issue a number of Put requests and call
+   * {@link #flushCommits()} as a barrier. In both use cases, consider setting
+   * clearBufferOnFail to true to erase the buffer after {@link #flushCommits()}
+   * has been called, regardless of success.
+   *
+   * @param autoFlush
+   *          Whether or not to enable 'auto-flush'.
+   * @param clearBufferOnFail
+   *          Whether to clear the writeBuffer after a failed flush instead of
+   *          keeping the failed Puts in it. This is forced to {@code true}
+   *          whenever {@code autoFlush} is {@code true}.
+   * @see #flushCommits
+   */
+  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail) {
+    this.autoFlush = autoFlush;
+    // With auto-flush on, the stored flag is always true, whatever the caller passed.
+    this.clearBufferOnFail = autoFlush || clearBufferOnFail;
+  }
+
+  /**
+   * Returns the maximum size in bytes of the write buffer for this HTable.
+   * <p>
+   * The default value comes from the configuration parameter
+   * {@code hbase.client.write.buffer}. The value can be changed with
+   * {@link #setWriteBufferSize(long)}.
+   * @return The size of the write buffer in bytes.
+   */
+  public long getWriteBufferSize() {
+    return writeBufferSize;
+  }
+
+  /**
+   * Changes the maximum size, in bytes, of the write buffer.
+   * <p>
+   * The new limit takes effect immediately; if the data currently buffered
+   * already exceeds it, the buffer is flushed before this method returns.
+   * @param writeBufferSize The new write buffer size, in bytes.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  public void setWriteBufferSize(long writeBufferSize) throws IOException {
+    this.writeBufferSize = writeBufferSize;
+    final boolean overLimit = this.currentWriteBufferSize > this.writeBufferSize;
+    if (overLimit) {
+      flushCommits();
+    }
+  }
+
+  /**
+   * Returns the write buffer.
+   * <p>
+   * The list returned is the table's own buffer, not a copy, so changes made to it
+   * are seen by this instance.
+   * @return The current write buffer.
+   */
+  public ArrayList<Put> getWriteBuffer() {
+    return writeBuffer;
+  }
+
+  /**
+   * The pool is used for multi requests for this HTable.
+   * @return the pool used for multi requests
+   */
+  ExecutorService getPool() {
+    return this.pool;
+  }
+
+  /**
+   * Enables or disables region cache prefetch for a table, using a freshly created
+   * default configuration ({@code HBaseConfiguration.create()}). The setting applies to
+   * all HTable instances of that table which share the same connection. By default,
+   * the cache prefetch is enabled.
+   * @param tableName name of table to configure.
+   * @param enable Set to true to enable region cache prefetch. Or set to
+   * false to disable it.
+   * @throws IOException
+   */
+  public static void setRegionCachePrefetch(final byte[] tableName,
+      final boolean enable) throws IOException {
+    setRegionCachePrefetch(HBaseConfiguration.create(), tableName, enable);
+  }
+
+  /**
+   * Enables or disables region cache prefetch for a table. The setting applies to
+   * all HTable instances of that table which share the same connection. By default,
+   * the cache prefetch is enabled.
+   * @param conf The Configuration object to use.
+   * @param tableName name of table to configure.
+   * @param enable Set to true to enable region cache prefetch. Or set to
+   * false to disable it.
+   * @throws IOException
+   */
+  public static void setRegionCachePrefetch(final Configuration conf,
+      final byte[] tableName, final boolean enable) throws IOException {
+    HConnectable<Void> prefetchSetter = new HConnectable<Void>(conf) {
+      @Override
+      public Void connect(HConnection conn) throws IOException {
+        conn.setRegionCachePrefetch(tableName, enable);
+        return null;
+      }
+    };
+    HConnectionManager.execute(prefetchSetter);
+  }
+
+  /**
+   * Checks whether region cache prefetch is enabled for a table.
+   * @param conf The Configuration object to use.
+   * @param tableName name of table to check
+   * @return true if the table's region cache prefetch is enabled. Otherwise
+   * it is disabled.
+   * @throws IOException
+   */
+  public static boolean getRegionCachePrefetch(final Configuration conf,
+      final byte[] tableName) throws IOException {
+    HConnectable<Boolean> prefetchQuery = new HConnectable<Boolean>(conf) {
+      @Override
+      public Boolean connect(HConnection conn) throws IOException {
+        return conn.getRegionCachePrefetch(tableName);
+      }
+    };
+    return HConnectionManager.execute(prefetchQuery);
+  }
+
+  /**
+   * Checks whether region cache prefetch is enabled for a table, using a freshly
+   * created default configuration ({@code HBaseConfiguration.create()}).
+   * @param tableName name of table to check
+   * @return true if the table's region cache prefetch is enabled. Otherwise
+   * it is disabled.
+   * @throws IOException
+   */
+  public static boolean getRegionCachePrefetch(final byte[] tableName) throws IOException {
+    return getRegionCachePrefetch(HBaseConfiguration.create(), tableName);
+  }
+
+  /**
+   * Explicitly clears the region cache to fetch the latest value from META.
+   * This is a power user function: avoid unless you know the ramifications.
+   * <p>
+   * The call is forwarded to this table's connection, so the cache cleared is the
+   * connection's.
+   */
+  public void clearRegionCache() {
+    this.connection.clearRegionCache();
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Returns a new {@link RegionCoprocessorRpcChannel} built from this table's connection
+   * and name and the given row.
+   *
+   * @param row the row passed to the channel; presumably selects the region the channel
+   *        talks to (confirm against RegionCoprocessorRpcChannel).
+   */
+  public CoprocessorRpcChannel coprocessorService(byte[] row) {
+    return new RegionCoprocessorRpcChannel(connection, tableName, row);
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * Runs the callback-based overload with a callback that stores each value under the
+   * region name it was reported for, and returns the resulting map. The map is a
+   * synchronized {@link TreeMap} ordered by {@code Bytes.BYTES_COMPARATOR}.
+   */
+  @Override
+  public <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
+      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
+      throws ServiceException, Throwable {
+    final Map<byte[],R> valuesByRegion = Collections.synchronizedMap(
+        new TreeMap<byte[], R>(Bytes.BYTES_COMPARATOR));
+    Batch.Callback<R> collector = new Batch.Callback<R>() {
+      public void update(byte[] region, byte[] row, R value) {
+        valuesByRegion.put(region, value);
+      }
+    };
+    coprocessorService(service, startKey, endKey, callable, collector);
+    return valuesByRegion;
+  }
+
+  /**
+   * {@inheritDoc}
+   * <p>
+   * One task per key returned by {@code getStartKeysInRange(startKey, endKey)} is submitted
+   * to the pool. Each task creates a service stub over its own channel, invokes
+   * {@code callable} on it and, when {@code callback} is not null, reports the result
+   * together with the channel's last region and the key. This method then waits for every
+   * task in key order and rethrows the cause of the first failure it encounters.
+   */
+  @Override
+  public <T extends Service, R> void coprocessorService(final Class<T> service,
+      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
+      final Batch.Callback<R> callback) throws ServiceException, Throwable {
+
+    // Submit one invocation per key covering the requested row range.
+    Map<byte[],Future<R>> pending =
+        new TreeMap<byte[],Future<R>>(Bytes.BYTES_COMPARATOR);
+    for (final byte[] rangeKey : getStartKeysInRange(startKey, endKey)) {
+      final RegionCoprocessorRpcChannel channel =
+          new RegionCoprocessorRpcChannel(connection, tableName, rangeKey);
+      Callable<R> invocation = new Callable<R>() {
+        public R call() throws Exception {
+          T stub = ProtobufUtil.newServiceStub(service, channel);
+          R value = callable.call(stub);
+          byte[] region = channel.getLastRegion();
+          if (callback != null) {
+            callback.update(region, rangeKey, value);
+          }
+          return value;
+        }
+      };
+      pending.put(rangeKey, pool.submit(invocation));
+    }
+
+    // Wait for every invocation; the results themselves were already delivered via callback.
+    for (Map.Entry<byte[],Future<R>> task : pending.entrySet()) {
+      final byte[] taskKey = task.getKey();
+      try {
+        task.getValue().get();
+      } catch (ExecutionException ee) {
+        LOG.warn("Error calling coprocessor service " + service.getName() + " for row "
+            + Bytes.toStringBinary(taskKey), ee);
+        throw ee.getCause();
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new InterruptedIOException("Interrupted calling coprocessor service " + service.getName()
+            + " for row " + Bytes.toStringBinary(taskKey))
+            .initCause(ie);
+      }
+    }
+  }
+
+  /**
+   * Builds the list of keys, one per region, that cover the row range from {@code start}
+   * to {@code end}, walking the regions in the order returned by {@code getStartEndKeys()}.
+   * <p>
+   * For the region that contains {@code start}, {@code start} itself is added; for each
+   * later region whose start key is not past {@code end}, the region's start key is added.
+   *
+   * @param start first row of the range; {@code null} is treated as
+   *        {@code HConstants.EMPTY_START_ROW}.
+   * @param end last row of the range; {@code null} is treated as
+   *        {@code HConstants.EMPTY_END_ROW}, which is handled as "no upper bound".
+   * @return the selected keys, in region order.
+   * @throws IOException if the region boundaries cannot be fetched.
+   */
+  private List<byte[]> getStartKeysInRange(byte[] start, byte[] end)
+  throws IOException {
+    Pair<byte[][],byte[][]> startEndKeys = getStartEndKeys();
+    byte[][] startKeys = startEndKeys.getFirst();
+    byte[][] endKeys = startEndKeys.getSecond();
+
+    if (start == null) {
+      start = HConstants.EMPTY_START_ROW;
+    }
+    if (end == null) {
+      end = HConstants.EMPTY_END_ROW;
+    }
+
+    List<byte[]> rangeKeys = new ArrayList<byte[]>();
+    for (int i=0; i<startKeys.length; i++) {
+      if (Bytes.compareTo(start, startKeys[i]) >= 0 ) {
+        // Region begins at or before 'start': it is relevant only if 'start' falls inside it
+        // (an empty end key means the region has no upper bound).
+        if (Bytes.equals(endKeys[i], HConstants.EMPTY_END_ROW) ||
+            Bytes.compareTo(start, endKeys[i]) < 0) {
+          rangeKeys.add(start);
+        }
+      } else if (Bytes.equals(end, HConstants.EMPTY_END_ROW) ||
+          Bytes.compareTo(startKeys[i], end) <= 0) {
+        // Region begins after 'start' and not past 'end' (note the comparison is inclusive).
+        rangeKeys.add(startKeys[i]);
+      } else {
+        break; // past stop
+      }
+    }
+
+    return rangeKeys;
+  }
+
+  /**
+   * Sets the operation timeout that this table passes to the {@code ServerCallable}s it
+   * creates.
+   *
+   * @param operationTimeout the new timeout (unit not visible here; presumably
+   *        milliseconds, to be confirmed against ServerCallable).
+   */
+  public void setOperationTimeout(int operationTimeout) {
+    this.operationTimeout = operationTimeout;
+  }
+
+  /**
+   * Returns the operation timeout that this table passes to the {@code ServerCallable}s it
+   * creates.
+   *
+   * @return the current operation timeout.
+   */
+  public int getOperationTimeout() {
+    return operationTimeout;
+  }
+
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableFactory.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableFactory.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableFactory.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,49 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+/**
+ * Default {@link HTableInterfaceFactory}: every request creates a new {@link HTable}, and
+ * releasing a table closes it.
+ *
+ * @since 0.21.0
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public class HTableFactory implements HTableInterfaceFactory {
+  /**
+   * Creates a new {@link HTable} for the given table.
+   *
+   * @param config configuration handed to the HTable constructor.
+   * @param tableName name of the table to open.
+   * @return the newly created table.
+   * @throws RuntimeException wrapping the {@link IOException} if the HTable cannot be
+   *         constructed.
+   */
+  @Override
+  public HTableInterface createHTableInterface(Configuration config,
+      byte[] tableName) {
+    final HTableInterface table;
+    try {
+      table = new HTable(config, tableName);
+    } catch (IOException ioe) {
+      // The factory interface declares no checked exception, so the failure is rethrown
+      // unchecked with the original exception as its cause.
+      throw new RuntimeException(ioe);
+    }
+    return table;
+  }
+
+  /**
+   * Releases a table obtained from this factory by closing it.
+   *
+   * @param table the table to close.
+   * @throws IOException if closing the table fails.
+   */
+  @Override
+  public void releaseHTableInterface(HTableInterface table) throws IOException {
+    table.close();
+  }
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,553 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.Service;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Used to communicate with a single HBase table.
+ *
+ * @since 0.21.0
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface HTableInterface extends Closeable {
+
+  /**
+   * Gets the name of this table.
+   *
+   * @return the table name.
+   */
+  byte[] getTableName();
+
+  /**
+   * Returns the {@link Configuration} object used by this instance.
+   * <p>
+   * The reference returned is not a copy, so any change made to it will
+   * affect this instance.
+   */
+  Configuration getConfiguration();
+
+  /**
+   * Gets the {@link HTableDescriptor table descriptor} for this table.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  HTableDescriptor getTableDescriptor() throws IOException;
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Get.
+   * <p>
+   *
+   * This will return true if the Get matches one or more keys, false if not.
+   * <p>
+   *
+   * This is a server-side call so it prevents any data from being transferred to
+   * the client.
+   *
+   * @param get the Get
+   * @return true if the specified Get matches one or more keys, false if not
+   * @throws IOException e
+   */
+  boolean exists(Get get) throws IOException;
+
+  /**
+   * Test for the existence of columns in the table, as specified by the Gets.
+   * <p>
+   *
+   * This will return an array of booleans. Each value will be true if the related Get matches
+   * one or more keys, false if not.
+   * <p>
+   *
+   * This is a server-side call so it prevents any data from being transferred to
+   * the client.
+   *
+   * @param gets the Gets
+   * @return Array of Boolean true if the specified Get matches one or more keys, false if not
+   * @throws IOException e
+   */
+  Boolean[] exists(List<Get> gets) throws IOException;
+
+  /**
+   * Method that does a batch call on Deletes, Gets, Puts, Increments, Appends and RowMutations.
+   * The ordering of execution of the actions is not defined. Meaning if you do a Put and a
+   * Get in the same {@link #batch} call, you will not necessarily be
+   * guaranteed that the Get returns what the Put had put.
+   *
+   * @param actions list of Get, Put, Delete, Increment, Append, RowMutations objects
+   * @param results Empty Object[], same size as actions. Provides access to partial
+   *                results, in case an exception is thrown. A null in the result array means that
+   *                the call for that action failed, even after retries
+   * @throws IOException
+   * @since 0.90.0
+   */
+  void batch(final List<?extends Row> actions, final Object[] results) throws IOException, InterruptedException;
+
+  /**
+   * Same as {@link #batch(List, Object[])}, but returns an array of
+   * results instead of using a results parameter reference.
+   *
+   * @param actions list of Get, Put, Delete, Increment, Append, RowMutations objects
+   * @return the results from the actions. A null in the return array means that
+   *         the call for that action failed, even after retries
+   * @throws IOException
+   * @since 0.90.0
+   */
+  Object[] batch(final List<? extends Row> actions) throws IOException, InterruptedException;
+
+  /**
+   * Same as {@link #batch(List, Object[])}, but with a callback.
+   * @since 0.96.0
+   */
+  public <R> void batchCallback(
+    final List<? extends Row> actions, final Object[] results, final Batch.Callback<R> callback)
+    throws IOException, InterruptedException;
+
+
+  /**
+   * Same as {@link #batch(List)}, but with a callback.
+   * @since 0.96.0
+   */
+  public <R> Object[] batchCallback(
+    List<? extends Row> actions, Batch.Callback<R> callback) throws IOException,
+    InterruptedException;
+
+  /**
+   * Extracts certain cells from a given row.
+   * @param get The object that specifies what data to fetch and from which row.
+   * @return The data coming from the specified row, if it exists.  If the row
+   * specified doesn't exist, the {@link Result} instance returned won't
+   * contain any {@link KeyValue}, as indicated by {@link Result#isEmpty()}.
+   * @throws IOException if a remote or network exception occurs.
+   * @since 0.20.0
+   */
+  Result get(Get get) throws IOException;
+
+  /**
+   * Extracts certain cells from the given rows, in batch.
+   *
+   * @param gets The objects that specify what data to fetch and from which rows.
+   *
+   * @return The data coming from the specified rows, if it exists.  If the row
+   *         specified doesn't exist, the {@link Result} instance returned won't
+   *         contain any {@link KeyValue}, as indicated by {@link Result#isEmpty()}.
+   *         If there are any failures even after retries, there will be a null in
+   *         the results array for those Gets, AND an exception will be thrown.
+   * @throws IOException if a remote or network exception occurs.
+   *
+   * @since 0.90.0
+   */
+  Result[] get(List<Get> gets) throws IOException;
+
+  /**
+   * Return the row that matches <i>row</i> exactly,
+   * or the one that immediately precedes it.
+   *
+   * @param row A row key.
+   * @param family Column family to include in the {@link Result}.
+   * @throws IOException if a remote or network exception occurs.
+   * @since 0.20.0
+   * 
+   * @deprecated As of version 0.92 this method is deprecated without
+   * replacement.   
+   * getRowOrBefore is used internally to find entries in .META. and makes
+   * various assumptions about the table (which are true for .META. but not
+   * in general) to be efficient.
+   */
+  Result getRowOrBefore(byte[] row, byte[] family) throws IOException;
+
+  /**
+   * Returns a scanner on the current table as specified by the {@link Scan}
+   * object.
+   * Note that the passed {@link Scan}'s start row and caching properties
+   * may be changed.
+   *
+   * @param scan A configured {@link Scan} object.
+   * @return A scanner.
+   * @throws IOException if a remote or network exception occurs.
+   * @since 0.20.0
+   */
+  ResultScanner getScanner(Scan scan) throws IOException;
+
+  /**
+   * Gets a scanner on the current table for the given family.
+   *
+   * @param family The column family to scan.
+   * @return A scanner.
+   * @throws IOException if a remote or network exception occurs.
+   * @since 0.20.0
+   */
+  ResultScanner getScanner(byte[] family) throws IOException;
+
+  /**
+   * Gets a scanner on the current table for the given family and qualifier.
+   *
+   * @param family The column family to scan.
+   * @param qualifier The column qualifier to scan.
+   * @return A scanner.
+   * @throws IOException if a remote or network exception occurs.
+   * @since 0.20.0
+   */
+  ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException;
+
+
+  /**
+   * Puts some data in the table.
+   * <p>
+   * If {@link #isAutoFlush()} is {@code false}, the update is buffered
+   * until the internal buffer is full.
+   *
+   * @param put The data to put.
+   * @throws IOException if a remote or network exception occurs.
+   * @since 0.20.0
+   */
+  void put(Put put) throws IOException;
+
+  /**
+   * Puts some data in the table, in batch.
+   * <p>
+   * If {@link #isAutoFlush()} is {@code false}, the update is buffered
+   * until the internal buffer is full.
+   * <p>
+   * This can be used for group commit, or for submitting user defined
+   * batches.  The writeBuffer will be periodically inspected while the List
+   * is processed, so depending on the List size the writeBuffer may flush
+   * not at all, or more than once.
+   *
+   * @param puts The list of mutations to apply. The batch put is done by
+   * aggregating the iteration of the Puts over the write buffer
+   * at the client-side for a single RPC call.
+   * @throws IOException if a remote or network exception occurs.
+   * @since 0.20.0
+   */
+  void put(List<Put> puts) throws IOException;
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected
+   * value. If it does, it adds the put.  If the passed value is null, the check
+   * is for the lack of column (i.e. non-existence).
+   *
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param put data to put if check succeeds
+   * @return true if the new put was executed, false otherwise
+   * @throws IOException if a remote or network exception occurs.
+   */
+  boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, Put put) throws IOException;
+
+  /**
+   * Deletes the specified cells/row.
+   *
+   * @param delete The {@link Delete} object that specifies what to delete.
+   * @throws IOException if a remote or network exception occurs.
+   * @since 0.20.0
+   */
+  void delete(Delete delete) throws IOException;
+
+  /**
+   * Deletes the specified cells/rows in bulk.
+   *
+   * @param deletes List of things to delete.  List gets modified by this
+   * method (in particular it gets re-ordered, so the order in which the elements
+   * are inserted in the list gives no guarantee as to the order in which the
+   * {@link Delete}s are executed).
+   * @throws IOException if a remote or network exception occurs. In that case
+   * the {@code deletes} argument will contain the {@link Delete} instances
+   * that have not been successfully applied.
+   * @since 0.20.1
+   */
+  void delete(List<Delete> deletes) throws IOException;
+
+  /**
+   * Atomically checks if a row/family/qualifier value matches the expected
+   * value. If it does, it adds the delete.  If the passed value is null, the
+   * check is for the lack of column (i.e. non-existence).
+   *
+   * @param row to check
+   * @param family column family to check
+   * @param qualifier column qualifier to check
+   * @param value the expected value
+   * @param delete data to delete if check succeeds
+   * @return true if the new delete was executed, false otherwise
+   * @throws IOException if a remote or network exception occurs.
+   */
+  boolean checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
+      byte[] value, Delete delete) throws IOException;
+
+  /**
+   * Performs multiple mutations atomically on a single row. Currently
+   * {@link Put} and {@link Delete} are supported.
+   *
+   * @param rm object that specifies the set of mutations to perform atomically
+   * @throws IOException
+   */
+  public void mutateRow(final RowMutations rm) throws IOException;
+
+  /**
+   * Appends values to one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers.  Appends are done
+   * under a single row lock, so write operations to a row are synchronized, but
+   * readers do not take row locks so get and scan operations can see this
+   * operation partially completed.
+   *
+   * @param append object that specifies the columns and amounts to be used
+   *                  for the increment operations
+   * @throws IOException e
+   * @return values of columns after the append operation (maybe null)
+   */
+  public Result append(final Append append) throws IOException;
+
+  /**
+   * Increments one or more columns within a single row.
+   * <p>
+   * This operation does not appear atomic to readers.  Increments are done
+   * under a single row lock, so write operations to a row are synchronized, but
+   * readers do not take row locks so get and scan operations can see this
+   * operation partially completed.
+   *
+   * @param increment object that specifies the columns and amounts to be used
+   *                  for the increment operations
+   * @throws IOException e
+   * @return values of columns after the increment
+   */
+  public Result increment(final Increment increment) throws IOException;
+
+  /**
+   * Atomically increments a column value.
+   * <p>
+   * Equivalent to
+   * {@link #incrementColumnValue(byte[], byte[], byte[], long, boolean) incrementColumnValue}(row,
+   * family, qualifier, amount, <b>true</b>)
+   *
+   * @param row The row that contains the cell to increment.
+   * @param family The column family of the cell to increment.
+   * @param qualifier The column qualifier of the cell to increment.
+   * @param amount The amount to increment the cell with (or decrement, if the
+   * amount is negative).
+   * @return The new value, post increment.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount) throws IOException;
+
+  /**
+   * Atomically increments a column value. If the column value already exists
+   * and is not a big-endian long, this could throw an exception. If the column
+   * value does not yet exist it is initialized to <code>amount</code> and
+   * written to the specified column.
+   *
+   * <p>Setting {@code writeToWAL} to {@code false} means that in a fail
+   * scenario, you will lose any increments that have not been flushed.
+   *
+   * @param row The row that contains the cell to increment.
+   * @param family The column family of the cell to increment.
+   * @param qualifier The column qualifier of the cell to increment.
+   * @param amount The amount to increment the cell with (or decrement, if the
+   * amount is negative).
+   * @param writeToWAL if {@code true}, the operation will be applied to the
+   * Write Ahead Log (WAL).  This makes the operation slower but safer, as if
+   * the call returns successfully, it is guaranteed that the increment will
+   * be safely persisted.  When set to {@code false}, the call may return
+   * successfully before the increment is safely persisted, so it's possible
+   * that the increment is lost in the event of a failure happening before the
+   * operation gets persisted.
+   * @return The new value, post increment.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier,
+      long amount, boolean writeToWAL) throws IOException;
+
+  /**
+   * Tells whether or not 'auto-flush' is turned on.
+   *
+   * @return {@code true} if 'auto-flush' is enabled (default), meaning
+   * {@link Put} operations don't get buffered/delayed and are immediately
+   * executed; {@code false} otherwise.
+   */
+  boolean isAutoFlush();
+
+  /**
+   * Executes all the buffered {@link Put} operations.
+   * <p>
+   * This method gets called once automatically for every {@link Put} or batch
+   * of {@link Put}s (when {@code put(List<Put>)} is used) when
+   * {@link #isAutoFlush()} is {@code true}.
+   *
+   * @throws IOException if a remote or network exception occurs.
+   */
+  void flushCommits() throws IOException;
+
+  /**
+   * Releases any resources held or pending changes in internal buffers.
+   * <p>
+   * NOTE(review): the contract does not say whether pending changes are
+   * flushed or discarded here -- confirm against the implementation.
+   *
+   * @throws IOException if a remote or network exception occurs.
+   */
+  void close() throws IOException;
+
+  /**
+   * Creates and returns a {@link com.google.protobuf.RpcChannel} instance connected to the
+   * table region containing the specified row.  The row given does not actually have
+   * to exist.  Whichever region would contain the row based on start and end keys will
+   * be used.  Note that the {@code row} parameter is not passed to the
+   * coprocessor handler registered for this protocol, unless the {@code row}
+   * is separately passed as an argument in the service request.  The parameter
+   * here is only used to locate the region used to handle the call.
+   *
+   * <p>
+   * The obtained {@link com.google.protobuf.RpcChannel} instance can be used to access a published
+   * coprocessor {@link com.google.protobuf.Service} using standard protobuf service invocations:
+   * </p>
+   *
+   * <div style="background-color: #cccccc; padding: 2px">
+   * <blockquote><pre>
+   * CoprocessorRpcChannel channel = myTable.coprocessorService(rowkey);
+   * MyService.BlockingInterface service = MyService.newBlockingStub(channel);
+   * MyCallRequest request = MyCallRequest.newBuilder()
+   *     ...
+   *     .build();
+   * MyCallResponse response = service.myCall(null, request);
+   * </pre></blockquote></div>
+   *
+   * @param row The row key used to identify the remote region location
+   * @return A {@link CoprocessorRpcChannel} instance connected to the region
+   *         that would contain {@code row}
+   */
+  CoprocessorRpcChannel coprocessorService(byte[] row);
+
+  /**
+   * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
+   * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive),
+   * and invokes the passed {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call}
+   * method with each {@link Service} instance.
+   *
+   * @param service the protocol buffer {@code Service} implementation to call
+   * @param startKey start region selection with region containing this row.  If {@code null}, the
+   *                 selection will start with the first table region.
+   * @param endKey select regions up to and including the region containing this row.
+   *               If {@code null}, selection will continue through the last table region.
+   * @param callable this instance's
+   *                 {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call}
+   *                 method will be invoked once per table region, using the {@link Service}
+   *                 instance connected to that region.
+   * @param <T> the {@link Service} subclass to connect to
+   * @param <R> Return type for the {@code callable} parameter's
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
+   * @return a map of result values keyed by region name
+   */
+  <T extends Service, R> Map<byte[],R> coprocessorService(final Class<T> service,
+      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable)
+      throws ServiceException, Throwable;
+
+  /**
+   * Creates an instance of the given {@link com.google.protobuf.Service} subclass for each table
+   * region spanning the range from the {@code startKey} row to {@code endKey} row (inclusive),
+   * and invokes the passed {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call}
+   * method with each {@link Service} instance.
+   *
+   * <p>
+   * The given
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
+   * method will be called with the return value from each region's
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} invocation.
+   * </p>
+   *
+   * @param service the protocol buffer {@code Service} implementation to call
+   * @param startKey start region selection with region containing this row.  If {@code null}, the
+   *                 selection will start with the first table region.
+   * @param endKey select regions up to and including the region containing this row.
+   *               If {@code null}, selection will continue through the last table region.
+   * @param callable this instance's
+   *                 {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
+   *                 will be invoked once per table region, using the {@link Service} instance
+   *                 connected to that region.
+   * @param callback this instance's
+   *                 {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Callback#update(byte[], byte[], Object)}
+   *                 method will be called with the return value from each region's
+   *                 {@code callable} invocation.
+   * @param <T> the {@link Service} subclass to connect to
+   * @param <R> Return type for the {@code callable} parameter's
+   * {@link org.apache.hadoop.hbase.client.coprocessor.Batch.Call#call} method
+   */
+  <T extends Service, R> void coprocessorService(final Class<T> service,
+      byte[] startKey, byte[] endKey, final Batch.Call<T,R> callable,
+      final Batch.Callback<R> callback) throws ServiceException, Throwable;
+
+  /**
+   * See {@link #setAutoFlush(boolean, boolean)}
+   *
+   * @param autoFlush
+   *        Whether or not to enable 'auto-flush'.
+   */
+  public void setAutoFlush(boolean autoFlush);
+
+  /**
+   * Turns 'auto-flush' on or off.
+   * <p>
+   * When enabled (default), {@link Put} operations don't get buffered/delayed
+   * and are immediately executed. Failed operations are not retried. This is
+   * slower but safer.
+   * <p>
+   * Turning off {@code autoFlush} means that multiple {@link Put}s will be
+   * accepted before any RPC is actually sent to do the write operations. If the
+   * application dies before pending writes get flushed to HBase, data will be
+   * lost.
+   * <p>
+   * When you turn {@code #autoFlush} off, you should also consider the
+   * {@code clearBufferOnFail} option. By default, asynchronous {@link Put}
+   * requests will be retried on failure until successful. However, this can
+   * pollute the writeBuffer and slow down batching performance. Additionally,
+   * you may want to issue a number of Put requests and call
+   * {@link #flushCommits()} as a barrier. In both use cases, consider setting
+   * clearBufferOnFail to true to erase the buffer after {@link #flushCommits()}
+   * has been called, regardless of success.
+   *
+   * @param autoFlush
+   *        Whether or not to enable 'auto-flush'.
+   * @param clearBufferOnFail
+   *        Whether to keep Put failures in the writeBuffer
+   * @see #flushCommits
+   */
+  public void setAutoFlush(boolean autoFlush, boolean clearBufferOnFail);
+
+  /**
+   * Returns the maximum size in bytes of the write buffer for this HTable.
+   * <p>
+   * The default value comes from the configuration parameter
+   * {@code hbase.client.write.buffer}.
+   * @return The size of the write buffer in bytes.
+   */
+  public long getWriteBufferSize();
+
+  /**
+   * Sets the size of the buffer in bytes.
+   * <p>
+   * If the new size is less than the current amount of data in the
+   * write buffer, the buffer gets flushed.
+   * @param writeBufferSize The new write buffer size, in bytes.
+   * @throws IOException if a remote or network exception occurs.
+   */
+  public void setWriteBufferSize(long writeBufferSize) throws IOException;
+}

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterfaceFactory.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterfaceFactory.java?rev=1449950&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterfaceFactory.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableInterfaceFactory.java Mon Feb 25 22:50:17 2013
@@ -0,0 +1,52 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
+
+
+/**
+ * Defines methods to create and release {@link HTableInterface} instances.
+ *
+ * @since 0.21.0
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Stable
+public interface HTableInterfaceFactory {
+
+  /**
+   * Creates a new {@link HTableInterface}.
+   *
+   * @param config HBase {@link Configuration} instance.
+   * @param tableName name of the HBase table.
+   * @return HTableInterface instance.
+   */
+  HTableInterface createHTableInterface(Configuration config, byte[] tableName);
+
+
+  /**
+   * Release the HTable resource represented by the table.
+   *
+   * @param table the {@link HTableInterface} whose resource is to be released.
+   * @throws IOException if an error occurs while releasing the table.
+   */
+  void releaseHTableInterface(final HTableInterface table) throws IOException;
+}