Posted to commits@hbase.apache.org by li...@apache.org on 2014/03/12 22:17:20 UTC

svn commit: r1576909 [5/18] - in /hbase/branches/0.89-fb/src: ./ examples/thrift/ main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/avro/ main/java/org/apache/hadoop/hbase/avro/generated/ main/java/org/apache/hadoop/hbase/client/ mai...

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Wed Mar 12 21:17:13 2014
@@ -19,35 +19,46 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.PreloadThreadPool;
-import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram;
 import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket;
 import org.apache.hadoop.hbase.ipc.HBaseRPCOptions;
 import org.apache.hadoop.hbase.ipc.ProfilingData;
+import org.apache.hadoop.hbase.ipc.thrift.HBaseThriftRPC;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DaemonThreadFactory;
 import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RegionserverUtils;
 import org.apache.hadoop.hbase.util.Writables;
 
 import com.google.common.base.Preconditions;
 
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
 /**
  * Used to communicate with a single HBase table.
  *
@@ -73,8 +84,9 @@ public class HTable implements HTableInt
   private HBaseRPCOptions options;
   private boolean recordClientContext = false;
 
-  @SuppressWarnings("unused")
   private long maxScannerResultSize;
+  private HTableAsync hta;
+  private boolean doAsync;
 
   // Share this multiaction thread pool across all the HTable instance;
   // The total number of threads will be bounded #HTable * #RegionServer.
@@ -87,6 +99,13 @@ public class HTable implements HTableInt
     ((ThreadPoolExecutor)multiActionThreadPool).allowCoreThreadTimeOut(true);
   }
 
+
+  public void initHTableAsync() throws IOException {
+    if (doAsync && hta == null) {
+      hta = new HTableAsync(configuration, tableName);
+    }
+  }
+
   /**
    * Creates an object to access a HBase table. DO NOT USE THIS CONSTRUCTOR.
    * It will make your unit tests fail due to incorrect ZK client port.
@@ -168,8 +187,15 @@ public class HTable implements HTableInt
       this.options.setRxCompression(
           Compression.getCompressionAlgorithmByName(compressionAlgo));
     }
+    // check if we are using swift protocol too
+    this.doAsync = configuration.getBoolean(HConstants.HTABLE_ASYNC_CALLS,
+        HConstants.HTABLE_ASYNC_CALLS_DEFAULT)
+        && configuration.getBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT,
+            HConstants.CLIENT_TO_RS_USE_THRIFT_DEFAULT);
+    HBaseThriftRPC.setUsePooling(conf.getBoolean("hbase.client.useconnectionpooling", true));
   }
 
+  @Override
   public Configuration getConfiguration() {
     return configuration;
   }
@@ -274,6 +300,7 @@ public class HTable implements HTableInt
     return connection.getRegionLocation(tableName, row, reload);
   }
 
+  @Override
   public byte [] getTableName() {
     return this.tableName;
   }
@@ -326,6 +353,7 @@ public class HTable implements HTableInt
     this.connection.clearRegionCache();
   }
 
+  @Override
   public HTableDescriptor getTableDescriptor() throws IOException {
     return new UnmodifyableHTableDescriptor(
       this.connection.getHTableDescriptor(this.tableName));
@@ -365,6 +393,7 @@ public class HTable implements HTableInt
     final List<byte[]> startKeyList = new ArrayList<byte[]>();
     final List<byte[]> endKeyList = new ArrayList<byte[]>();
     MetaScannerVisitor visitor = new MetaScannerVisitor() {
+      @Override
       public boolean processRow(Result rowResult) throws IOException {
         HRegionInfo info = Writables.getHRegionInfo(
             rowResult.getValue(HConstants.CATALOG_FAMILY,
@@ -383,7 +412,7 @@ public class HTable implements HTableInt
         new byte[startKeyList.size()][]), endKeyList.toArray(
             new byte[endKeyList.size()][]));
   }
-  
+
   /**
    * Gets the starting and ending row keys for every region in the currently
    * open table.
@@ -396,6 +425,7 @@ public class HTable implements HTableInt
     final TreeMap<byte[], byte[]> startEndKeysMap =
       new TreeMap<byte[], byte[]>(new Bytes.ByteArrayComparator());
     MetaScannerVisitor visitor = new MetaScannerVisitor() {
+      @Override
       public boolean processRow(Result rowResult) throws IOException {
         HRegionInfo info = Writables.getHRegionInfo(
             rowResult.getValue(HConstants.CATALOG_FAMILY,
@@ -413,11 +443,11 @@ public class HTable implements HTableInt
   }
 
   /**
-   * Returns the Array of StartKeys along with the favoredNodes 
-   * for a particular region. Identifying the the favoredNodes using the 
-   * Meta table similar to the 
-   * {@link org.apache.hadoop.hbase.client.HTable.getStartEndKeys()} 
-   * function 
+   * Returns the Array of StartKeys along with the favoredNodes
+   * for a particular region. Identifying the favoredNodes using the
+   * Meta table similar to the
+   * {@link org.apache.hadoop.hbase.client.HTable.getStartEndKeys()}
+   * function
    * @return
    * @throws IOException
    */
@@ -427,6 +457,7 @@ public class HTable implements HTableInt
     final List<byte[]> favoredNodes =
         new ArrayList<byte[]>();
         MetaScannerVisitor visitor = new MetaScannerVisitor() {
+          @Override
           public boolean processRow(Result rowResult) throws IOException {
             HRegionInfo info = Writables.getHRegionInfo(
                 rowResult.getValue(HConstants.CATALOG_FAMILY,
@@ -464,6 +495,7 @@ public class HTable implements HTableInt
       new TreeMap<HRegionInfo, HServerAddress>();
 
     MetaScannerVisitor visitor = new MetaScannerVisitor() {
+      @Override
       public boolean processRow(Result rowResult) throws IOException {
         HRegionInfo info = Writables.getHRegionInfo(
           rowResult.getValue(HConstants.CATALOG_FAMILY,
@@ -593,29 +625,44 @@ public class HTable implements HTableInt
     return allRegions;
   }
 
-   public Result getRowOrBefore(final byte[] row, final byte[] family)
-   throws IOException {
-     return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
-         new ServerCallable<Result>(connection, tableName, row, this.options) {
-       public Result call() throws IOException {
-         return server.getClosestRowBefore(location.getRegionInfo().getRegionName(),
-           row, family);
-       }
-     });
-   }
+  @Override
+  public Result getRowOrBefore(final byte[] row, final byte[] family)
+      throws IOException {
+    initHTableAsync();
+    if (hta != null) {
+      try {
+        return hta.getRowOrBeforeAsync(row, family).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+    return this
+        .getConnectionAndResetOperationContext()
+        .getRegionServerWithRetries(
+            new ServerCallable<Result>(connection, tableName, row, this.options) {
+              @Override
+              public Result call() throws IOException {
+                Result result = server.getClosestRowBefore(location
+                    .getRegionInfo().getRegionName(), row, family);
+                return result;
+              }
+            });
+  }
 
+  @SuppressWarnings("resource")
+  @Override
   public ResultScanner getScanner(final Scan scan) throws IOException {
-    ClientScanner s = new ClientScanner(scan);
-    s.initialize();
-    return s;
+    return new HTableClientScanner(scan, this).initialize();
   }
 
+  @Override
   public ResultScanner getScanner(byte [] family) throws IOException {
     Scan scan = new Scan();
     scan.addFamily(family);
     return getScanner(scan);
   }
 
+  @Override
   public ResultScanner getScanner(byte [] family, byte [] qualifier)
   throws IOException {
     Scan scan = new Scan();
@@ -669,9 +716,19 @@ public class HTable implements HTableInt
     return this.getConnectionAndResetOperationContext().getServerConfProperty(name);
   }
 
+  @Override
   public Result get(final Get get) throws IOException {
+    initHTableAsync();
+    if (hta != null) {
+      try {
+        return hta.getAsync(get).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
     return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
         new ServerCallable<Result>(connection, tableName, get.getRow(), this.options) {
+          @Override
           public Result call() throws IOException {
             return server.get(location.getRegionInfo().getRegionName(), get);
           }
@@ -679,10 +736,11 @@ public class HTable implements HTableInt
     );
   }
 
+  @Override
   public Result[] get(List<Get> gets) throws IOException {
     return this.getConnectionAndResetOperationContext().processBatchOfGets(gets, tableName, this.options);
   }
-  
+
   /**
    * Get collected profiling data and clears it from the HTable
    * @return aggregated profiling data
@@ -704,6 +762,7 @@ public class HTable implements HTableInt
       this.getConnectionAndResetOperationContext().processBatchedGets(actions, tableName, multiActionThreadPool,
           results, this.options);
     } catch (Exception e) {
+      e.printStackTrace();
       throw new IOException(e);
     }
     return results;
@@ -724,36 +783,58 @@ public class HTable implements HTableInt
   }
 
   /**
-   * {@inheritDoc}
+   * @param delete
+   * @throws IOException
    */
   @Override
-  public void delete(final Delete delete)
-  throws IOException {
+  public void delete(final Delete delete) throws IOException {
+    initHTableAsync();
+    if (hta != null) {
+      try {
+        hta.deleteAsync(delete).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
+      }
+    } else {
       this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
-        new ServerCallable<Boolean>(connection,
-            tableName, delete.getRow(), this.options) {
-          public Boolean call() throws IOException {
-            server.delete(location.getRegionInfo().getRegionName(), delete);
-            return null; // FindBugs NP_BOOLEAN_RETURN_NULL
-          }
-        }
-    );
+          new ServerCallable<Boolean>(connection, tableName, delete.getRow(),
+              this.options) {
+            @Override
+            public Boolean call() throws IOException {
+              server.delete(location.getRegionInfo().getRegionName(), delete);
+              return null; // FindBugs NP_BOOLEAN_RETURN_NULL
+            }
+          });
+    }
   }
 
+  @Override
   public void delete(final List<Delete> deletes)
   throws IOException {
     int last = 0;
     try {
-      last = this.getConnectionAndResetOperationContext().processBatchOfDeletes(deletes, this.tableName, this.options);
+      last = this.getConnectionAndResetOperationContext().processBatchOfDeletes(
+          deletes, this.tableName, this.options);
     } finally {
       deletes.subList(0, last).clear();
     }
   }
 
+  @Override
   public void put(final Put put) throws IOException {
-    doPut(Arrays.asList(put));
+    initHTableAsync();
+    if (hta != null) {
+      try {
+        hta.putAsync(put).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
+      }
+    } else {
+      doPut(Arrays.asList(put));
+    }
   }
 
+  @Override
   public void put(final List<Put> puts) throws IOException {
     doPut(puts);
   }
@@ -769,13 +850,14 @@ public class HTable implements HTableInt
     }
   }
 
+  @Override
   public long incrementColumnValue(final byte [] row, final byte [] family,
       final byte [] qualifier, final long amount)
   throws IOException {
     return incrementColumnValue(row, family, qualifier, amount, true);
   }
 
-  @SuppressWarnings({"ThrowableInstanceNeverThrown"})
+  @Override
   public long incrementColumnValue(final byte [] row, final byte [] family,
       final byte [] qualifier, final long amount, final boolean writeToWAL)
   throws IOException {
@@ -791,6 +873,7 @@ public class HTable implements HTableInt
     }
     return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
         new ServerCallable<Long>(connection, tableName, row, this.options) {
+          @Override
           public Long call() throws IOException {
             return server.incrementColumnValue(
                 location.getRegionInfo().getRegionName(), row, family,
@@ -813,12 +896,14 @@ public class HTable implements HTableInt
    * @throws IOException
    * @return true if the new put was execute, false otherwise
    */
+  @Override
   public boolean checkAndPut(final byte [] row,
       final byte [] family, final byte [] qualifier, final byte [] value,
       final Put put)
   throws IOException {
     return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
         new ServerCallable<Boolean>(connection, tableName, row, this.options) {
+          @Override
           public Boolean call() throws IOException {
             return server.checkAndPut(location.getRegionInfo().getRegionName(),
                 row, family, qualifier, value, put) ? Boolean.TRUE : Boolean.FALSE;
@@ -840,12 +925,14 @@ public class HTable implements HTableInt
    * @throws IOException
    * @return true if the new delete was executed, false otherwise
    */
+  @Override
   public boolean checkAndDelete(final byte [] row,
       final byte [] family, final byte [] qualifier, final byte [] value,
       final Delete delete)
   throws IOException {
     return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
         new ServerCallable<Boolean>(connection, tableName, row, this.options) {
+          @Override
           public Boolean call() throws IOException {
             return server.checkAndDelete(
                 location.getRegionInfo().getRegionName(),
@@ -861,13 +948,25 @@ public class HTable implements HTableInt
    */
   @Override
   public void mutateRow(final RowMutations arm) throws IOException {
-    this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
-      new ServerCallable<Void>(this.connection, tableName, arm.getRow(), this.options) {
-        public Void call() throws IOException {
-          server.mutateRow(location.getRegionInfo().getRegionName(), arm);
-          return null;
-        }
-      });
+    initHTableAsync();
+    if (hta != null) {
+      try {
+        hta.mutateRowAsync(arm).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
+      }
+    } else {
+      this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
+          new ServerCallable<Void>(this.connection, tableName, arm.getRow(),
+              this.options) {
+            @Override
+            public Void call() throws IOException, InterruptedException,
+                ExecutionException {
+              server.mutateRow(location.getRegionInfo().getRegionName(), arm);
+              return null;
+            }
+          });
+    }
   }
 
   /**
@@ -875,7 +974,8 @@ public class HTable implements HTableInt
    */
   @Override
   public void mutateRow(final List<RowMutations> armList) throws IOException {
-    this.getConnectionAndResetOperationContext().processBatchOfRowMutations(armList, this.tableName, this.options);
+    this.getConnectionAndResetOperationContext().processBatchOfRowMutations(
+        armList, this.tableName, this.options);
   }
 
   /**
@@ -889,9 +989,11 @@ public class HTable implements HTableInt
    * @return true if the specified Get matches one or more keys, false if not
    * @throws IOException
    */
+  @Override
   public boolean exists(final Get get) throws IOException {
     return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
         new ServerCallable<Boolean>(connection, tableName, get.getRow(), this.options) {
+          @Override
           public Boolean call() throws IOException {
             return server.
                 exists(location.getRegionInfo().getRegionName(), get);
@@ -900,9 +1002,11 @@ public class HTable implements HTableInt
     );
   }
 
+  @Override
   public void flushCommits() throws IOException {
     try {
-      this.getConnectionAndResetOperationContext().processBatchOfPuts(writeBuffer, tableName, this.options);
+      this.getConnectionAndResetOperationContext().
+        processBatchOfPuts(writeBuffer, tableName, this.options);
     } finally {
       if (clearBufferOnFail) {
         writeBuffer.clear();
@@ -941,32 +1045,53 @@ public class HTable implements HTableInt
     }
   }
 
-  public RowLock lockRow(final byte [] row)
-  throws IOException {
-    return this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
-      new ServerCallable<RowLock>(connection, tableName, row, this.options) {
-        public RowLock call() throws IOException {
-          long lockId =
-              server.lockRow(location.getRegionInfo().getRegionName(), row);
-          return new RowLock(row,lockId);
-        }
+  @Override
+  public RowLock lockRow(final byte[] row) throws IOException {
+    initHTableAsync();
+    if (hta != null) {
+      try {
+        return hta.lockRowAsync(row).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
       }
-    );
+    }
+    return this.getConnectionAndResetOperationContext()
+        .getRegionServerWithRetries(
+            new ServerCallable<RowLock>(connection, tableName, row,
+                this.options) {
+              @Override
+              public RowLock call() throws IOException {
+                long lockId = server.lockRow(location.getRegionInfo()
+                    .getRegionName(), row);
+                return new RowLock(row, lockId);
+              }
+            });
   }
 
-  public void unlockRow(final RowLock rl)
-  throws IOException {
-    this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
-      new ServerCallable<Boolean>(connection, tableName, rl.getRow(), this.options) {
-        public Boolean call() throws IOException {
-          server.unlockRow(location.getRegionInfo().getRegionName(),
-              rl.getLockId());
-          return null; // FindBugs NP_BOOLEAN_RETURN_NULL
-        }
+  @Override
+  public void unlockRow(final RowLock rl) throws IOException {
+    initHTableAsync();
+    if (hta != null) {
+      try {
+        hta.unlockRowAsync(rl).get();
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
       }
-    );
+    } else {
+      this.getConnectionAndResetOperationContext().getRegionServerWithRetries(
+          new ServerCallable<Boolean>(connection, tableName, rl.getRow(),
+              this.options) {
+            @Override
+            public Boolean call() throws IOException {
+              server.unlockRow(location.getRegionInfo().getRegionName(),
+                  rl.getLockId());
+              return null; // FindBugs NP_BOOLEAN_RETURN_NULL
+            }
+          });
+    }
   }
 
+  @Override
   public boolean isAutoFlush() {
     return autoFlush;
   }
@@ -1078,157 +1203,6 @@ public class HTable implements HTableInt
   }
 
   /**
-   * Implements the scanner interface for the HBase client.
-   * If there are multiple regions in a table, this scanner will iterate
-   * through them all.
-   */
-  protected class ClientScanner extends ResultScannerImpl {
-    // HEADSUP: The scan internal start row can change as we move through table.
-    // Current region scanner is against.  Gets cleared if current region goes
-    // wonky: e.g. if it splits on us.
-    private ScannerCallable callable = null;
-    // Keep lastResult returned successfully in case we have to reset scanner.
-    private Result lastResult = null;
-
-    protected ClientScanner(final Scan scan) {
-      super(scan, HTable.this);
-      // Removed filter validation.  We have a new format now, only one of all
-      // the current filters has a validate() method.  We can add it back,
-      // need to decide on what we're going to do re: filter redesign.
-      // Need, at the least, to break up family from qualifier as separate
-      // checks, I think it's important server-side filters are optimal in that
-      // respect.
-    }
-
-    protected Scan getScan() {
-      return scan;
-    }
-
-    protected ScannerCallable getScannerCallable(byte [] localStartKey,
-        int nbRows, HBaseRPCOptions options) {
-      scan.setStartRow(localStartKey);
-      ScannerCallable s = new ScannerCallable(
-          getConnectionAndResetOperationContext(), getTableName(), scan,
-          options);
-      s.setCaching(nbRows);
-      return s;
-    }
-
-    @Override
-    protected void cleanUpPreviousScanners() throws IOException {
-      // Close the previous scanner if it's open
-      if (this.callable != null) {
-        this.callable.setClose();
-        getConnectionAndResetOperationContext().getRegionServerWithRetries(
-            callable);
-        this.callable = null;
-      }
-
-    }
-
-    @Override
-    protected boolean doRealOpenScanners(byte[] localStartKey, int nbRows)
-        throws IOException {
-      try {
-        callable = getScannerCallable(localStartKey, nbRows, options);
-        // Open a scanner on the region server starting at the
-        // beginning of the region
-        getConnectionAndResetOperationContext().getRegionServerWithRetries(
-            callable);
-        this.currentRegion = callable.getHRegionInfo();
-      } catch (IOException e) {
-        close();
-        throw e;
-      }
-      return true;
-    }
-
-    @Override
-    protected void cacheNextResults() throws IOException {
-      Result [] values = null;
-      // We need to reset it if it's a new callable that was created
-      // with a countdown in nextScanner
-      callable.setCaching(this.caching);
-      // This flag is set when we want to skip the result returned.  We do
-      // this when we reset scanner because it split under us.
-      boolean skipFirst = false;
-      boolean foundResults = false;
-      do {
-        try {
-          // Server returns a null values if scanning is to stop.  Else,
-          // returns an empty array if scanning is to go on and we've just
-          // exhausted current region.
-          values = getConnectionAndResetOperationContext(
-              ).getRegionServerWithRetries(callable);
-          if (skipFirst) {
-            skipFirst = false;
-            // Reget.
-            values = getConnectionAndResetOperationContext()
-                .getRegionServerWithRetries(callable);
-          }
-        } catch (DoNotRetryIOException e) {
-          if (e instanceof UnknownScannerException) {
-            long timeout = this.lastNextCallTimeStamp + scannerTimeout;
-            // If we are over the timeout, throw this exception to the client
-            // Else, it's because the region moved and we used the old id
-            // against the new region server; reset the scanner.
-            if (timeout < System.currentTimeMillis()) {
-              long elapsed = System.currentTimeMillis()
-                  - this.lastNextCallTimeStamp;
-              ScannerTimeoutException ex = new ScannerTimeoutException(
-                  elapsed + "ms passed since the last invocation, " +
-                  "timeout is currently set to " + scannerTimeout);
-              ex.initCause(e);
-              throw ex;
-            }
-          } else {
-            Throwable cause = e.getCause();
-            if (cause == null
-                || !(cause instanceof NotServingRegionException)) {
-              throw e;
-            }
-          }
-          // Else, its signal from depths of ScannerCallable that we got an
-          // NSRE on a next and that we need to reset the scanner.
-          if (this.lastResult != null) {
-            this.scan.setStartRow(this.lastResult.getRow());
-            // Skip first row returned.  We already let it out on previous
-            // invocation.
-            skipFirst = true;
-          }
-          // Clear region
-          this.currentRegion = null;
-        }
-        this.lastNextCallTimeStamp = System.currentTimeMillis();
-        if (values != null && values.length > 0) {
-          foundResults = true;
-          for (Result rs : values) {
-            cache.add(rs);
-            this.lastResult = rs;
-          }
-        }
-      } while (!foundResults && nextScanner(this.caching, values == null));
-    }
-
-    @Override
-    protected void closeCurrentScanner() {
-      if (callable != null) {
-        callable.setClose();
-        try {
-          getConnectionAndResetOperationContext().getRegionServerWithRetries(
-              callable);
-        } catch (IOException e) {
-          // We used to catch this error, interpret, and rethrow. However, we
-          // have since decided that it's not nice for a scanner's close to
-          // throw exceptions. Chances are it was just an UnknownScanner
-          // exception due to lease time out.
-        }
-        callable = null;
-      }
-    }
-  }
-
-  /**
    * Enable or disable region cache prefetch for the table. It will be
    * applied for the given table's all HTable instances who share the same
    * connection. By default, the cache prefetch is enabled.
@@ -1280,14 +1254,14 @@ public class HTable implements HTableInt
     return HConnectionManager.getConnection(HBaseConfiguration.create()).
     getRegionCachePrefetch(tableName);
   }
-  
+
   /**
    * Set profiling request on/off for every subsequent RPC calls
    * @param prof profiling true/false
    */
   @Override
   public void setProfiling(boolean prof) {
-    options.setRequestProfiling (prof);
+    options.setRequestProfiling(prof);
     options.profilingResult = null;
   }
 
@@ -1305,7 +1279,7 @@ public class HTable implements HTableInt
   public String getTag () {
     return this.options.getTag ();
   }
-  
+
   /**
    * set compression used to send RPC calls to the server
    * @param alg compression algorithm
@@ -1313,11 +1287,11 @@ public class HTable implements HTableInt
   public void setTxCompression(Compression.Algorithm alg) {
     this.options.setTxCompression(alg);
   }
-  
+
   public Compression.Algorithm getTxCompression() {
     return this.options.getTxCompression();
   }
-  
+
   /**
    * set compression used to receive RPC responses from the server
    * @param alg compression algorithm
@@ -1325,7 +1299,7 @@ public class HTable implements HTableInt
   public void setRxCompression(Compression.Algorithm alg) {
     this.options.setRxCompression(alg);
   }
-  
+
   public Compression.Algorithm getRxCompression() {
     return this.options.getRxCompression();
   }
@@ -1376,6 +1350,7 @@ public class HTable implements HTableInt
         .getRegionServerWithRetries(
             new ServerCallable<List<Bucket>>(connection,
                 tableName, row, this.options) {
+              @Override
               public List<Bucket> call() throws IOException {
                 if (cf != null) {
                   return server.getHistogramForStore(
@@ -1415,6 +1390,7 @@ public class HTable implements HTableInt
     for (final byte[] row : this.getStartKeys()) {
       futures.put(row, HTable.multiActionThreadPool.submit(
           new Callable<List<Bucket>>() {
+            @Override
             public List<Bucket> call() throws Exception {
               if (family == null) {
                 return getHistogram(row);
@@ -1437,4 +1413,16 @@ public class HTable implements HTableInt
     }
     return ret;
   }
+
+  public long getMaxScannerResultSize() {
+    return maxScannerResultSize;
+  }
+
+  public HConnection getConnection() {
+    return connection;
+  }
+
+  public HBaseRPCOptions getOptions() {
+    return options;
+  }
 }
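
The new get/delete/put/mutateRow/lockRow paths above only take the asynchronous route when doAsync is set, which requires both HConstants.HTABLE_ASYNC_CALLS and HConstants.CLIENT_TO_RS_USE_THRIFT to be true in the client configuration. A minimal opt-in sketch, using only constants referenced in this diff; the table and row names are placeholders, not part of the commit:

    Configuration conf = HBaseConfiguration.create();
    // Boolean keys consulted by the new doAsync check in the HTable constructor.
    conf.setBoolean(HConstants.HTABLE_ASYNC_CALLS, true);
    conf.setBoolean(HConstants.CLIENT_TO_RS_USE_THRIFT, true);

    HTable table = new HTable(conf, "test_table");
    // get() now lazily creates an HTableAsync and blocks on getAsync(get).get().
    Result result = table.get(new Get(Bytes.toBytes("row1")));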

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsync.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,248 @@
+/**
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.HConnectionParams;
+import org.apache.hadoop.hbase.ipc.thrift.HBaseToThriftAdapter;
+import org.apache.hadoop.hbase.thrift.SelfRetryingListenableFuture;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.DaemonThreadFactory;
+
+/**
+ * Used to communicate with a single HBase table.
+ * Provides additional asynchronous APIs as a complement to HTableInterface.
+ */
+public class HTableAsync extends HTable implements HTableAsyncInterface {
+
+  //TODO: decide what is a good number of core threads. Max thread number seems unconfigurable.
+  private final ListeningScheduledExecutorService executorService =
+      MoreExecutors.listeningDecorator(Executors.newScheduledThreadPool(
+          HConstants.DEFAULT_HTABLE_ASYNC_CORE_THREADS, new DaemonThreadFactory("htable-async-thread-")));
+
+  private HConnectionParams hConnectionParams;
+
+  /**
+   * Creates an object to access a HBase table through asynchronous APIs.
+   *
+   * @param conf Configuration object to use.
+   * @param tableName Name of the table.
+   * @throws java.io.IOException if a remote or network exception occurs
+   */
+  public HTableAsync(Configuration conf, String tableName)
+      throws IOException {
+    this(conf, Bytes.toBytes(tableName));
+
+    this.hConnectionParams = HConnectionParams.getInstance(conf);
+  }
+
+  /**
+   * Creates an object to access a HBase table through asynchronous APIs.
+   *
+   * @param conf Configuration object to use.
+   * @param tableName Name of the table.
+   * @throws IOException if a remote or network exception occurs
+   */
+  public HTableAsync(Configuration conf, byte[] tableName)
+      throws IOException {
+    super(conf, tableName);
+
+    this.hConnectionParams = HConnectionParams.getInstance(conf);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Result> getAsync(final Get get) {
+    ServerCallable<ListenableFuture<Result>> callable = new ServerCallable<ListenableFuture<Result>>(
+        getConnection(), getTableName(), get.getRow(), getOptions()) {
+      @Override
+      public ListenableFuture<Result> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).getAsync(location.getRegionInfo().getRegionName(), get);
+      }
+    };
+
+    SelfRetryingListenableFuture<Result> future = new SelfRetryingListenableFuture<>(this, callable,
+        hConnectionParams, executorService);
+
+    return future.startFuture();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Result[]> batchGetAsync(final List<Get> list) {
+    return executorService.submit(new Callable<Result[]>() {
+      public Result[] call() throws IOException {
+        return batchGet(list);
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> putAsync(final Put put) {
+    // Since put has a buffer on client side, use mutateRowAsync instead
+    try {
+      RowMutations arm = new RowMutations.Builder(put.getRow()).add(put).create();
+      return mutateRowAsync(arm);
+    } catch (IOException e) {
+      return Futures.immediateFailedFuture(e);
+    }
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Result> getRowOrBeforeAsync(final byte[] row, final byte[] family) {
+    ServerCallable<ListenableFuture<Result>> callable = new ServerCallable<ListenableFuture<Result>>(
+        getConnection(), getTableName(), row, getOptions()) {
+      @Override
+      public ListenableFuture<Result> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).getClosestRowBeforeAsync(
+            location.getRegionInfo().getRegionName(), row, family);
+      }
+    };
+
+    SelfRetryingListenableFuture<Result> future = new SelfRetryingListenableFuture<>(this, callable,
+        hConnectionParams, executorService);
+
+    return future.startFuture();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> deleteAsync(final Delete delete) {
+    ServerCallable<ListenableFuture<Void>> callable = new ServerCallable<ListenableFuture<Void>>(
+        getConnection(), getTableName(), delete.getRow(), getOptions()) {
+      @Override
+      public ListenableFuture<Void> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).deleteAsync(location.getRegionInfo().getRegionName(), delete);
+      }
+    };
+
+    SelfRetryingListenableFuture<Void> future = new SelfRetryingListenableFuture<>(this, callable,
+        hConnectionParams, executorService);
+
+    return future.startFuture();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> mutateRowAsync(final RowMutations arm) {
+    ServerCallable<ListenableFuture<Void>> callable = new ServerCallable<ListenableFuture<Void>>(
+        getConnection(), getTableName(), arm.getRow(), getOptions()) {
+      @Override
+      public ListenableFuture<Void> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).mutateRowAsync(location.getRegionInfo().getRegionName(), arm);
+      }
+    };
+
+    SelfRetryingListenableFuture<Void> future = new SelfRetryingListenableFuture<>(this, callable,
+        hConnectionParams, executorService);
+
+    return future.startFuture();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> batchMutateAsync(final List<Mutation> mutations) {
+    return executorService.submit(new Callable<Void>() {
+      public Void call() throws IOException {
+        batchMutate(mutations);
+        return null;
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> flushCommitsAsync() {
+    return executorService.submit(new Callable<Void>() {
+      public Void call() throws IOException {
+        flushCommits();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<RowLock> lockRowAsync(final byte[] row) {
+    ServerCallable<ListenableFuture<RowLock>> callable = new ServerCallable<ListenableFuture<RowLock>>(
+        getConnection(), getTableName(), row, getOptions()) {
+      @Override
+      public ListenableFuture<RowLock> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).lockRowAsync(location.getRegionInfo().getRegionName(), row);
+      }
+    };
+
+    SelfRetryingListenableFuture<RowLock> future = new SelfRetryingListenableFuture<>(this, callable,
+        hConnectionParams, executorService);
+
+    return future.startFuture();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public ListenableFuture<Void> unlockRowAsync(final RowLock rl) {
+    ServerCallable<ListenableFuture<Void>> callable = new ServerCallable<ListenableFuture<Void>>(
+        getConnection(), getTableName(), rl.getRow(), getOptions()) {
+      @Override
+      public ListenableFuture<Void> call() throws Exception {
+        return ((HBaseToThriftAdapter)server).unlockRowAsync(
+            location.getRegionInfo().getRegionName(), rl.getLockId());
+      }
+    };
+
+    SelfRetryingListenableFuture<Void> future = new SelfRetryingListenableFuture<>(this, callable,
+        hConnectionParams, executorService);
+
+    return future.startFuture();
+  }
+}
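
HTableAsync wraps each per-region call in a ServerCallable and hands it to SelfRetryingListenableFuture, so retries happen without blocking the caller. A brief usage sketch, assuming a Configuration prepared as above; the table and row names are placeholders:

    HTableAsync table = new HTableAsync(conf, "test_table");

    // Both calls return immediately; the RPCs run on the class's scheduled executor.
    ListenableFuture<Result> getFuture = table.getAsync(new Get(Bytes.toBytes("row1")));
    ListenableFuture<Void> deleteFuture = table.deleteAsync(new Delete(Bytes.toBytes("row2")));

    Result r = getFuture.get();   // blocks; failures surface as ExecutionException
    deleteFuture.get();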

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsyncInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsyncInterface.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsyncInterface.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableAsyncInterface.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,135 @@
+/**
+ * Copyright 2013 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.List;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+/**
+ * Used to communicate with a single HBase table.
+ * Provides additional asynchronous APIs as a complement to HTableInterface.
+ * Each call returns a ListenableFuture object immediately and executes the
+ * request in the background. Call the blocking get() method on the
+ * ListenableFuture object to obtain the result.
+ *
+ * We don't provide asynchronous APIs that do not directly contact the region
+ * server, for example, put().
+ */
+public interface HTableAsyncInterface extends HTableInterface {
+
+  /**
+   * Extracts certain cells from a given row.
+   *
+   * @param get The object that specifies what data to fetch and from which row.
+   * @return Listenable future of data coming from the specified row, if it exists.
+   * If the row specified doesn't exist, the {@link Result} instance returned won't
+   * contain any {@link KeyValue}, as indicated by {@link Result#isEmpty()}.
+   */
+  ListenableFuture<Result> getAsync(Get get);
+
+  /**
+   * Extracts certain cells from the given rows, in batch.
+   *
+   * @param list The objects that specify what data to fetch and from which rows.
+   * @return Listenable future of data coming from the specified rows, if it exists.
+   * If the row specified doesn't exist, the {@link Result} instance returned won't
+   * contain any {@link KeyValue}, as indicated by {@link Result#isEmpty()}.
+   * If there are any failures even after retries, there will be a null in
+   * the results array for those Gets, AND an exception will be thrown.
+   */
+  ListenableFuture<Result[]> batchGetAsync(final List<Get> list);
+
+  /**
+   * Return the row that matches <i>row</i> exactly, or the one that immediately precedes it.
+   *
+   * @param row A row key.
+   * @param family Column family to include in the {@link Result}.
+   * @return Listenable future of the row that matches <i>row</i> exactly, or
+   * the one that immediately precedes it.
+   */
+  ListenableFuture<Result> getRowOrBeforeAsync(byte[] row, byte[] family);
+
+  /**
+   * Puts some data in the table.
+   * <p>
+   * If {@link #isAutoFlush isAutoFlush} is false, the update is buffered
+   * until the internal buffer is full.
+   *
+   * @param put The data to put.
+   * @return Listenable future.
+   */
+  ListenableFuture<Void> putAsync(Put put);
+
+  /**
+   * Deletes the specified cells/row.
+   *
+   * @param delete The object that specifies what to delete.
+   * @return Listenable future.
+   */
+  ListenableFuture<Void> deleteAsync(Delete delete);
+
+  /**
+   * Performs multiple mutations atomically on a single row. Currently
+   * {@link Put} and {@link Delete} are supported.
+   *
+   * @param arm object that specifies the set of mutations to perform
+   * @return Listenable future.
+   */
+  ListenableFuture<Void> mutateRowAsync(RowMutations arm);
+
+  /**
+   * Process batch of mutations on a row. Currently
+   * {@link Put} and {@link Delete} are supported.
+   *
+   * @param mutations objects that specify the set of mutations to perform.
+   * @return Listenable future.
+   */
+  ListenableFuture<Void> batchMutateAsync(List<Mutation> mutations);
+
+  /**
+   * Executes all the buffered {@link Put} operations.
+   * <p>
+   * This method gets called once automatically for every {@link Put} or batch
+   * of {@link Put}s (when <code>put(List<Put>)</code> is used) when
+   * {@link #isAutoFlush} is {@code true}.
+   *
+   * @return Listenable future.
+   */
+  ListenableFuture<Void> flushCommitsAsync();
+
+  /**
+   * Obtains a lock on a row.
+   *
+   * @param row The row to lock.
+   * @return Listenable future of a {@link RowLock} containing the row and lock id.
+   */
+  ListenableFuture<RowLock> lockRowAsync(byte[] row);
+
+  /**
+   * Releases a row lock.
+   *
+   * @param rl The row lock to release.
+   * @return Listenable future.
+   */
+  ListenableFuture<Void> unlockRowAsync(RowLock rl);
+}
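
Because every method returns a Guava ListenableFuture, callers can compose requests with the usual Guava utilities instead of blocking on one call at a time. A small sketch, assuming 'table' implements HTableAsyncInterface and 'rows' is a caller-supplied list of row keys:

    List<ListenableFuture<Result>> futures = new ArrayList<ListenableFuture<Result>>();
    for (byte[] row : rows) {
      futures.add(table.getAsync(new Get(row)));   // all gets issued concurrently
    }
    // Futures.allAsList collapses them into one future over the ordered results.
    List<Result> results = Futures.allAsList(futures).get();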

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableClientScanner.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,359 @@
+/**
+ * Copyright 2014 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.UnknownScannerException;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Implements the scanner interface for the HBase client.
+ * If there are multiple regions in a table, this scanner will iterate
+ * through them all.
+ */
+public class HTableClientScanner implements ResultScanner, Runnable {
+  private static final Log LOG = LogFactory.getLog(HTableClientScanner.class);
+  // End of Scanning
+  private static final Result[] EOS = new Result[0];
+
+  private static final int MAX_THREADS_IN_POOL = Runtime.getRuntime()
+      .availableProcessors();
+
+  private static final ExecutorService executor = new ThreadPoolExecutor(1,
+      MAX_THREADS_IN_POOL, 60L, TimeUnit.SECONDS,
+      new SynchronousQueue<Runnable>());
+
+  // HEADSUP: The scan internal start row can change as we move through table.
+  protected final Scan scan;
+  // The number of prefetched and cached results
+  private final int caching;
+  // Temporary results list in main thread, may be null
+  private Result[] currentResults;
+  // The position of next unfetched results in currentResults if it is
+  // non-null.
+  private int currentPos;
+  // Whether this client has closed.
+  private boolean closed;
+  /**
+   * The queue transferring fetched Result[] to main thread.
+   * When queue.take() returns an EOS, scanning ends.
+   */
+  private final ArrayBlockingQueue<Result[]> queue;
+  // The variable informing fetching thread to stop
+  private volatile boolean closing;
+  // Contains the exception caught in fetch thread.
+  private volatile Throwable exception;
+
+  private final HTable table;
+
+  /**
+   * Constructor.
+   */
+  public HTableClientScanner(Scan scan, HTable table) {
+    this.scan = scan;
+    this.table = table;
+    this.queue = new ArrayBlockingQueue<>(table.getConfiguration().getInt(
+        HConstants.HBASE_CLIENT_SCANNER_QUEUE_LENGTH,
+        HConstants.DEFAULT_HBASE_CLIENT_SCANNER_QUEUE_LENGTH));
+
+    if (scan.getCaching() > 0) {
+      this.caching = scan.getCaching();
+    } else {
+      this.caching = table.getScannerCaching();
+    }
+  }
+
+  HTableClientScanner initialize() {
+    executor.execute(this);
+    return this;
+  }
+
+  @Override
+  public Iterator<Result> iterator() {
+    return new ResultScannerIterator(this);
+  }
+
+  // Throws a Throwable exception as IOException or RuntimeException
+  private void throwIOException(Throwable e) throws IOException {
+    if (e != null) {
+      if (e instanceof IOException) {
+        throw (IOException) e;
+      } else if (e instanceof RuntimeException) {
+        throw (RuntimeException) e;
+      }
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Fetches results from queue to currentResults if it is not null.
+   *
+   * @return true if more results available, false if end of scanning
+   */
+  private boolean fetchFromQueue() throws IOException {
+    if (currentResults != null) {
+      return true;
+    }
+
+    if (closed) {
+      return false;
+    }
+
+    try {
+      currentResults = queue.take();
+      if (currentResults.length == 0) {
+        // End of scanning
+        closed = true;
+        currentResults = null;
+
+        if (exception != null) {
+
+          // Failure of scanning
+          throwIOException(exception);
+        }
+
+        return false;
+      }
+
+      // Results fetched
+      currentPos = 0;
+      return true;
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Result next() throws IOException {
+    if (!fetchFromQueue()) {
+      return null;
+    }
+    Result res = currentResults[currentPos];
+    currentPos++;
+
+    if (currentPos >= currentResults.length) {
+      currentResults = null;
+    }
+    return res;
+  }
+
+  @Override
+  public Result[] next(int nbRows) throws IOException {
+    if (!fetchFromQueue()) {
+      return null;
+    }
+
+    // In case currentResults is exactly the results we want, return it
+    // directly to avoid extra resource allocation and copying.
+    if (currentPos == 0 && nbRows == currentResults.length) {
+      Result[] res = currentResults;
+      currentResults = null;
+      return res;
+    }
+
+    Result[] res = new Result[nbRows];
+    int len = 0;
+
+    while (len < nbRows) {
+      // Move from currentResults
+      int n = Math.min(nbRows - len, currentResults.length - currentPos);
+      System.arraycopy(currentResults, currentPos, res, len, n);
+
+      len += n;
+      currentPos += n;
+
+      if (currentPos == currentResults.length) {
+        currentResults = null;
+
+        if (!fetchFromQueue()) {
+          // Unexpected partial results, we have to make a copy.
+          return Arrays.copyOf(res, len);
+        }
+      }
+    }
+
+    return res;
+  }
+
+  @Override
+  public void close() {
+    if (this.closed) {
+      return;
+    }
+    this.closing = true;
+    try {
+      while (fetchFromQueue()) {
+        // skip all results
+        currentResults = null;
+      }
+    } catch (Throwable e) {
+      LOG.debug("Exception on closing", e);
+      this.closed = true;
+    }
+  }
+
+  private Result[] call(ScannerCallable callable) throws IOException {
+    return table.getConnectionAndResetOperationContext()
+        .getRegionServerWithRetries(callable);
+  }
+
+  // Returns a ScannerCallable with a start key
+  private ScannerCallable getScannerCallable(byte[] startKey) {
+    scan.setStartRow(startKey);
+    ScannerCallable s = new ScannerCallable(
+        table.getConnectionAndResetOperationContext(), table.getTableName(),
+        scan, table.getOptions());
+    s.setCaching(caching);
+    return s;
+  }
+
+  // Closes a callable silently.
+  private void closeScanner(ScannerCallable callable) {
+    callable.setClose();
+    try {
+      call(callable);
+    } catch (IOException e) {
+      // We used to catch this error, interpret, and rethrow. However, we
+      // have since decided that it's not nice for a scanner's close to
+      // throw exceptions. Chances are it was just an UnknownScanner
+      // exception due to lease time out.
+      LOG.error("Exception caught during closeScanner", e);
+    }
+  }
+
+  /**
+   * Scans a region server, results are put to queue.
+   *
+   * @return New start key if scanning does not end, null otherwise
+   * @throws IOException
+   * @throws InterruptedException
+   */
+  private byte[] scanRegionServer(byte[] startKey) throws IOException,
+      InterruptedException {
+    // Open a scanner
+    ScannerCallable callable = getScannerCallable(startKey);
+    // openScanner
+    call(callable);
+    HRegionInfo currentRegion = callable.getHRegionInfo();
+
+    Result lastRes = null;
+    long lastSuccNextTs = System.currentTimeMillis();
+    try {
+      while (!closing) {
+        Result[] values = call(callable);
+        if (values == null) {
+          // End of scanning
+          return null;
+        } else if (values.length == 0) {
+          // End of region
+          return currentRegion.getEndKey();
+        }
+
+        lastRes = values[values.length - 1];
+        if (!closing) {
+          queue.put(values);
+        }
+        lastSuccNextTs = System.currentTimeMillis();
+      }
+    } catch (DoNotRetryIOException e) {
+      boolean canRetry = false;
+      if (e instanceof UnknownScannerException) {
+        long timeoutTs = lastSuccNextTs + table.scannerTimeout;
+        long now = System.currentTimeMillis();
+        if (now > timeoutTs) {
+          // Scanner timeout
+          long elapsed = now - lastSuccNextTs;
+          ScannerTimeoutException ex = new ScannerTimeoutException(elapsed
+              + "ms passed since the last invocation, "
+              + "timeout is currently set to " + table.scannerTimeout);
+          ex.initCause(e);
+          throw ex;
+        }
+
+        canRetry = true; // scannerTimeout
+      } else {
+        Throwable cause = e.getCause();
+        if (cause != null && cause instanceof NotServingRegionException) {
+          canRetry = true;
+        }
+      }
+
+      if (!canRetry) {
+        // Cannot retry, simply throw it out
+        throw e;
+      }
+
+      if (lastRes != null) {
+        return Bytes.nextOf(lastRes.getRow());
+      }
+
+      return startKey;
+    } finally {
+      closeScanner(callable);
+    }
+    // Only reach here when closing is true
+    return null;
+  }
+
+  @Override
+  public void run() {
+    try {
+      byte[] startKey = this.scan.getStartRow();
+      while (!closing) {
+        startKey = scanRegionServer(startKey);
+        if (startKey == null || startKey.length == 0) {
+          break;
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      exception = e;
+    } catch (Throwable e) {
+      exception = e;
+    }
+
+    try {
+      queue.put(EOS);
+    } catch (InterruptedException e) {
+      LOG.info("Fetching thread interrupted", e);
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  @Override
+  public boolean isClosed() {
+    return closed;
+  }
+}
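
HTableClientScanner replaces the old inner ClientScanner with a producer/consumer design: run() executes on a shared thread pool, pulling Result[] batches region by region and putting them on an ArrayBlockingQueue, while next()/next(int) drain the queue on the caller's thread; the empty EOS array marks the end of scanning or a stored failure. The caller-facing usage is unchanged; a short sketch with a placeholder table:

    Scan scan = new Scan();
    scan.setCaching(100);                       // rows fetched per RPC by the background thread
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result row : scanner) {
        // process row
      }
    } finally {
      scanner.close();                          // sets 'closing' so the fetch thread stops
    }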

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTableInterface.java Wed Mar 12 21:17:13 2014
@@ -278,7 +278,7 @@ public interface HTableInterface {
   public void mutateRow(final RowMutations arm) throws IOException;
 
   public void mutateRow(List<RowMutations> armList) throws IOException;
-  
+
   /**
    * Tells whether or not 'auto-flush' is turned on.
    *

Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/IntegerOrResultOrException.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/IntegerOrResultOrException.java?rev=1576909&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/IntegerOrResultOrException.java (added)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/IntegerOrResultOrException.java Wed Mar 12 21:17:13 2014
@@ -0,0 +1,190 @@
+/*
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.base.Objects;
+
+/**
+ * This is used in the containers in {@link TMultiResponse}. Previously the
+ * value was a plain Object (an Integer, an Exception or a Result[]), but since
+ * we are moving to Thrift serialization, the containers themselves need to be
+ * serializable. The Type field specifies which value this wrapper is holding:
+ * an Integer, a list of Results, or a ThriftHBaseException.
+ *
+ */
+@ThriftStruct
+public class IntegerOrResultOrException {
+
+  public enum Type {
+    INTEGER, LIST_OF_RESULTS, EXCEPTION
+  }
+
+  private Integer integer;
+  private ThriftHBaseException ex = null;
+  private List<Result> results = null;
+  private Type type = null;
+
+  public IntegerOrResultOrException() {}
+
+  @ThriftConstructor
+  public IntegerOrResultOrException(@ThriftField(1) Integer integer,
+      @ThriftField(2) ThriftHBaseException ex,
+      @ThriftField(3) List<Result> results,
+      @ThriftField(4) Type type) {
+    this.type = type;
+    if (type == Type.INTEGER) {
+      this.integer = integer;
+    } else if (type == Type.LIST_OF_RESULTS) {
+      this.results = results;
+    } else if (type == Type.EXCEPTION) {
+      this.ex = ex;
+    }
+  }
+
+  public IntegerOrResultOrException(Object obj) {
+    if (obj instanceof Integer) {
+      this.integer = (Integer) obj;
+      this.type = Type.INTEGER;
+    } else if (obj instanceof Exception) {
+      this.ex = new ThriftHBaseException((Exception) obj);
+      this.type = Type.EXCEPTION;
+    } else if (obj instanceof Result[]) {
+      Result[] resultArray = (Result[]) obj;
+      this.results = new ArrayList<>();
+      Collections.addAll(results, resultArray);
+      this.type = Type.LIST_OF_RESULTS;
+    } else {
+      throw new IllegalArgumentException(obj.getClass().getCanonicalName()
+          + " is not a supported type");
+    }
+  }
+
+  public IntegerOrResultOrException(Integer integer) {
+    this.integer = integer;
+    this.type = Type.INTEGER;
+  }
+
+  public IntegerOrResultOrException(ThriftHBaseException ex) {
+    this.ex = ex;
+    this.type = Type.EXCEPTION;
+  }
+
+  public IntegerOrResultOrException(List<Result> results) {
+    this.results = results;
+    this.type = Type.LIST_OF_RESULTS;
+  }
+
+  @ThriftField(1)
+  public Integer getInteger() {
+    return integer;
+  }
+
+  @ThriftField(2)
+  public ThriftHBaseException getEx() {
+    return ex;
+  }
+
+  @ThriftField(3)
+  public List<Result> getResults() {
+    return results;
+  }
+
+  @ThriftField(4)
+  public Type getType() {
+    return type;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(integer, results, ex, type);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    IntegerOrResultOrException other = (IntegerOrResultOrException) obj;
+    if (ex == null) {
+      if (other.ex != null)
+        return false;
+    } else if (!ex.equals(other.ex))
+      return false;
+    if (integer == null) {
+      if (other.integer != null)
+        return false;
+    } else if (!integer.equals(other.integer))
+      return false;
+    if (results == null) {
+      if (other.results != null)
+        return false;
+    } else if (!results.equals(other.results))
+      return false;
+    if (type != other.type)
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "IntegerOrResultOrException [integer=" + integer + ", ex=" + ex
+        + ", results=" + results + ", type=" + type + "]";
+  }
+
+  /**
+   * Create an IntegerOrResultOrException from a raw Object (an Integer, an
+   * Exception or a Result[]).
+   * @param obj the object to wrap
+   * @return the wrapping IntegerOrResultOrException
+   */
+  public static IntegerOrResultOrException createFromObject(Object obj) {
+    return new IntegerOrResultOrException(obj);
+  }
+
+  /**
+   * Unwrap to the legacy Object representation; used when transforming a
+   * TMultiResponse back into a MultiResponse.
+   *
+   * @param ioroe the wrapper to unwrap
+   * @return the wrapped Integer, a Result[], or the server-side exception
+   */
+  public static Object createObjectFromIntegerOrResultOrException(
+      IntegerOrResultOrException ioroe) {
+    if (ioroe.getType().equals(Type.INTEGER)) {
+      return ioroe.getInteger();
+    } else if (ioroe.getType().equals(Type.LIST_OF_RESULTS)) {
+      List<Result> list = ioroe.getResults();
+      return list.toArray(new Result[list.size()]);
+    } else if (ioroe.getType().equals(Type.EXCEPTION)) {
+      return ioroe.getEx().getServerJavaException();
+    }
+    return null;
+  }
+}
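
A short sketch of moving between this wrapper and the legacy Object
representation using only the constructors and static helpers defined above.
The surrounding class, the main method and the empty result list are
illustrative assumptions, not part of this commit:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.IntegerOrResultOrException;
import org.apache.hadoop.hbase.client.IntegerOrResultOrException.Type;
import org.apache.hadoop.hbase.client.Result;

public class WrapperSketch {
  public static void main(String[] args) {
    // Wrap a per-region count, as a put/delete response would carry.
    IntegerOrResultOrException wrappedCount =
        new IntegerOrResultOrException(Integer.valueOf(42));
    System.out.println(wrappedCount.getType() == Type.INTEGER);           // true

    // Wrap a (here empty) list of Results, as a get response would carry.
    List<Result> results = new ArrayList<Result>();
    IntegerOrResultOrException wrappedResults =
        new IntegerOrResultOrException(results);
    System.out.println(wrappedResults.getType() == Type.LIST_OF_RESULTS); // true

    // Convert back to the legacy Object representation used by MultiResponse:
    // an Integer for INTEGER, a Result[] for LIST_OF_RESULTS, or the original
    // server-side exception for EXCEPTION.
    Object legacyCount = IntegerOrResultOrException
        .createObjectFromIntegerOrResultOrException(wrappedCount);
    Object legacyResults = IntegerOrResultOrException
        .createObjectFromIntegerOrResultOrException(wrappedResults);

    System.out.println(legacyCount);                        // 42
    System.out.println(((Result[]) legacyResults).length);  // 0
  }
}

The same unwrapping is what MultiResponse.Builder relies on below when turning
a TMultiResponse back into a MultiResponse.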

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MetaScanner.java Wed Mar 12 21:17:13 2014
@@ -34,7 +34,7 @@ import java.io.IOException;
  * Scanner class that contains the <code>.META.</code> table scanning logic
  * and uses a Retryable scanner. Provided visitors will be called
  * for each row.
- *
+ * 
  * Although public visibility, this is not a public-facing API and may evolve in
  * minor releases.
  */
@@ -121,8 +121,7 @@ public class MetaScanner {
       byte[] searchRow =
         HRegionInfo.createRegionName(tableName, row, HConstants.NINES,
           false);
-
-      HTable metaTable = new HTable(configuration, metaTableName);
+      HTable metaTable = new HTable(configuration, metaTableName);
       Result startRowResult = metaTable.getRowOrBefore(searchRow,
           HConstants.CATALOG_FAMILY);
       if (startRowResult == null) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiAction.java Wed Mar 12 21:17:13 2014
@@ -19,36 +19,85 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-
+import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.DataInput;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
+import com.google.common.base.Objects;
+
 /**
  * Container for Actions (i.e. Get, Delete, or Put), which are grouped by
  * regionName. Intended to be used with HConnectionManager.processBatch()
  */
+@ThriftStruct
 public final class MultiAction implements Writable {
 
   private static final int VERSION_0 = 0;
   // map of regions to lists of puts/gets/deletes for that region.
-  public Map<byte[], List<Get>> gets = null;
-  public Map<byte[], List<Put>> puts = null;
-  public Map<byte[], List<Delete>> deletes = null;
-  public Map<byte[], List<Integer>> originalIndex = null;
+  private Map<byte[], List<Get>> gets = null;
+  private Map<byte[], List<Put>> puts = null;
+  private Map<byte[], List<Delete>> deletes = null;
+  private Map<byte[], List<Integer>> originalIndex = null;
 
   public MultiAction() {
   }
 
   /**
+   * Thrift constructor for MultiAction serialization.
+   * @param gets Gets grouped by region name
+   * @param puts Puts grouped by region name
+   * @param deletes Deletes grouped by region name
+   */
+  @ThriftConstructor
+  public MultiAction(@ThriftField(1) Map<byte[], List<Get>> gets,
+      @ThriftField(2) Map<byte[], List<Put>> puts,
+      @ThriftField(3) Map<byte[], List<Delete>> deletes) {
+    if (gets != null) {
+      this.gets = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      this.gets.putAll(gets);
+    }
+    if (puts != null) {
+      this.puts = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      this.puts.putAll(puts);
+    }
+    if (deletes != null) {
+      this.deletes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      this.deletes.putAll(deletes);
+    }
+  }
+
+  @ThriftField(1)
+  public Map<byte[], List<Get>> getGets() {
+    return gets;
+  }
+
+  @ThriftField(2)
+  public Map<byte[], List<Put>> getPuts() {
+    return puts;
+  }
+
+  @ThriftField(3)
+  public Map<byte[], List<Delete>> getDeletes() {
+    return deletes;
+  }
+
+  public Map<byte[], List<Integer>> getOriginalIndex() {
+    return originalIndex;
+  }
+
+  /**
    * Add an Action to this container based on its regionName. If the regionName
    * is wrong, the initial execution will fail, but will be automatically
    * retried after looking up the correct region.
@@ -122,7 +171,6 @@ public final class MultiAction implement
     deletes = readMap(in);
   }
 
-  @SuppressWarnings("unchecked")
   private <R extends Row> Map<byte[], List<R>> readMap(DataInput in) throws IOException {
     int mapSize = in.readInt();
 
@@ -134,11 +182,51 @@ public final class MultiAction implement
       int listSize = in.readInt();
       List<R> lst = new ArrayList<R>(listSize);
       for (int j = 0; j < listSize; j++) {
-        lst.add((R) HbaseObjectWritable.readObject(in, null));
+        @SuppressWarnings("unchecked")
+        R elem = (R) HbaseObjectWritable.readObject(in, null);
+        lst.add(elem);
       }
       map.put(key, lst);
     }
     return map;
   }
 
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(deletes, puts, gets);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    MultiAction other = (MultiAction) obj;
+    if (deletes == null) {
+      if (other.deletes != null)
+        return false;
+    } else if (!deletes.equals(other.deletes))
+      return false;
+    if (gets == null) {
+      if (other.gets != null)
+        return false;
+    } else if (!gets.equals(other.gets))
+      return false;
+    if (puts == null) {
+      if (other.puts != null)
+        return false;
+    } else if (!puts.equals(other.puts))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "MultiAction [gets=" + gets + ", puts=" + puts + ", deletes="
+        + deletes + "]";
+  }
+
 }
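
A minimal sketch of grouping actions by region and feeding them through the new
Thrift constructor. The region names, rows, column family and values below are
made up for illustration:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.MultiAction;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiActionSketch {
  public static void main(String[] args) {
    byte[] regionA = Bytes.toBytes("t1,,1234567890.abcdef");       // hypothetical region name
    byte[] regionB = Bytes.toBytes("t1,row500,1234567890.012345"); // hypothetical region name

    Map<byte[], List<Get>> gets = new HashMap<byte[], List<Get>>();
    List<Get> getsForA = new ArrayList<Get>();
    getsForA.add(new Get(Bytes.toBytes("row1")));
    gets.put(regionA, getsForA);

    Map<byte[], List<Put>> puts = new HashMap<byte[], List<Put>>();
    List<Put> putsForB = new ArrayList<Put>();
    Put p = new Put(Bytes.toBytes("row700"));
    p.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));
    putsForB.add(p);
    puts.put(regionB, putsForB);

    // The Thrift constructor copies these into TreeMaps ordered by
    // Bytes.BYTES_COMPARATOR, so the byte[] keys are compared by content
    // rather than by reference.
    MultiAction action = new MultiAction(gets, puts, null);

    // A lookup with a different byte[] instance of the same region name works.
    System.out.println(action.getGets().containsKey(
        Bytes.toBytes("t1,,1234567890.abcdef")));    // true
    System.out.println(action);                      // uses the new toString()
  }
}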

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPut.java Wed Mar 12 21:17:13 2014
@@ -33,14 +33,19 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 
 /**
  * Data type class for putting multiple regions worth of puts in one RPC.
  */
+@ThriftStruct
 public class MultiPut extends Operation implements Writable {
   public HServerAddress address; // client code ONLY
 
@@ -64,6 +69,16 @@ public class MultiPut extends Operation 
     address = a;
   }
 
+  @ThriftConstructor
+  public MultiPut(@ThriftField(1) final Map<byte[], List<Put>> putsSerial) {
+    this.puts.putAll(putsSerial);
+  }
+
+  @ThriftField(1)
+  public Map<byte[], List<Put>> getPuts() {
+    return puts;
+  }
+
   public int size() {
     int size = 0;
     for( List<Put> l : puts.values()) {
@@ -103,17 +118,17 @@ public class MultiPut extends Operation 
   }
 
   /**
-   * Compile the table and column family (i.e. schema) information
-   * into a String. Useful for parsing and aggregation by debugging,
+   * Compile the table and column family (i.e. schema) information 
+   * into a String. Useful for parsing and aggregation by debugging, 
    * logging, and administration tools.
    * @return Map
    */
   @Override
   public Map<String, Object> getFingerprint() {
     Map<String, Object> map = new HashMap<String, Object>();
-    // for extensibility, we have a map of table information that we will
+    // for extensibility, we have a map of table information that we will 
     // populate with only family information for each table
-    Map<String, Map> tableInfo =
+    Map<String, Map> tableInfo = 
       new HashMap<String, Map>();
     map.put("tables", tableInfo);
     for (Map.Entry<byte[], List<Put>> entry : puts.entrySet()) {
@@ -121,8 +136,8 @@ public class MultiPut extends Operation 
       // not how many Puts touch them, so we use this Set to do just that.
       Set<String> familySet;
       try {
-        // since the puts are stored by region, we may have already
-        // recorded families for this region. if that is the case,
+        // since the puts are stored by region, we may have already 
+        // recorded families for this region. if that is the case, 
         // we want to add to the existing Set. if not, we make a new Set.
         String tableName = Bytes.toStringBinary(
             HRegionInfo.parseRegionName(entry.getKey())[0]);
@@ -141,7 +156,7 @@ public class MultiPut extends Operation 
         table.put("families", familySet);
         tableInfo.put(Bytes.toStringBinary(entry.getKey()), table);
       }
-      // we now iterate through each Put and keep track of which families
+      // we now iterate through each Put and keep track of which families 
       // are affected in this table.
       for (Put p : entry.getValue()) {
         for (byte[] fam : p.getFamilyMap().keySet()) {
@@ -153,8 +168,8 @@ public class MultiPut extends Operation 
   }
 
   /**
-   * Compile the details beyond the scope of getFingerprint (mostly
-   * toMap from the Puts) into a Map along with the fingerprinted
+   * Compile the details beyond the scope of getFingerprint (mostly 
+   * toMap from the Puts) into a Map along with the fingerprinted 
    * information. Useful for debugging, logging, and administration tools.
    * @param maxCols a limit on the number of columns output prior to truncation
    * @return Map
@@ -171,9 +186,9 @@ public class MultiPut extends Operation 
         continue;
       }
       List<Put> regionPuts = entry.getValue();
-      List<Map<String, Object>> putSummaries =
+      List<Map<String, Object>> putSummaries = 
         new ArrayList<Map<String, Object>>();
-      // find out how many of this region's puts we can add without busting
+      // find out how many of this region's puts we can add without busting 
       // the maximum
       int regionPutsToAdd = regionPuts.size();
       putCount += regionPutsToAdd;
@@ -192,11 +207,11 @@ public class MultiPut extends Operation 
         // in the case of parse error, default to labeling by region
         tableName = Bytes.toStringBinary(entry.getKey());
       }
-      // since the puts are stored by region, we may have already
-      // recorded puts for this table. if that is the case,
-      // we want to add to the existing List. if not, we place a new list
+      // since the puts are stored by region, we may have already 
+      // recorded puts for this table. if that is the case, 
+      // we want to add to the existing List. if not, we place a new list 
       // in the map
-      Map<String, Object> table =
+      Map<String, Object> table = 
         (Map<String, Object>) tableInfo.get(tableName);
       if (table == null) {
         // in case the Put has changed since getFingerprint's map was built
@@ -246,4 +261,22 @@ public class MultiPut extends Operation 
       puts.put(key, ps);
     }
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    MultiPut other = (MultiPut) obj;
+    return (this.getPuts().size() == other.getPuts().size()) &&
+      (this.getPuts().entrySet().containsAll(other.getPuts().entrySet()));
+  }
+
+  @Override
+  public int hashCode() {
+    return puts.hashCode();
+  }
 }
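
A small sketch of the new Thrift constructor together with the puts-based
equals()/hashCode() added above. The two MultiPuts are built over the same
per-region map (and therefore share the same Put instance), which keeps the
comparison independent of how Put itself compares; the class name, row and
family are illustrative:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.client.MultiPut;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiPutEqualitySketch {
  public static void main(String[] args) {
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v"));

    List<Put> putsForRegion = new ArrayList<Put>();
    putsForRegion.add(put);

    Map<byte[], List<Put>> byRegion =
        new TreeMap<byte[], List<Put>>(Bytes.BYTES_COMPARATOR);
    byRegion.put(Bytes.toBytes("region-1"), putsForRegion); // hypothetical region name

    // Two MultiPuts built over the same per-region map.
    MultiPut a = new MultiPut(byRegion);
    MultiPut b = new MultiPut(byRegion);

    // equals() compares the puts maps entry by entry, and hashCode() is
    // delegated to the map, so the two agree with each other.
    System.out.println(a.equals(b));                   // true
    System.out.println(a.hashCode() == b.hashCode());  // true
  }
}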

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiPutResponse.java Wed Mar 12 21:17:13 2014
@@ -20,6 +20,9 @@
 
 package org.apache.hadoop.hbase.client;
 
+import com.facebook.swift.codec.ThriftConstructor;
+import com.facebook.swift.codec.ThriftField;
+import com.facebook.swift.codec.ThriftStruct;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 
@@ -32,6 +35,7 @@ import java.util.TreeMap;
 /**
  * Response class for MultiPut.
  */
+@ThriftStruct
 public class MultiPutResponse implements Writable {
 
   protected MultiPut request; // used in client code ONLY
@@ -40,6 +44,20 @@ public class MultiPutResponse implements
 
   public MultiPutResponse() {}
 
+  @ThriftConstructor
+  public MultiPutResponse(@ThriftField(1) final Map<byte[], Integer> answers) {
+    // Copy into the pre-initialized TreeMap so that lookups on the byte[]
+    // keys keep using Bytes.BYTES_COMPARATOR.
+    for (Map.Entry<byte[], Integer> e : answers.entrySet()) {
+      this.answers.put(e.getKey(), e.getValue());
+    }
+  }
+
+  @ThriftField(1)
+  public Map<byte[], Integer> getAnswers() {
+    return answers;
+  }
+
   public void addResult(byte[] regionName, int result) {
     answers.put(regionName, result);
   }
@@ -69,4 +87,30 @@ public class MultiPutResponse implements
       answers.put(key, value);
     }
   }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    MultiPutResponse other = (MultiPutResponse)obj;
+    if ((other.answers == null) != (this.answers == null)) {
+      return false;
+    }
+    if (this.answers != null) {
+      // If the answers map is not null, they should be of the same size, and
+      // have the same entries.
+      if (!((this.answers.size() == other.answers.size()) &&
+             this.answers.entrySet().containsAll(other.answers.entrySet()))) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return answers.hashCode();
+  }
 }
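
The constructor above deliberately copies into the pre-existing TreeMap so the
byte[] region keys keep being compared by Bytes.BYTES_COMPARATOR. A tiny,
self-contained illustration of why that matters for byte[] keys (the key string
is arbitrary):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.util.Bytes;

public class ByteArrayKeySketch {
  public static void main(String[] args) {
    byte[] keyWritten = Bytes.toBytes("region-1");
    byte[] keyLookedUp = Bytes.toBytes("region-1"); // equal content, different instance

    Map<byte[], Integer> hashMap = new HashMap<byte[], Integer>();
    hashMap.put(keyWritten, 7);
    System.out.println(hashMap.get(keyLookedUp));   // null: byte[] hashes by identity

    Map<byte[], Integer> treeMap =
        new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
    treeMap.put(keyWritten, 7);
    System.out.println(treeMap.get(keyLookedUp));   // 7: compared by content
  }
}

A plain HashMap only finds the entry when handed the exact same array instance,
which is rarely what a caller has after a round trip through RPC.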

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/MultiResponse.java Wed Mar 12 21:17:13 2014
@@ -20,25 +20,17 @@
 
 package org.apache.hadoop.hbase.client;
 
-import org.apache.hadoop.io.Writable;
-import org.apache.hadoop.hbase.io.HbaseObjectWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.HServerAddress;
-import org.apache.hadoop.io.WritableUtils;
-import org.apache.hadoop.util.StringUtils;
-
+import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.DataInput;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-import java.util.Collection;
-import java.util.List;
 import java.util.Map;
-import java.util.ArrayList;
+import java.util.Map.Entry;
 import java.util.TreeMap;
 
+import org.apache.hadoop.hbase.io.HbaseObjectWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
 /**
  * A container for Result objects, grouped by regionName.
  */
@@ -54,15 +46,38 @@ public class MultiResponse implements Wr
   public MultiResponse() {
   }
 
+  public MultiResponse(Map<byte[], Object> resultsForGet,
+      Map<byte[], Object> resultsForPut,
+      Map<byte[], Object> resultsForDelete) {
+    // TODO @gauravm: Change from copy to direct assignment.
+    if (resultsForGet != null) {
+      this.resultsForGet = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      this.resultsForGet.putAll(resultsForGet);
+    } else {
+      this.resultsForGet = null;
+    }
+
+    if (resultsForPut != null) {
+      this.resultsForPut = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      this.resultsForPut.putAll(resultsForPut);
+    } else {
+      this.resultsForPut = null;
+    }
+
+    if (resultsForDelete != null) {
+      this.resultsForDelete = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+      this.resultsForDelete.putAll(resultsForDelete);
+    } else {
+      this.resultsForDelete = null;
+    }
+  }
 
   /**
    * Add the pair to the container, grouped by the regionName
    *
-   * @param regionName
-   * @param r
-   *          First item in the pair is the original index of the Action
-   *          (request). Second item is the Result. Result will be empty for
-   *          successful Put and Delete actions.
+   * @param regionName - name of the region
+   * @param rs - an Integer, a Result[], or an Exception if the
+   * call was not successful.
    */
   public void addGetResponse(byte[] regionName, Object rs) {
     if (resultsForGet == null)
@@ -98,12 +113,12 @@ public class MultiResponse implements Wr
     return ((Integer)result).intValue();
   }
 
-  public Result[] getGetResult(byte []regionName) throws Exception {
+  public Result[] getGetResult(byte[] regionName) throws Exception {
     Object result = resultsForGet.get(regionName);
     if (result instanceof Exception) // false if result == null
-      throw (Exception)result;
+      throw (Exception) result;
 
-    return ((Result[])result);
+    return ((Result[]) result);
   }
 
   @Override
@@ -153,4 +168,91 @@ public class MultiResponse implements Wr
     return map;
   }
 
+  @Override
+  public int hashCode() {
+    final int prime = 31;
+    int result = 1;
+    result = prime * result
+        + ((resultsForDelete == null) ? 0 : resultsForDelete.hashCode());
+    result = prime * result
+        + ((resultsForGet == null) ? 0 : resultsForGet.hashCode());
+    result = prime * result
+        + ((resultsForPut == null) ? 0 : resultsForPut.hashCode());
+    return result;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj)
+      return true;
+    if (obj == null)
+      return false;
+    if (getClass() != obj.getClass())
+      return false;
+    MultiResponse other = (MultiResponse) obj;
+    if (resultsForDelete == null) {
+      if (other.resultsForDelete != null)
+        return false;
+    } else if (!resultsForDelete.equals(other.resultsForDelete))
+      return false;
+    if (resultsForGet == null) {
+      if (other.resultsForGet != null)
+        return false;
+    } else if (!resultsForGet.equals(other.resultsForGet))
+      return false;
+    if (resultsForPut == null) {
+      if (other.resultsForPut != null)
+        return false;
+    } else if (!resultsForPut.equals(other.resultsForPut))
+      return false;
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    return "MultiResponse [resultsForGet=" + resultsForGet + ", resultsForPut="
+        + resultsForPut + ", resultsForDelete=" + resultsForDelete + "]";
+  }
+
+  public Map<byte[], Object> getResultsForGet() {
+    return resultsForGet;
+  }
+
+  public Map<byte[], Object> getResultsForPut() {
+    return resultsForPut;
+  }
+
+  public Map<byte[], Object> getResultsForDelete() {
+    return resultsForDelete;
+  }
+
+  public static class Builder {
+
+    public static MultiResponse createFromTMultiResponse(
+        TMultiResponse tMultiResponse) {
+      Map<byte[], IntegerOrResultOrException> resultsForGet = tMultiResponse
+          .getResultsForGet();
+      Map<byte[], IntegerOrResultOrException> resultsForPut = tMultiResponse
+          .getResultsForPut();
+      Map<byte[], IntegerOrResultOrException> resultsForDelete = tMultiResponse
+          .getResultsForDelete();
+      return new MultiResponse(transformTmultiResponseMap(resultsForGet),
+          transformTmultiResponseMap(resultsForPut),
+          transformTmultiResponseMap(resultsForDelete));
+    }
+
+    public static Map<byte[], Object> transformTmultiResponseMap(
+        Map<byte[], IntegerOrResultOrException> map) {
+      if (map == null) {
+        return null;
+      }
+      Map<byte[], Object> resultMap = new TreeMap<byte[], Object>(
+          Bytes.BYTES_COMPARATOR);
+      for (Entry<byte[], IntegerOrResultOrException> entry : map.entrySet()) {
+        resultMap.put(entry.getKey(), IntegerOrResultOrException
+            .createObjectFromIntegerOrResultOrException(entry.getValue()));
+      }
+      return resultMap;
+    }
+  }
 }
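
A brief sketch of the Builder path added above, unwrapping a Thrift-side map
back into the legacy Object-valued map. The single region name and the count
are hypothetical:

import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.client.IntegerOrResultOrException;
import org.apache.hadoop.hbase.client.MultiResponse;
import org.apache.hadoop.hbase.util.Bytes;

public class MultiResponseTransformSketch {
  public static void main(String[] args) {
    // A Thrift-side map: one region reporting that 3 puts were applied.
    Map<byte[], IntegerOrResultOrException> thriftSide =
        new TreeMap<byte[], IntegerOrResultOrException>(Bytes.BYTES_COMPARATOR);
    thriftSide.put(Bytes.toBytes("region-1"),   // hypothetical region name
        new IntegerOrResultOrException(Integer.valueOf(3)));

    // transformTmultiResponseMap unwraps each value back to the legacy
    // Object representation (an Integer, a Result[] or an Exception).
    Map<byte[], Object> legacy =
        MultiResponse.Builder.transformTmultiResponseMap(thriftSide);

    System.out.println(legacy.get(Bytes.toBytes("region-1")));  // 3
  }
}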

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Mutation.java Wed Mar 12 21:17:13 2014
@@ -25,16 +25,13 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import java.util.UUID;
 
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.util.Bytes;
 
 public abstract class Mutation extends OperationWithAttributes implements Row {
-  // Attribute used in Mutations to indicate the originating cluster.
   private static final String CLUSTER_ID_ATTR = "_c.id_";
-
   protected byte [] row = null;
   protected long ts = HConstants.LATEST_TIMESTAMP;
   protected long lockId = -1L;
@@ -170,7 +167,7 @@ public abstract class Mutation extends O
    * @return The lock ID.
    */
   public long getLockId() {
-  return this.lockId;
+    return this.lockId;
   }
 
   /**

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Operation.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Operation.java?rev=1576909&r1=1576908&r2=1576909&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Operation.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Operation.java Wed Mar 12 21:17:13 2014
@@ -34,14 +34,14 @@ public abstract class Operation {
   private static final int DEFAULT_MAX_COLS = 5;
 
   /**
-   * Produces a Map containing a fingerprint which identifies the type and
+   * Produces a Map containing a fingerprint which identifies the type and 
    * the static schema components of a query (i.e. column families)
    * @return a map containing fingerprint information (i.e. column families)
    */
   public abstract Map<String, Object> getFingerprint();
 
   /**
-   * Produces a Map containing a summary of the details of a query
+   * Produces a Map containing a summary of the details of a query 
    * beyond the scope of the fingerprint (i.e. columns, rows...)
    * @param maxCols a limit on the number of columns output prior to truncation
    * @return a map containing parameters of a query (i.e. rows, columns...)
@@ -57,7 +57,7 @@ public abstract class Operation {
   }
 
   /**
-   * Produces a JSON object for fingerprint and details exposure in a
+   * Produces a JSON object for fingerprint and details exposure in a 
    * parseable format.
    * @param maxCols a limit on the number of columns to include in the JSON
    * @return a JSONObject containing this Operation's information, as a string
@@ -68,7 +68,7 @@ public abstract class Operation {
   }
 
   /**
-   * Produces a JSON object sufficient for description of a query
+   * Produces a JSON object sufficient for description of a query 
    * in a debugging or logging context.
    * @return the produced JSON object, as a string
    */
@@ -77,16 +77,16 @@ public abstract class Operation {
   }
 
   /**
-   * Produces a string representation of this Operation. It defaults to a JSON
-   * representation, but falls back to a string representation of the
+   * Produces a string representation of this Operation. It defaults to a JSON 
+   * representation, but falls back to a string representation of the 
    * fingerprint and details in the case of a JSON encoding failure.
-   * @param maxCols a limit on the number of columns output in the summary
+   * @param maxCols a limit on the number of columns output in the summary 
    * prior to truncation
    * @return a JSON-parseable String
    */
   public String toString(int maxCols) {
-    /* for now this is merely a wrapper from producing a JSON string, but
-     * toJSON is kept separate in case this is changed to be a less parsable
+    /* for now this is merely a wrapper from producing a JSON string, but 
+     * toJSON is kept separate in case this is changed to be a less parsable 
      * pretty printed representation.
      */
     try {
@@ -97,8 +97,8 @@ public abstract class Operation {
   }
 
   /**
-   * Produces a string representation of this Operation. It defaults to a JSON
-   * representation, but falls back to a string representation of the
+   * Produces a string representation of this Operation. It defaults to a JSON 
+   * representation, but falls back to a string representation of the 
    * fingerprint and details in the case of a JSON encoding failure.
    * @return String
    */
@@ -107,3 +107,4 @@ public abstract class Operation {
     return toString(DEFAULT_MAX_COLS);
   }
 }
+