Posted to commits@hbase.apache.org by sy...@apache.org on 2017/03/16 17:12:29 UTC

[01/14] hbase git commit: HBASE-17746 TestSimpleRpcScheduler.testCoDelScheduling is broken

Repository: hbase
Updated Branches:
  refs/heads/hbase-12439 4b541d638 -> e67eb6c42


HBASE-17746 TestSimpleRpcScheduler.testCoDelScheduling is broken

Signed-off-by: Guanghao Zhang <zg...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/31bc94ae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/31bc94ae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/31bc94ae

Branch: refs/heads/hbase-12439
Commit: 31bc94ae6094d02a73d347549e23bcbaff97838f
Parents: 4b541d6
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Fri Mar 10 18:42:27 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Mon Mar 13 10:13:42 2017 +0800

----------------------------------------------------------------------
 .../main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java | 2 +-
 .../java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java  | 3 +--
 2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/31bc94ae/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 616f741..d51d83b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -85,7 +85,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
       callExecutor = new RWQueueRpcExecutor("deafult.RWQ", Math.max(2, handlerCount),
         maxQueueLength, priority, conf, server);
     } else {
-      if (RpcExecutor.isFifoQueueType(callQueueType)) {
+      if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
         callExecutor = new FastPathBalancedQueueRpcExecutor("deafult.FPBQ", handlerCount,
             maxQueueLength, priority, conf, server);
       } else {

http://git-wip-us.apache.org/repos/asf/hbase/blob/31bc94ae/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 04ac519..5e4520d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -434,8 +434,7 @@ public class TestSimpleRpcScheduler {/*
   @Test
   public void testCoDelScheduling() throws Exception {
     CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
-    envEdge.threadNamePrefixs.add("RpcServer.CodelFPBQ.default.handler");
-    envEdge.threadNamePrefixs.add("RpcServer.CodelRWQ.default.handler");
+    envEdge.threadNamePrefixs.add("RpcServer.deafult.FPBQ.Codel.handler");
     Configuration schedConf = HBaseConfiguration.create();
     schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
     schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
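
A minimal configuration sketch, not part of this commit: it assumes the call queue type key is "hbase.ipc.server.callqueue.type" (RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY above) and that "codel" is the value selecting the CoDel queue. With this fix that queue type is served by the FastPathBalancedQueueRpcExecutor named "deafult.FPBQ", which is why the test now expects the "RpcServer.deafult.FPBQ.Codel.handler" thread prefix.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CodelQueueConfigSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Assumed key/value: RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY is
        // "hbase.ipc.server.callqueue.type" and "codel" selects the CoDel queue,
        // now routed to the FastPathBalancedQueueRpcExecutor branch shown above.
        conf.set("hbase.ipc.server.callqueue.type", "codel");
        System.out.println(conf.get("hbase.ipc.server.callqueue.type"));
      }
    }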


[02/14] hbase git commit: HBASE-17773 VerifyReplication tool wrongly emits Invalid arguments error

Posted by sy...@apache.org.
HBASE-17773 VerifyReplication tool wrongly emits Invalid arguments error

Signed-off-by: Guanghao Zhang <zg...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fee67bcf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fee67bcf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fee67bcf

Branch: refs/heads/hbase-12439
Commit: fee67bcf1432cd16720fb97a0135bd67b0d2b064
Parents: 31bc94a
Author: Tom Tsuruhara <to...@linecorp.com>
Authored: Sat Mar 11 18:20:06 2017 +0900
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Mon Mar 13 17:46:58 2017 +0800

----------------------------------------------------------------------
 .../hbase/mapreduce/replication/VerifyReplication.java      | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fee67bcf/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 145fede..ba5966b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -463,10 +463,6 @@ public class VerifyReplication extends Configured implements Tool {
           continue;
         }
 
-        if (cmd.startsWith("--")) {
-          printUsage("Invalid argument '" + cmd + "'");
-        }
-
         final String delimiterArgKey = "--delimiter=";
         if (cmd.startsWith(delimiterArgKey)) {
           delimiter = cmd.substring(delimiterArgKey.length());
@@ -483,6 +479,11 @@ public class VerifyReplication extends Configured implements Tool {
           verbose = true;
           continue;
         }        
+
+        if (cmd.startsWith("--")) {
+          printUsage("Invalid argument '" + cmd + "'");
+        }
+
         if (i == args.length-2) {
           peerId = cmd;
         }
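
A self-contained sketch of the reordered argument handling, not the tool itself: recognized "--" options are consumed (and continue) before the catch-all check, so a valid flag such as --delimiter= or --verbose can no longer trigger the "Invalid argument" error.

    public class ArgOrderSketch {
      public static void main(String[] args) {
        String delimiter = "";
        boolean verbose = false;
        for (String cmd : args) {
          final String delimiterArgKey = "--delimiter=";
          if (cmd.startsWith(delimiterArgKey)) {
            delimiter = cmd.substring(delimiterArgKey.length());
            continue; // consumed, so it never reaches the invalid-argument check
          }
          if (cmd.equals("--verbose")) {
            verbose = true;
            continue;
          }
          // Moved after the known options: only genuinely unknown "--" tokens fail here.
          if (cmd.startsWith("--")) {
            throw new IllegalArgumentException("Invalid argument '" + cmd + "'");
          }
        }
        System.out.println("delimiter=" + delimiter + ", verbose=" + verbose);
      }
    }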


[09/14] hbase git commit: HBASE-17584 Expose ScanMetrics with ResultScanner rather than Scan

Posted by sy...@apache.org.
HBASE-17584 Expose ScanMetrics with ResultScanner rather than Scan


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a49bc58a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a49bc58a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a49bc58a

Branch: refs/heads/hbase-12439
Commit: a49bc58a5456c2974552c7b6ffab9ea39393ca78
Parents: aace02a
Author: zhangduo <zh...@apache.org>
Authored: Fri Feb 24 14:08:10 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Mar 15 17:48:58 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/AbstractClientScanner.java |  8 +++-----
 .../hbase/client/AsyncTableResultScanner.java      |  6 ++++++
 .../apache/hadoop/hbase/client/ClientScanner.java  | 11 ++++++-----
 .../apache/hadoop/hbase/client/ResultScanner.java  |  6 ++++++
 .../java/org/apache/hadoop/hbase/client/Scan.java  |  6 +++++-
 .../client/metrics/ServerSideScanMetrics.java      | 13 +++++++++++--
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 17 ++++++-----------
 .../hadoop/hbase/rest/client/RemoteHTable.java     |  6 ++++++
 .../hbase/client/ClientSideRegionScanner.java      |  1 -
 .../hbase/mapreduce/TableRecordReaderImpl.java     |  4 ++--
 .../TestServerSideScanMetricsFromClientSide.java   | 14 +++++++-------
 .../hadoop/hbase/regionserver/RegionAsTable.java   |  6 ++++++
 12 files changed, 64 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 87304c3..ffb2fa1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -38,13 +38,11 @@ public abstract class AbstractClientScanner implements ResultScanner {
   }
 
   /**
-   * Used internally accumulating metrics on scan. To
-   * enable collection of metrics on a Scanner, call {@link Scan#setScanMetricsEnabled(boolean)}.
-   * These metrics are cleared at key transition points. Metrics are accumulated in the
-   * {@link Scan} object itself.
-   * @see Scan#getScanMetrics()
+   * Used internally accumulating metrics on scan. To enable collection of metrics on a Scanner,
+   * call {@link Scan#setScanMetricsEnabled(boolean)}.
    * @return Returns the running {@link ScanMetrics} instance or null if scan metrics not enabled.
    */
+  @Override
   public ScanMetrics getScanMetrics() {
     return scanMetrics;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 38d4b2c..eef797c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -30,6 +30,7 @@ import java.util.Queue;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 
 /**
  * The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
@@ -164,4 +165,9 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
   synchronized boolean isSuspended() {
     return resumer != null;
   }
+
+  @Override
+  public ScanMetrics getScanMetrics() {
+    throw new UnsupportedOperationException();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 53e6dd8..bd3d4ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -301,15 +300,17 @@ public abstract class ClientScanner extends AbstractClientScanner {
    * for scan/map reduce scenarios, we will have multiple scans running at the same time. By
    * default, scan metrics are disabled; if the application wants to collect them, this behavior can
    * be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
-   * <p>
-   * This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
    */
   protected void writeScanMetrics() {
     if (this.scanMetrics == null || scanMetricsPublished) {
       return;
     }
-    MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
-    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
+    // Publish ScanMetrics to the Scan Object.
+    // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
+    // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
+    // to Scan will be messed up.
+    scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
+      ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
     scanMetricsPublished = true;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
index e9cb476..8951e84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
@@ -27,6 +27,7 @@ import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 
 /**
  * Interface for client-side scanning. Go to {@link Table} to obtain instances.
@@ -116,4 +117,9 @@ public interface ResultScanner extends Closeable, Iterable<Result> {
    * @return true if the lease was successfully renewed, false otherwise.
    */
   boolean renewLease();
+
+  /**
+   * @return the scan metrics, or {@code null} if we do not enable metrics.
+   */
+  ScanMetrics getScanMetrics();
 }
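
A minimal usage sketch of the new accessor; the table name is a placeholder. Metrics are enabled on the Scan as before, but are now read from the ResultScanner, and getMetricsMap(false) avoids resetting the counters that ClientScanner still publishes back to the Scan.

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

    public class ScanMetricsSketch {
      public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("testTable"))) { // placeholder table
          Scan scan = new Scan();
          scan.setScanMetricsEnabled(true);
          try (ResultScanner scanner = table.getScanner(scan)) {
            long rows = 0;
            for (Result r : scanner) {
              rows++; // drain the scanner so the metrics accumulate
            }
            ScanMetrics metrics = scanner.getScanMetrics(); // null if metrics were not enabled
            System.out.println("rows=" + rows + ", metrics="
                + (metrics == null ? "n/a" : metrics.getMetricsMap(false)));
          }
        }
      }
    }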

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index a7d81af..03c692c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -1081,9 +1081,13 @@ public class Scan extends Query {
   /**
    * @return Metrics on this Scan, if metrics were enabled.
    * @see #setScanMetricsEnabled(boolean)
+   * @deprecated Use {@link ResultScanner#getScanMetrics()} instead. And notice that, please do not
+   *             use this method and {@link ResultScanner#getScanMetrics()} together, the metrics
+   *             will be messed up.
    */
+  @Deprecated
   public ScanMetrics getScanMetrics() {
-    byte [] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
+    byte[] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
     if (bytes == null) return null;
     return ProtobufUtil.toScanMetrics(bytes);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index 7171a94..b14938b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -106,11 +106,20 @@ public class ServerSideScanMetrics {
    * @return A Map of String -&gt; Long for metrics
    */
   public Map<String, Long> getMetricsMap() {
+    return getMetricsMap(true);
+  }
+
+  /**
+   * Get all of the values. If reset is true, we will reset the all AtomicLongs back to 0.
+   * @param reset whether to reset the AtomicLongs to 0.
+   * @return A Map of String -&gt; Long for metrics
+   */
+  public Map<String, Long> getMetricsMap(boolean reset) {
     // Create a builder
     ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
-    // For every entry add the value and reset the AtomicLong back to zero
     for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
-      builder.put(e.getKey(), e.getValue().getAndSet(0));
+      long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
+      builder.put(e.getKey(), value);
     }
     // Build the immutable map so that people can't mess around with it.
     return builder.build();
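
A short sketch of the new reset flag on a standalone ScanMetrics instance: getMetricsMap(false) takes a snapshot without zeroing the counters, while getMetricsMap(true) keeps the historical reset-on-read behaviour of the no-arg method.

    import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
    import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;

    public class MetricsMapResetSketch {
      public static void main(String[] args) {
        ScanMetrics metrics = new ScanMetrics();
        metrics.getCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)
            .addAndGet(42);
        // Non-destructive snapshot: counters keep their values.
        System.out.println(metrics.getMetricsMap(false));
        // Reset-on-read, same as the historical no-arg getMetricsMap().
        System.out.println(metrics.getMetricsMap(true));
        // The counter is now back at 0.
        System.out.println(metrics.getMetricsMap(false));
      }
    }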

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 10827c3..f44979c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.io.LimitInputStream;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.master.RegionState;
 import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.quotas.QuotaScope;
 import org.apache.hadoop.hbase.quotas.QuotaType;
 import org.apache.hadoop.hbase.quotas.ThrottleType;
@@ -95,7 +94,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Parser;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcChannel;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
 import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service;
@@ -164,6 +162,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescript
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.DynamicClassLoader;
@@ -2043,12 +2042,11 @@ public final class ProtobufUtil {
   }
 
   public static ScanMetrics toScanMetrics(final byte[] bytes) {
-    Parser<MapReduceProtos.ScanMetrics> parser = MapReduceProtos.ScanMetrics.PARSER;
     MapReduceProtos.ScanMetrics pScanMetrics = null;
     try {
-      pScanMetrics = parser.parseFrom(bytes);
+      pScanMetrics = MapReduceProtos.ScanMetrics.parseFrom(bytes);
     } catch (InvalidProtocolBufferException e) {
-      //Ignored there are just no key values to add.
+      // Ignored there are just no key values to add.
     }
     ScanMetrics scanMetrics = new ScanMetrics();
     if (pScanMetrics != null) {
@@ -2061,15 +2059,12 @@ public final class ProtobufUtil {
     return scanMetrics;
   }
 
-  public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics) {
+  public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics, boolean reset) {
     MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder();
-    Map<String, Long> metrics = scanMetrics.getMetricsMap();
+    Map<String, Long> metrics = scanMetrics.getMetricsMap(reset);
     for (Entry<String, Long> e : metrics.entrySet()) {
       HBaseProtos.NameInt64Pair nameInt64Pair =
-          HBaseProtos.NameInt64Pair.newBuilder()
-              .setName(e.getKey())
-              .setValue(e.getValue())
-              .build();
+          HBaseProtos.NameInt64Pair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build();
       builder.addMetrics(nameInt64Pair);
     }
     return builder.build();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index e762c31..9cc3198 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -641,6 +642,11 @@ public class RemoteHTable implements Table {
     public boolean renewLease() {
       throw new RuntimeException("renewLease() not supported");
     }
+
+    @Override
+    public ScanMetrics getScanMetrics() {
+      throw new RuntimeException("getScanMetrics() not supported");
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 8ff118e..7ae0537 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -52,7 +52,6 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
   public ClientSideRegionScanner(Configuration conf, FileSystem fs,
       Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics)
           throws IOException {
-
     // region is immutable, set isolation level
     scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
index 6f1d140..a8ed5f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -81,7 +81,7 @@ public class TableRecordReaderImpl {
    */
   public void restart(byte[] firstRow) throws IOException {
     currentScan = new Scan(scan);
-    currentScan.setStartRow(firstRow);
+    currentScan.withStartRow(firstRow);
     currentScan.setScanMetricsEnabled(true);
     if (this.scanner != null) {
       if (logScannerActivity) {
@@ -273,7 +273,7 @@ public class TableRecordReaderImpl {
    * @throws IOException
    */
   private void updateCounters() throws IOException {
-    ScanMetrics scanMetrics = currentScan.getScanMetrics();
+    ScanMetrics scanMetrics = scanner.getScanMetrics();
     if (scanMetrics == null) {
       return;
     }
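
A minimal sketch of the builder-style range methods (withStartRow/withStopRow) that this patch uses in place of the deprecated setStartRow/setStopRow; the row keys are placeholders.

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class ScanRangeSketch {
      public static void main(String[] args) {
        Scan scan = new Scan()
            .withStartRow(Bytes.toBytes("row-000"))  // inclusive start
            .withStopRow(Bytes.toBytes("row-100"));  // exclusive stop
        scan.setScanMetricsEnabled(true);
        System.out.println(scan);
      }
    }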

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
index ad63cc8..370d3d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
@@ -192,15 +192,15 @@ public class TestServerSideScanMetricsFromClientSide {
 
     for (int i = 0; i < ROWS.length - 1; i++) {
       scan = new Scan(baseScan);
-      scan.setStartRow(ROWS[0]);
-      scan.setStopRow(ROWS[i + 1]);
+      scan.withStartRow(ROWS[0]);
+      scan.withStopRow(ROWS[i + 1]);
       testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, i + 1);
     }
 
     for (int i = ROWS.length - 1; i > 0; i--) {
       scan = new Scan(baseScan);
-      scan.setStartRow(ROWS[i - 1]);
-      scan.setStopRow(ROWS[ROWS.length - 1]);
+      scan.withStartRow(ROWS[i - 1]);
+      scan.withStopRow(ROWS[ROWS.length - 1]);
       testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length - i);
     }
 
@@ -318,12 +318,12 @@ public class TestServerSideScanMetricsFromClientSide {
   public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
     assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
     ResultScanner scanner = TABLE.getScanner(scan);
-
     // Iterate through all the results
-    for (Result r : scanner) {
+    while (scanner.next() != null) {
+
     }
     scanner.close();
-    ScanMetrics metrics = scan.getScanMetrics();
+    ScanMetrics metrics = scanner.getScanMetrics();
     assertTrue("Metrics are null", metrics != null);
     assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
     final long actualMetricValue = metrics.getCounter(metricKey).get();

http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
index cfae7cb..9b96ff2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
 import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 
@@ -172,6 +173,11 @@ public class RegionAsTable implements Table {
     public boolean renewLease() {
       throw new UnsupportedOperationException();
     }
+
+    @Override
+    public ScanMetrics getScanMetrics() {
+      throw new UnsupportedOperationException();
+    }
   };
 
   @Override


[14/14] hbase git commit: HBASE-15314 Allow more than one backing file in bucketcache (Chunhui Shen)

Posted by sy...@apache.org.
HBASE-15314 Allow more than one backing file in bucketcache (Chunhui Shen)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e67eb6c4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e67eb6c4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e67eb6c4

Branch: refs/heads/hbase-12439
Commit: e67eb6c424d76ee259f5076c277454a73e3a2bf4
Parents: 6a6fff1
Author: Ramkrishna <ra...@intel.com>
Authored: Thu Mar 16 16:11:35 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Thu Mar 16 16:28:58 2017 +0530

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/BucketCache.java      |   9 +-
 .../hbase/io/hfile/bucket/FileIOEngine.java     | 184 +++++++++++++++----
 .../hbase/io/hfile/bucket/TestFileIOEngine.java |  47 ++++-
 3 files changed, 190 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e67eb6c4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 3e9c376..3c27f14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -314,8 +314,13 @@ public class BucketCache implements BlockCache, HeapSize {
    */
   private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
       throws IOException {
-    if (ioEngineName.startsWith("file:")) {
-      return new FileIOEngine(ioEngineName.substring(5), capacity);
+    if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
+      // In order to make the usage simple, we only need the prefix 'files:' in
+      // document whether one or multiple file(s), but also support 'file:' for
+      // the compatibility
+      String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1)
+          .split(FileIOEngine.FILE_DELIMITER);
+      return new FileIOEngine(capacity, filePaths);
     } else if (ioEngineName.startsWith("offheap")) {
       return new ByteBufferIOEngine(capacity, true);
     } else if (ioEngineName.startsWith("heap")) {
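
A hedged configuration sketch for the new engine; "hbase.bucketcache.ioengine" and "hbase.bucketcache.size" are assumed to be the usual BucketCache keys, and the file paths are placeholders.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class MultiFileBucketCacheSketch {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // The new "files:" prefix takes a comma-separated list (FileIOEngine.FILE_DELIMITER);
        // the old "file:" prefix still works for a single backing file.
        conf.set("hbase.bucketcache.ioengine",
            "files:/mnt/ssd1/bucketcache,/mnt/ssd2/bucketcache");
        // Total capacity in MB; it is split evenly across the files (sizePerFile = capacity / n).
        conf.set("hbase.bucketcache.size", "8192");
        System.out.println(conf.get("hbase.bucketcache.ioengine"));
      }
    }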

http://git-wip-us.apache.org/repos/asf/hbase/blob/e67eb6c4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index aaf5cf9..7586d57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -18,10 +18,12 @@
  */
 package org.apache.hadoop.hbase.io.hfile.bucket;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.Arrays;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -39,38 +41,52 @@ import org.apache.hadoop.util.StringUtils;
 @InterfaceAudience.Private
 public class FileIOEngine implements IOEngine {
   private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
-  private final RandomAccessFile raf;
-  private final FileChannel fileChannel;
-  private final String path;
-  private long size;
-
-  public FileIOEngine(String filePath, long fileSize) throws IOException {
-    this.path = filePath;
-    this.size = fileSize;
-    try {
-      raf = new RandomAccessFile(filePath, "rw");
-    } catch (java.io.FileNotFoundException fex) {
-      LOG.error("Can't create bucket cache file " + filePath, fex);
-      throw fex;
-    }
+  public static final String FILE_DELIMITER = ",";
+  private final String[] filePaths;
+  private final FileChannel[] fileChannels;
+  private final RandomAccessFile[] rafs;
 
-    try {
-      raf.setLength(fileSize);
-    } catch (IOException ioex) {
-      LOG.error("Can't extend bucket cache file; insufficient space for "
-          + StringUtils.byteDesc(fileSize), ioex);
-      raf.close();
-      throw ioex;
-    }
+  private final long sizePerFile;
+  private final long capacity;
 
-    fileChannel = raf.getChannel();
-    LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
+  private FileReadAccessor readAccessor = new FileReadAccessor();
+  private FileWriteAccessor writeAccessor = new FileWriteAccessor();
+
+  public FileIOEngine(long capacity, String... filePaths) throws IOException {
+    this.sizePerFile = capacity / filePaths.length;
+    this.capacity = this.sizePerFile * filePaths.length;
+    this.filePaths = filePaths;
+    this.fileChannels = new FileChannel[filePaths.length];
+    this.rafs = new RandomAccessFile[filePaths.length];
+    for (int i = 0; i < filePaths.length; i++) {
+      String filePath = filePaths[i];
+      try {
+        rafs[i] = new RandomAccessFile(filePath, "rw");
+        long totalSpace = new File(filePath).getTotalSpace();
+        if (totalSpace < sizePerFile) {
+          // The next setting length will throw exception,logging this message
+          // is just used for the detail reason of exception，
+          String msg = "Only " + StringUtils.byteDesc(totalSpace)
+              + " total space under " + filePath + ", not enough for requested "
+              + StringUtils.byteDesc(sizePerFile);
+          LOG.warn(msg);
+        }
+        rafs[i].setLength(sizePerFile);
+        fileChannels[i] = rafs[i].getChannel();
+        LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
+            + ", on the path:" + filePath);
+      } catch (IOException fex) {
+        LOG.error("Failed allocating cache on " + filePath, fex);
+        shutdown();
+        throw fex;
+      }
+    }
   }
 
   @Override
   public String toString() {
-    return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
-      ", size=" + String.format("%,d", this.size);
+    return "ioengine=" + this.getClass().getSimpleName() + ", paths="
+        + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
   }
 
   /**
@@ -94,7 +110,7 @@ public class FileIOEngine implements IOEngine {
   public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
       throws IOException {
     ByteBuffer dstBuffer = ByteBuffer.allocate(length);
-    fileChannel.read(dstBuffer, offset);
+    accessFile(readAccessor, dstBuffer, offset);
     // The buffer created out of the fileChannel is formed by copying the data from the file
     // Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
     // this buffer from the file the data is already copied and there is no need to ensure that
@@ -114,7 +130,7 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void write(ByteBuffer srcBuffer, long offset) throws IOException {
-    fileChannel.write(srcBuffer, offset);
+    accessFile(writeAccessor, srcBuffer, offset);
   }
 
   /**
@@ -123,7 +139,16 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void sync() throws IOException {
-    fileChannel.force(true);
+    for (int i = 0; i < fileChannels.length; i++) {
+      try {
+        if (fileChannels[i] != null) {
+          fileChannels[i].force(true);
+        }
+      } catch (IOException ie) {
+        LOG.warn("Failed syncing data to " + this.filePaths[i]);
+        throw ie;
+      }
+    }
   }
 
   /**
@@ -131,15 +156,17 @@ public class FileIOEngine implements IOEngine {
    */
   @Override
   public void shutdown() {
-    try {
-      fileChannel.close();
-    } catch (IOException ex) {
-      LOG.error("Can't shutdown cleanly", ex);
-    }
-    try {
-      raf.close();
-    } catch (IOException ex) {
-      LOG.error("Can't shutdown cleanly", ex);
+    for (int i = 0; i < filePaths.length; i++) {
+      try {
+        if (fileChannels[i] != null) {
+          fileChannels[i].close();
+        }
+        if (rafs[i] != null) {
+          rafs[i].close();
+        }
+      } catch (IOException ex) {
+        LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
+      }
     }
   }
 
@@ -147,7 +174,84 @@ public class FileIOEngine implements IOEngine {
   public void write(ByteBuff srcBuffer, long offset) throws IOException {
     // When caching block into BucketCache there will be single buffer backing for this HFileBlock.
     assert srcBuffer.hasArray();
-    fileChannel.write(
-        ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), srcBuffer.remaining()), offset);
+    write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
+            srcBuffer.remaining()), offset);
+  }
+
+  private void accessFile(FileAccessor accessor, ByteBuffer buffer,
+      long globalOffset) throws IOException {
+    int startFileNum = getFileNum(globalOffset);
+    int remainingAccessDataLen = buffer.remaining();
+    int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
+    int accessFileNum = startFileNum;
+    long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
+    int bufLimit = buffer.limit();
+    while (true) {
+      FileChannel fileChannel = fileChannels[accessFileNum];
+      if (endFileNum > accessFileNum) {
+        // short the limit;
+        buffer.limit((int) (buffer.limit() - remainingAccessDataLen
+            + sizePerFile - accessOffset));
+      }
+      int accessLen = accessor.access(fileChannel, buffer, accessOffset);
+      // recover the limit
+      buffer.limit(bufLimit);
+      if (accessLen < remainingAccessDataLen) {
+        remainingAccessDataLen -= accessLen;
+        accessFileNum++;
+        accessOffset = 0;
+      } else {
+        break;
+      }
+      if (accessFileNum >= fileChannels.length) {
+        throw new IOException("Required data len "
+            + StringUtils.byteDesc(buffer.remaining())
+            + " exceed the engine's capacity " + StringUtils.byteDesc(capacity)
+            + " where offset=" + globalOffset);
+      }
+    }
+  }
+
+  /**
+   * Get the absolute offset in given file with the relative global offset.
+   * @param fileNum
+   * @param globalOffset
+   * @return the absolute offset
+   */
+  private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
+    return globalOffset - fileNum * sizePerFile;
+  }
+
+  private int getFileNum(long offset) {
+    if (offset < 0) {
+      throw new IllegalArgumentException("Unexpected offset " + offset);
+    }
+    int fileNum = (int) (offset / sizePerFile);
+    if (fileNum >= fileChannels.length) {
+      throw new RuntimeException("Not expected offset " + offset
+          + " where capacity=" + capacity);
+    }
+    return fileNum;
+  }
+
+  private static interface FileAccessor {
+    int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+        throws IOException;
+  }
+
+  private static class FileReadAccessor implements FileAccessor {
+    @Override
+    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+        long accessOffset) throws IOException {
+      return fileChannel.read(byteBuffer, accessOffset);
+    }
+  }
+
+  private static class FileWriteAccessor implements FileAccessor {
+    @Override
+    public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+        long accessOffset) throws IOException {
+      return fileChannel.write(byteBuffer, accessOffset);
+    }
   }
 }
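
A small worked example, not from the patch, of the offset arithmetic used by getFileNum() and getAbsoluteOffsetInFile() above, with three 2 MB backing files.

    public class FileOffsetMathSketch {
      public static void main(String[] args) {
        long capacity = 6L * 1024 * 1024;        // three backing files, 6 MB total
        int fileCount = 3;
        long sizePerFile = capacity / fileCount; // 2 MB per file, as in the constructor

        long globalOffset = 2L * 1024 * 1024 + 100;               // 100 bytes into file #1
        int fileNum = (int) (globalOffset / sizePerFile);          // getFileNum() -> 1
        long offsetInFile = globalOffset - fileNum * sizePerFile;  // getAbsoluteOffsetInFile() -> 100

        System.out.println("fileNum=" + fileNum + ", offsetInFile=" + offsetInFile);
        // A buffer that would run past sizePerFile * (fileNum + 1) is handled by accessFile():
        // the limit is shortened to the file boundary, that part is read/written on this
        // channel, and the remainder continues at offset 0 of the next file.
      }
    }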

http://git-wip-us.apache.org/repos/asf/hbase/blob/e67eb6c4/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 93f4cf7..d1f3dfe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
 import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -38,13 +40,39 @@ import org.junit.experimental.categories.Category;
 public class TestFileIOEngine {
   @Test
   public void testFileIOEngine() throws IOException {
-    int size = 2 * 1024 * 1024; // 2 MB
-    String filePath = "testFileIOEngine";
+    long totalCapacity = 6 * 1024 * 1024; // 6 MB
+    String[] filePaths = { "testFileIOEngine1", "testFileIOEngine2",
+        "testFileIOEngine3" };
+    long sizePerFile = totalCapacity / filePaths.length; // 2 MB per File
+    List<Long> boundaryStartPositions = new ArrayList<Long>();
+    boundaryStartPositions.add(0L);
+    for (int i = 1; i < filePaths.length; i++) {
+      boundaryStartPositions.add(sizePerFile * i - 1);
+      boundaryStartPositions.add(sizePerFile * i);
+      boundaryStartPositions.add(sizePerFile * i + 1);
+    }
+    List<Long> boundaryStopPositions = new ArrayList<Long>();
+    for (int i = 1; i < filePaths.length; i++) {
+      boundaryStopPositions.add(sizePerFile * i - 1);
+      boundaryStopPositions.add(sizePerFile * i);
+      boundaryStopPositions.add(sizePerFile * i + 1);
+    }
+    boundaryStopPositions.add(sizePerFile * filePaths.length - 1);
+    FileIOEngine fileIOEngine = new FileIOEngine(totalCapacity, filePaths);
     try {
-      FileIOEngine fileIOEngine = new FileIOEngine(filePath, size);
-      for (int i = 0; i < 50; i++) {
+      for (int i = 0; i < 500; i++) {
         int len = (int) Math.floor(Math.random() * 100);
-        long offset = (long) Math.floor(Math.random() * size % (size - len));
+        long offset = (long) Math.floor(Math.random() * totalCapacity % (totalCapacity - len));
+        if (i < boundaryStartPositions.size()) {
+          // make the boundary start positon
+          offset = boundaryStartPositions.get(i);
+        } else if ((i - boundaryStartPositions.size()) < boundaryStopPositions.size()) {
+          // make the boundary stop positon
+          offset = boundaryStopPositions.get(i - boundaryStartPositions.size()) - len + 1;
+        } else if (i % 2 == 0) {
+          // make the cross-files block writing/reading
+          offset = Math.max(1, i % filePaths.length) * sizePerFile - len / 2;
+        }
         byte[] data1 = new byte[len];
         for (int j = 0; j < data1.length; ++j) {
           data1[j] = (byte) (Math.random() * 255);
@@ -58,9 +86,12 @@ public class TestFileIOEngine {
         }
       }
     } finally {
-      File file = new File(filePath);
-      if (file.exists()) {
-        file.delete();
+      fileIOEngine.shutdown();
+      for (String filePath : filePaths) {
+        File file = new File(filePath);
+        if (file.exists()) {
+          file.delete();
+        }
       }
     }
 


[03/14] hbase git commit: HBASE-15597 Clean up configuration keys used in hbase-spark module (Yi Liang)

Posted by sy...@apache.org.
HBASE-15597 Clean up configuration keys used in hbase-spark module (Yi Liang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35d7a0cd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35d7a0cd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35d7a0cd

Branch: refs/heads/hbase-12439
Commit: 35d7a0cd0798cabe7df5766fcc993512eca6c92e
Parents: fee67bc
Author: Jerry He <je...@apache.org>
Authored: Mon Mar 13 12:02:07 2017 -0700
Committer: Jerry He <je...@apache.org>
Committed: Mon Mar 13 12:02:07 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/spark/DefaultSource.scala      | 28 ++++-----
 .../hbase/spark/HBaseConnectionCache.scala      |  2 +-
 .../spark/datasources/HBaseSparkConf.scala      | 62 ++++++++++++--------
 .../hadoop/hbase/spark/DefaultSourceSuite.scala | 16 ++---
 .../spark/DynamicLogicExpressionSuite.scala     |  2 +-
 .../hadoop/hbase/spark/HBaseTestSource.scala    | 13 ++--
 .../hbase/spark/PartitionFilterSuite.scala      |  6 +-
 7 files changed, 69 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index a8b2ab8..b2b646a 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -97,36 +97,36 @@ case class HBaseRelation (
   )(@transient val sqlContext: SQLContext)
   extends BaseRelation with PrunedFilteredScan  with InsertableRelation  with Logging {
   val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
-  val minTimestamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
-  val maxTimestamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
+  val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
+  val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
   val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
-  val encoderClsName = parameters.get(HBaseSparkConf.ENCODER).getOrElse(HBaseSparkConf.defaultEncoder)
+  val encoderClsName = parameters.get(HBaseSparkConf.QUERY_ENCODER).getOrElse(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
 
   @transient val encoder = JavaBytesEncoder.create(encoderClsName)
 
   val catalog = HBaseTableCatalog(parameters)
   def tableName = catalog.name
-  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
-  val useHBaseContext =  parameters.get(HBaseSparkConf.USE_HBASE_CONTEXT).map(_.toBoolean).getOrElse(true)
-  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER)
-    .map(_.toBoolean).getOrElse(true)
+  val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_LOCATION, "")
+  val useHBaseContext =  parameters.get(HBaseSparkConf.USE_HBASECONTEXT).map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_USE_HBASECONTEXT)
+  val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSHDOWN_COLUMN_FILTER)
+    .map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_PUSHDOWN_COLUMN_FILTER)
 
   // The user supplied per table parameter will overwrite global ones in SparkConf
-  val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean)
+  val blockCacheEnable = parameters.get(HBaseSparkConf.QUERY_CACHEBLOCKS).map(_.toBoolean)
     .getOrElse(
       sqlContext.sparkContext.getConf.getBoolean(
-        HBaseSparkConf.BLOCK_CACHE_ENABLE, HBaseSparkConf.defaultBlockCacheEnable))
-  val cacheSize = parameters.get(HBaseSparkConf.CACHE_SIZE).map(_.toInt)
+        HBaseSparkConf.QUERY_CACHEBLOCKS, HBaseSparkConf.DEFAULT_QUERY_CACHEBLOCKS))
+  val cacheSize = parameters.get(HBaseSparkConf.QUERY_CACHEDROWS).map(_.toInt)
     .getOrElse(
       sqlContext.sparkContext.getConf.getInt(
-      HBaseSparkConf.CACHE_SIZE, HBaseSparkConf.defaultCachingSize))
-  val batchNum = parameters.get(HBaseSparkConf.BATCH_NUM).map(_.toInt)
+      HBaseSparkConf.QUERY_CACHEDROWS, -1))
+  val batchNum = parameters.get(HBaseSparkConf.QUERY_BATCHSIZE).map(_.toInt)
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
-    HBaseSparkConf.BATCH_NUM,  HBaseSparkConf.defaultBatchNum))
+    HBaseSparkConf.QUERY_BATCHSIZE,  -1))
 
   val bulkGetSize =  parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
     .getOrElse(sqlContext.sparkContext.getConf.getInt(
-    HBaseSparkConf.BULKGET_SIZE,  HBaseSparkConf.defaultBulkGetSize))
+    HBaseSparkConf.BULKGET_SIZE,  HBaseSparkConf.DEFAULT_BULKGET_SIZE))
 
   //create or get latest HBaseContext
   val hbaseContext:HBaseContext = if (useHBaseContext) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
index fb5833e..2858da8 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
@@ -37,7 +37,7 @@ private[spark] object HBaseConnectionCache extends Logging {
   val cacheStat = HBaseConnectionCacheStat(0, 0, 0)
 
   // in milliseconds
-  private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay
+  private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
   private var timeout = DEFAULT_TIME_OUT
   private var closed: Boolean = false
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 0f20d1d..8c1cb35 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -20,35 +20,45 @@ package org.apache.hadoop.hbase.spark.datasources
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 
+/**
+ * This is the hbase configuration. User can either set them in SparkConf, which
+ * will take effect globally, or configure it per table, which will overwrite the value
+ * set in SparkConf. If not set, the default value will take effect.
+ */
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 object HBaseSparkConf{
-  // This is the hbase configuration. User can either set them in SparkConf, which
-  // will take effect globally, or configure it per table, which will overwrite the value
-  // set in SparkConf. If not setted, the default value will take effect.
-  val BLOCK_CACHE_ENABLE = "spark.hbase.blockcache.enable"
-  // default block cache is set to true by default following hbase convention, but note that
-  // this potentially may slow down the system
-  val defaultBlockCacheEnable = true
-  val CACHE_SIZE = "spark.hbase.cacheSize"
-  val defaultCachingSize = 1000
-  val BATCH_NUM = "spark.hbase.batchNum"
-  val defaultBatchNum = 1000
-  val BULKGET_SIZE = "spark.hbase.bulkGetSize"
-  val defaultBulkGetSize = 1000
-
-  val HBASE_CONFIG_RESOURCES_LOCATIONS = "hbase.config.resources"
-  val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
-  val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
-  val defaultPushDownColumnFilter = true
-
+  /** Set to false to disable server-side caching of blocks for this scan,
+   *  false by default, since full table scans generate too much BC churn.
+   */
+  val QUERY_CACHEBLOCKS = "hbase.spark.query.cacheblocks"
+  val DEFAULT_QUERY_CACHEBLOCKS = false
+  /** The number of rows for caching that will be passed to scan. */
+  val QUERY_CACHEDROWS = "hbase.spark.query.cachedrows"
+  /** Set the maximum number of values to return for each call to next() in scan. */
+  val QUERY_BATCHSIZE = "hbase.spark.query.batchsize"
+  /** The number of BulkGets send to HBase. */
+  val BULKGET_SIZE = "hbase.spark.bulkget.size"
+  val DEFAULT_BULKGET_SIZE = 1000
+  /** Set to specify the location of hbase configuration file. */
+  val HBASE_CONFIG_LOCATION = "hbase.spark.config.location"
+  /** Set to specify whether create or use latest cached HBaseContext*/
+  val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
+  val DEFAULT_USE_HBASECONTEXT = true
+  /** Pushdown the filter to data source engine to increase the performance of queries. */
+  val PUSHDOWN_COLUMN_FILTER = "hbase.spark.pushdown.columnfilter"
+  val DEFAULT_PUSHDOWN_COLUMN_FILTER= true
+  /** Class name of the encoder, which encode data types from Spark to HBase bytes. */
+  val QUERY_ENCODER = "hbase.spark.query.encoder"
+  val DEFAULT_QUERY_ENCODER = classOf[NaiveEncoder].getCanonicalName
+  /** The timestamp used to filter columns with a specific timestamp. */
   val TIMESTAMP = "hbase.spark.query.timestamp"
-  val MIN_TIMESTAMP = "hbase.spark.query.minTimestamp"
-  val MAX_TIMESTAMP = "hbase.spark.query.maxTimestamp"
+  /** The starting timestamp used to filter columns with a specific range of versions. */
+  val TIMERANGE_START = "hbase.spark.query.timerange.start"
+  /** The ending timestamp used to filter columns with a specific range of versions. */
+  val TIMERANGE_END =  "hbase.spark.query.timerange.end"
+  /** The maximum number of version to return. */
   val MAX_VERSIONS = "hbase.spark.query.maxVersions"
-  val ENCODER = "hbase.spark.query.encoder"
-  val defaultEncoder = classOf[NaiveEncoder].getCanonicalName
-
-  // in milliseconds
-  val connectionCloseDelay = 10 * 60 * 1000
+  /** Delayed time to close hbase-spark connection when no reference to this connection, in milliseconds. */
+  val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
 }
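
A hedged Java sketch of the renamed keys applied to a SparkConf; the literal strings match the constants above and the values are placeholders.

    import org.apache.spark.SparkConf;

    public class HBaseSparkConfSketch {
      public static void main(String[] args) {
        SparkConf sparkConf = new SparkConf()
            // Replacements for the old "spark.hbase.*" and "hbase.use.hbase.context" keys.
            .set("hbase.spark.query.cacheblocks", "true")
            .set("hbase.spark.query.cachedrows", "100")
            .set("hbase.spark.query.batchsize", "100")
            .set("hbase.spark.use.hbasecontext", "true")
            .set("hbase.spark.pushdown.columnfilter", "true");
        System.out.println(sparkConf.toDebugString());
      }
    }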

http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 7b8b844..3bce041 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -116,9 +116,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
     TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
     logInfo(" - created table")
     val sparkConf = new SparkConf
-    sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
-    sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
-    sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+    sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
 
     sc  = new SparkContext("local", "test", sparkConf)
 
@@ -791,7 +791,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
           |}""".stripMargin
     df = sqlContext.load("org.apache.hadoop.hbase.spark",
       Map(HBaseTableCatalog.tableCatalog->catalog,
-        HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER -> "false"))
+        HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "false"))
 
     df.registerTempTable("hbaseNoPushDownTmp")
 
@@ -913,8 +913,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     // Test Getting old stuff -- Full Scan, TimeRange
     val oldRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+        HBaseSparkConf.TIMERANGE_END -> (oldMs + 100).toString))
       .format("org.apache.hadoop.hbase.spark")
       .load()
     assert(oldRange.count() == 101)
@@ -924,8 +924,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
     // Test Getting middle stuff -- Full Scan, TimeRange
     val middleRange = sqlContext.read
-      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
-        HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
+      .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+        HBaseSparkConf.TIMERANGE_END -> (startMs + 100).toString))
       .format("org.apache.hadoop.hbase.spark")
       .load()
     assert(middleRange.count() == 256)

http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
index b9c15ce..bc833e8 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
 class DynamicLogicExpressionSuite  extends FunSuite with
 BeforeAndAfterEach with BeforeAndAfterAll with Logging {
 
-  val encoder = JavaBytesEncoder.create(HBaseSparkConf.defaultEncoder)
+  val encoder = JavaBytesEncoder.create(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
 
   test("Basic And Test") {
     val leftLogic = new LessThanLogicExpression("Col1", 0)

http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
index 83465d9..ccb4625 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
@@ -49,13 +49,12 @@ case class DummyScan(
   override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(0 until rowNum)
     .map(Row(_))
     .map{ x =>
-      if (sparkConf.getInt(HBaseSparkConf.BATCH_NUM,
-        HBaseSparkConf.defaultBatchNum) != batchNum ||
-        sparkConf.getInt(HBaseSparkConf.CACHE_SIZE,
-          HBaseSparkConf.defaultCachingSize) != cacheSize ||
-        sparkConf.getBoolean(HBaseSparkConf.BLOCK_CACHE_ENABLE,
-          HBaseSparkConf.defaultBlockCacheEnable)
-          != blockCachingEnable) {
+      if (sparkConf.getInt(HBaseSparkConf.QUERY_BATCHSIZE,
+          -1) != batchNum ||
+        sparkConf.getInt(HBaseSparkConf.QUERY_CACHEDROWS,
+          -1) != cacheSize ||
+        sparkConf.getBoolean(HBaseSparkConf.QUERY_CACHEBLOCKS,
+          false) != blockCachingEnable) {
         throw new Exception("HBase Spark configuration cannot be set properly")
       }
       x

http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
index d33ced9..f47a319 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
@@ -69,9 +69,9 @@ class PartitionFilterSuite extends FunSuite with
 
     TEST_UTIL.startMiniCluster
     val sparkConf = new SparkConf
-    sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
-    sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
-    sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+    sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+    sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
 
     sc = new SparkContext("local", "test", sparkConf)
     new HBaseContext(sc, TEST_UTIL.getConfiguration)


[11/14] hbase git commit: HBASE-17740 Correct the semantic of batch and partial for async client

Posted by sy...@apache.org.
HBASE-17740 Correct the semantic of batch and partial for async client


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1849e8a5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1849e8a5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1849e8a5

Branch: refs/heads/hbase-12439
Commit: 1849e8a5a77373b5fb8e354c3f20214a80eb8c1a
Parents: 0ecb678
Author: zhangduo <zh...@apache.org>
Authored: Wed Mar 15 18:26:51 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Mar 16 09:44:23 2017 +0800

----------------------------------------------------------------------
 .../client/AllowPartialScanResultCache.java     |  31 ++-
 .../hadoop/hbase/client/AsyncClientScanner.java |   4 +-
 .../hbase/client/BatchScanResultCache.java      | 142 +++++++++++
 .../hadoop/hbase/client/ClientScanner.java      | 253 +------------------
 .../hadoop/hbase/client/ConnectionUtils.java    |  14 +
 .../org/apache/hadoop/hbase/client/Result.java  |  72 +++---
 .../client/TestAllowPartialScanResultCache.java |  33 ++-
 .../hbase/client/TestBatchScanResultCache.java  | 113 +++++++++
 .../TestCompleteResultScanResultCache.java      |   5 +-
 .../client/TestRawAsyncTablePartialScan.java    | 119 +++++++++
 10 files changed, 471 insertions(+), 315 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
index caecfd4..82f1ea0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
 import java.io.IOException;
 import java.util.Arrays;
 
@@ -36,10 +38,6 @@ class AllowPartialScanResultCache implements ScanResultCache {
   // beginning of a row when retry.
   private Cell lastCell;
 
-  private Result filterCells(Result result) {
-    return lastCell == null ? result : ConnectionUtils.filterCells(result, lastCell);
-  }
-
   private void updateLastCell(Result result) {
     lastCell = result.rawCells()[result.rawCells().length - 1];
   }
@@ -49,22 +47,23 @@ class AllowPartialScanResultCache implements ScanResultCache {
     if (results.length == 0) {
       return EMPTY_RESULT_ARRAY;
     }
-    Result first = filterCells(results[0]);
-    if (results.length == 1) {
-      if (first == null) {
-        // do not update last cell if we filter out all cells
-        return EMPTY_RESULT_ARRAY;
+    int i;
+    for (i = 0; i < results.length; i++) {
+      Result r = filterCells(results[i], lastCell);
+      if (r != null) {
+        results[i] = r;
+        break;
       }
-      updateLastCell(results[0]);
-      results[0] = first;
-      return results;
+    }
+    if (i == results.length) {
+      return EMPTY_RESULT_ARRAY;
     }
     updateLastCell(results[results.length - 1]);
-    if (first == null) {
-      return Arrays.copyOfRange(results, 1, results.length);
+    if (i > 0) {
+      return Arrays.copyOfRange(results, i, results.length);
+    } else {
+      return results;
     }
-    results[0] = first;
-    return results;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index 2215d36..fa7aa81 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
 import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
 
 import java.io.IOException;
@@ -86,8 +87,7 @@ class AsyncClientScanner {
     this.scanTimeoutNs = scanTimeoutNs;
     this.rpcTimeoutNs = rpcTimeoutNs;
     this.startLogErrorsCnt = startLogErrorsCnt;
-    this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0
-        ? new AllowPartialScanResultCache() : new CompleteScanResultCache();
+    this.resultCache = createScanResultCache(scan);
   }
 
   private static final class OpenScannerResponse {

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
new file mode 100644
index 0000000..9ab959b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A scan result cache for batched scans, i.e.,
+ * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
+ * <p>
+ * If the user calls setBatch(5) and an rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to
+ * the user. setBatch does not imply setAllowPartialResults(true).
+ */
+@InterfaceAudience.Private
+public class BatchScanResultCache implements ScanResultCache {
+
+  private final int batch;
+
+  // used to filter out the cells that have already been returned to the user, as we always start
+  // from the beginning of a row when retrying.
+  private Cell lastCell;
+
+  private final Deque<Result> partialResults = new ArrayDeque<>();
+
+  private int numCellsOfPartialResults;
+
+  public BatchScanResultCache(int batch) {
+    this.batch = batch;
+  }
+
+  private void updateLastCell(Result result) {
+    lastCell = result.rawCells()[result.rawCells().length - 1];
+  }
+
+  private Result createCompletedResult() throws IOException {
+    Result result = Result.createCompleteResult(partialResults);
+    partialResults.clear();
+    numCellsOfPartialResults = 0;
+    return result;
+  }
+
+  // Add a new result to the partial list and return a batched Result if the cached cell count
+  // exceeds the batching limit. As the RS also respects scan.getBatch, we can be sure that at most
+  // one Result is returned here (or null, which means we do not yet have enough cells).
+  private Result regroupResults(Result result) {
+    partialResults.addLast(result);
+    numCellsOfPartialResults += result.size();
+    if (numCellsOfPartialResults < batch) {
+      return null;
+    }
+    Cell[] cells = new Cell[batch];
+    int cellCount = 0;
+    boolean stale = false;
+    for (;;) {
+      Result r = partialResults.pollFirst();
+      stale = stale || r.isStale();
+      int newCellCount = cellCount + r.size();
+      if (newCellCount > batch) {
+        // We have more cells than expected, so split the current result
+        int len = batch - cellCount;
+        System.arraycopy(r.rawCells(), 0, cells, cellCount, len);
+        Cell[] remainingCells = new Cell[r.size() - len];
+        System.arraycopy(r.rawCells(), len, remainingCells, 0, r.size() - len);
+        partialResults.addFirst(
+          Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow()));
+        break;
+      }
+      System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size());
+      if (newCellCount == batch) {
+        break;
+      }
+      cellCount = newCellCount;
+    }
+    numCellsOfPartialResults -= batch;
+    return Result.create(cells, null, stale,
+      result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
+  }
+
+  @Override
+  public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+    if (results.length == 0) {
+      if (!partialResults.isEmpty() && !isHeartbeatMessage) {
+        return new Result[] { createCompletedResult() };
+      }
+      return EMPTY_RESULT_ARRAY;
+    }
+    List<Result> regroupedResults = new ArrayList<>();
+    for (Result result : results) {
+      result = filterCells(result, lastCell);
+      if (result == null) {
+        continue;
+      }
+      // check if we have a row change
+      if (!partialResults.isEmpty() &&
+          !Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
+        regroupedResults.add(createCompletedResult());
+      }
+      Result regroupedResult = regroupResults(result);
+      if (regroupedResult != null) {
+        regroupedResults.add(regroupedResult);
+        // only update the last cell when we actually return it to the user.
+        updateLastCell(regroupedResult);
+      }
+      if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
+        // We are done for this row
+        regroupedResults.add(createCompletedResult());
+      }
+    }
+    return regroupedResults.toArray(new Result[0]);
+  }
+
+  @Override
+  public void clear() {
+    partialResults.clear();
+    numCellsOfPartialResults = 0;
+  }
+}
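
For context, the behaviour described in the class comment above is also visible from the plain synchronous client. The following is a minimal sketch, not part of this commit, assuming a reachable cluster and a hypothetical table "t" whose rows each carry three columns:

    import java.io.IOException;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class BatchedScanSketch {
      public static void main(String[] args) throws IOException {
        // With setBatch(2) on three-column rows, each row comes back as two Results
        // (one with 2 cells, then one with 1 cell), which is exactly the regrouping
        // that BatchScanResultCache performs above.
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("t"));
             ResultScanner scanner = table.getScanner(new Scan().setBatch(2))) {
          for (Result r : scanner) {
            System.out.println(Bytes.toString(r.getRow()) + " -> " + r.size() + " cells");
          }
        }
      }
    }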

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index bd3d4ef..a8b029f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -18,16 +18,15 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
 
 import com.google.common.annotations.VisibleForTesting;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.LinkedList;
-import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ExecutorService;
 
@@ -35,8 +34,6 @@ import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -69,24 +66,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
   protected HRegionInfo currentRegion = null;
   protected ScannerCallableWithReplicas callable = null;
   protected Queue<Result> cache;
-  /**
-   * A list of partial results that have been returned from the server. This list should only
-   * contain results if this scanner does not have enough partial results to form the complete
-   * result.
-   */
-  protected int partialResultsCellSizes = 0;
-  protected final LinkedList<Result> partialResults = new LinkedList<>();
-
-  /**
-   * The row for which we are accumulating partial Results (i.e. the row of the Results stored
-   * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
-   * the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()}
-   */
-  protected byte[] partialResultsRow = null;
-  /**
-   * The last cell from a not full Row which is added to cache
-   */
-  protected Cell lastCellLoadedToCache = null;
+  private final ScanResultCache scanResultCache;
   protected final int caching;
   protected long lastNext;
   // Keep lastResult returned successfully in case we have to reset scanner.
@@ -159,6 +139,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
     this.rpcControllerFactory = controllerFactory;
 
     this.conf = conf;
+
+    this.scanResultCache = createScanResultCache(scan);
     initCache();
   }
 
@@ -356,14 +338,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
 
   private void closeScannerIfExhausted(boolean exhausted) throws IOException {
     if (exhausted) {
-      if (!partialResults.isEmpty()) {
-        // XXX: continue if there are partial results. But in fact server should not set
-        // hasMoreResults to false if there are partial results.
-        LOG.warn("Server tells us there is no more results for this region but we still have" +
-            " partialResults, this should not happen, retry on the current scanner anyway");
-      } else {
-        closeScanner();
-      }
+      closeScanner();
     }
   }
 
@@ -371,7 +346,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
       MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
     // An exception was thrown which makes any partial results that we were collecting
     // invalid. The scanner will need to be reset to the beginning of a row.
-    clearPartialResults();
+    scanResultCache.clear();
 
     // Unfortunately, DNRIOE is used in two different semantics.
     // (1) The first is to close the client scanner and bubble up the exception all the way
@@ -465,7 +440,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
         if (callable.switchedToADifferentReplica()) {
           // Any accumulated partial results are no longer valid since the callable will
           // openScanner with the correct startkey and we must pick up from there
-          clearPartialResults();
+          scanResultCache.clear();
           this.currentRegion = callable.getHRegionInfo();
         }
         retryAfterOutOfOrderException.setValue(true);
@@ -485,29 +460,19 @@ public abstract class ClientScanner extends AbstractClientScanner {
       // Groom the array of Results that we received back from the server before adding that
       // Results to the scanner's cache. If partial results are not allowed to be seen by the
       // caller, all book keeping will be performed within this method.
-      List<Result> resultsToAddToCache =
-          getResultsToAddToCache(values, callable.isHeartbeatMessage());
-      if (!resultsToAddToCache.isEmpty()) {
+      Result[] resultsToAddToCache = scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
+      if (resultsToAddToCache.length > 0) {
         for (Result rs : resultsToAddToCache) {
-          rs = filterLoadedCell(rs);
-          if (rs == null) {
-            continue;
-          }
-
           cache.add(rs);
           long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
           countdown--;
           remainingResultSize -= estimatedHeapSizeOfResult;
           addEstimatedSize(estimatedHeapSizeOfResult);
           this.lastResult = rs;
-          if (this.lastResult.mayHaveMoreCellsInRow()) {
-            updateLastCellLoadedToCache(this.lastResult);
-          } else {
-            this.lastCellLoadedToCache = null;
-          }
         }
-        if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) {
-          int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache);
+        if (scan.getLimit() > 0) {
+          int newLimit =
+              scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache));
           assert newLimit >= 0;
           scan.setLimit(newLimit);
         }
@@ -550,13 +515,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
       }
       // we are done with the current region
       if (regionExhausted) {
-        if (!partialResults.isEmpty()) {
-          // XXX: continue if there are partial results. But in fact server should not set
-          // hasMoreResults to false if there are partial results.
-          LOG.warn("Server tells us there is no more results for this region but we still have" +
-              " partialResults, this should not happen, retry on the current scanner anyway");
-          continue;
-        }
         if (!moveToNextRegion()) {
           break;
         }
@@ -573,142 +531,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
     return cache != null ? cache.size() : 0;
   }
 
-  /**
-   * This method ensures all of our book keeping regarding partial results is kept up to date. This
-   * method should be called once we know that the results we received back from the RPC request do
-   * not contain errors. We return a list of results that should be added to the cache. In general,
-   * this list will contain all NON-partial results from the input array (unless the client has
-   * specified that they are okay with receiving partial results)
-   * @param resultsFromServer The array of {@link Result}s returned from the server
-   * @param heartbeatMessage Flag indicating whether or not the response received from the server
-   *          represented a complete response, or a heartbeat message that was sent to keep the
-   *          client-server connection alive
-   * @return the list of results that should be added to the cache.
-   * @throws IOException
-   */
-  protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
-      boolean heartbeatMessage) throws IOException {
-    int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
-    List<Result> resultsToAddToCache = new ArrayList<>(resultSize);
-
-    // If the caller has indicated in their scan that they are okay with seeing partial results,
-    // then simply add all results to the list. Note allowPartial and setBatch are not same, we can
-    // return here if allow partials and we will handle batching later.
-    if (scan.getAllowPartialResults()) {
-      addResultsToList(resultsToAddToCache, resultsFromServer, 0,
-        (null == resultsFromServer ? 0 : resultsFromServer.length));
-      return resultsToAddToCache;
-    }
-
-    // If no results were returned it indicates that either we have the all the partial results
-    // necessary to construct the complete result or the server had to send a heartbeat message
-    // to the client to keep the client-server connection alive
-    if (resultsFromServer == null || resultsFromServer.length == 0) {
-      // If this response was an empty heartbeat message, then we have not exhausted the region
-      // and thus there may be more partials server side that still need to be added to the partial
-      // list before we form the complete Result
-      if (!partialResults.isEmpty() && !heartbeatMessage) {
-        resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        clearPartialResults();
-      }
-
-      return resultsToAddToCache;
-    }
-
-    for(Result result : resultsFromServer) {
-      if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) {
-        // We have a new row, complete the previous row.
-        resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        clearPartialResults();
-      }
-      Result res = regroupResults(result);
-      if (res != null) {
-        resultsToAddToCache.add(res);
-      }
-      if (!result.mayHaveMoreCellsInRow()) {
-        // We are done for this row
-        if (partialResultsCellSizes > 0) {
-          resultsToAddToCache.add(Result.createCompleteResult(partialResults));
-        }
-        clearPartialResults();
-      }
-    }
-
-
-    return resultsToAddToCache;
-  }
-
-  /**
-   * Add new result to the partial list and return a batched Result if caching size exceed
-   * batching limit.
-   * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user.
-   * setBatch doesn't mean setAllowPartialResult(true)
-   * @param result The result that we want to add to our list of partial Results
-   * @return the result if we have batch limit and there is one Result can be returned to user, or
-   *         null if we have not.
-   * @throws IOException
-   */
-  private Result regroupResults(final Result result) throws IOException {
-    partialResultsRow = result.getRow();
-    partialResults.add(result);
-    partialResultsCellSizes += result.size();
-    if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) {
-      Cell[] cells = new Cell[scan.getBatch()];
-      int count = 0;
-      boolean stale = false;
-      while (count < scan.getBatch()) {
-        Result res = partialResults.poll();
-        stale = stale || res.isStale();
-        if (res.size() + count <= scan.getBatch()) {
-          System.arraycopy(res.rawCells(), 0, cells, count, res.size());
-          count += res.size();
-        } else {
-          int len = scan.getBatch() - count;
-          System.arraycopy(res.rawCells(), 0, cells, count, len);
-          Cell[] remainingCells = new Cell[res.size() - len];
-          System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len);
-          Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(),
-              res.mayHaveMoreCellsInRow());
-          partialResults.addFirst(remainingRes);
-          count = scan.getBatch();
-        }
-      }
-      partialResultsCellSizes -= scan.getBatch();
-      if (partialResultsCellSizes == 0) {
-        // We have nothing in partialResults, clear the flags to prevent returning empty Result
-        // when next result belongs to the next row.
-        clearPartialResults();
-      }
-      return Result.create(cells, null, stale,
-          partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow());
-    }
-    return null;
-  }
-
-  /**
-   * Convenience method for clearing the list of partials and resetting the partialResultsRow.
-   */
-  private void clearPartialResults() {
-    partialResults.clear();
-    partialResultsCellSizes = 0;
-    partialResultsRow = null;
-  }
-
-  /**
-   * Helper method for adding results between the indices [start, end) to the outputList
-   * @param outputList the list that results will be added to
-   * @param inputArray the array that results are taken from
-   * @param start beginning index (inclusive)
-   * @param end ending index (exclusive)
-   */
-  private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
-    if (inputArray == null || start < 0 || end > inputArray.length) return;
-
-    for (int i = start; i < end; i++) {
-      outputList.add(inputArray[i]);
-    }
-  }
-
   @Override
   public void close() {
     if (!scanMetricsPublished) writeScanMetrics();
@@ -749,57 +571,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
     return false;
   }
 
-  protected void updateLastCellLoadedToCache(Result result) {
-    if (result.rawCells().length == 0) {
-      return;
-    }
-    this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
-  }
-
-  /**
-   * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not
-   * columns.
-   */
-  private int compare(Cell a, Cell b) {
-    CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion()
-        ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
-    int r = comparator.compareRows(a, b);
-    if (r != 0) {
-      return this.scan.isReversed() ? -r : r;
-    }
-    return CellComparator.compareWithoutRow(a, b);
-  }
-
-  private Result filterLoadedCell(Result result) {
-    // we only filter result when last result is partial
-    // so lastCellLoadedToCache and result should have same row key.
-    // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region;
-    // 3) read more cell. lastCellLoadedToCache and result will be not at same row.
-    if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
-      return result;
-    }
-    if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
-      // The first cell of this result is larger than the last cell of loadcache.
-      // If user do not allow partial result, it must be true.
-      return result;
-    }
-    if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
-      // The last cell of this result is smaller than the last cell of loadcache, skip all.
-      return null;
-    }
-
-    // The first one must not in filtered result, we start at the second.
-    int index = 1;
-    while (index < result.rawCells().length) {
-      if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
-        break;
-      }
-      index++;
-    }
-    Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
-    return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow());
-  }
-
   protected void initCache() {
     initSyncCache();
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 2b75836..3e7cd00 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -316,6 +316,10 @@ public final class ConnectionUtils {
   }
 
   static Result filterCells(Result result, Cell keepCellsAfter) {
+    if (keepCellsAfter == null) {
+      // do not need to filter
+      return result;
+    }
     // not the same row
     if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, result.getRow().length)) {
       return result;
@@ -410,4 +414,14 @@ public final class ConnectionUtils {
   public static int numberOfIndividualRows(List<Result> results) {
     return (int) results.stream().filter(r -> !r.mayHaveMoreCellsInRow()).count();
   }
+
+  public static ScanResultCache createScanResultCache(Scan scan) {
+    if (scan.getAllowPartialResults()) {
+      return new AllowPartialScanResultCache();
+    } else if (scan.getBatch() > 0) {
+      return new BatchScanResultCache(scan.getBatch());
+    } else {
+      return new CompleteScanResultCache();
+    }
+  }
 }
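
From the caller's point of view, the selection above can be summarised as follows. This is only an illustrative fragment, not from the commit, using the same fluent Scan setters the new tests in this commit use:

    Scan partialMode = new Scan().setAllowPartialResults(true); // -> AllowPartialScanResultCache
    Scan batchedMode = new Scan().setBatch(5);                  // -> BatchScanResultCache(5)
    Scan defaultMode = new Scan();                              // -> CompleteScanResultCache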

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 4752d70..f8682ec 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -24,7 +24,9 @@ import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -145,11 +147,11 @@ public class Result implements CellScannable, CellScanner {
   }
 
   public static Result create(List<Cell> cells, Boolean exists, boolean stale,
-      boolean hasMoreCellsInRow) {
+      boolean mayHaveMoreCellsInRow) {
     if (exists != null){
-      return new Result(null, exists, stale, hasMoreCellsInRow);
+      return new Result(null, exists, stale, mayHaveMoreCellsInRow);
     }
-    return new Result(cells.toArray(new Cell[cells.size()]), null, stale, hasMoreCellsInRow);
+    return new Result(cells.toArray(new Cell[cells.size()]), null, stale, mayHaveMoreCellsInRow);
   }
 
   /**
@@ -792,44 +794,42 @@ public class Result implements CellScannable, CellScanner {
    * @throws IOException A complete result cannot be formed because the results in the partial list
    *           come from different rows
    */
-  public static Result createCompleteResult(List<Result> partialResults)
+  public static Result createCompleteResult(Iterable<Result> partialResults)
       throws IOException {
+    if (partialResults == null) {
+      return Result.create(Collections.emptyList(), null, false);
+    }
     List<Cell> cells = new ArrayList<>();
     boolean stale = false;
     byte[] prevRow = null;
     byte[] currentRow = null;
-
-    if (partialResults != null && !partialResults.isEmpty()) {
-      for (int i = 0; i < partialResults.size(); i++) {
-        Result r = partialResults.get(i);
-        currentRow = r.getRow();
-        if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
-          throw new IOException(
-              "Cannot form complete result. Rows of partial results do not match." +
-                  " Partial Results: " + partialResults);
-        }
-
-        // Ensure that all Results except the last one are marked as partials. The last result
-        // may not be marked as a partial because Results are only marked as partials when
-        // the scan on the server side must be stopped due to reaching the maxResultSize.
-        // Visualizing it makes it easier to understand:
-        // maxResultSize: 2 cells
-        // (-x-) represents cell number x in a row
-        // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
-        // How row1 will be returned by the server as partial Results:
-        // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
-        // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
-        // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
-        if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
-          throw new IOException(
-              "Cannot form complete result. Result is missing partial flag. " +
-                  "Partial Results: " + partialResults);
-        }
-        prevRow = currentRow;
-        stale = stale || r.isStale();
-        for (Cell c : r.rawCells()) {
-          cells.add(c);
-        }
+    for (Iterator<Result> iter = partialResults.iterator(); iter.hasNext();) {
+      Result r = iter.next();
+      currentRow = r.getRow();
+      if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
+        throw new IOException(
+            "Cannot form complete result. Rows of partial results do not match." +
+                " Partial Results: " + partialResults);
+      }
+      // Ensure that all Results except the last one are marked as partials. The last result
+      // may not be marked as a partial because Results are only marked as partials when
+      // the scan on the server side must be stopped due to reaching the maxResultSize.
+      // Visualizing it makes it easier to understand:
+      // maxResultSize: 2 cells
+      // (-x-) represents cell number x in a row
+      // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+      // How row1 will be returned by the server as partial Results:
+      // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+      // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+      // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+      if (iter.hasNext() && !r.mayHaveMoreCellsInRow()) {
+        throw new IOException("Cannot form complete result. Result is missing partial flag. " +
+            "Partial Results: " + partialResults);
+      }
+      prevRow = currentRow;
+      stale = stale || r.isStale();
+      for (Cell c : r.rawCells()) {
+        cells.add(c);
       }
     }
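
Because createCompleteResult(...) now accepts any Iterable, partials accumulated in a Deque (as BatchScanResultCache does) can be stitched back together directly. A small self-contained sketch, not part of this commit, with hypothetical row and column names:

    import java.io.IOException;
    import java.util.ArrayDeque;
    import java.util.Arrays;
    import java.util.Deque;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CompleteResultSketch {
      public static void main(String[] args) throws IOException {
        byte[] row = Bytes.toBytes("row1");
        byte[] cf = Bytes.toBytes("cf");
        Deque<Result> partials = new ArrayDeque<>();
        // first piece of the row, marked as possibly having more cells in the row
        partials.add(Result.create(
            Arrays.asList((Cell) new KeyValue(row, cf, Bytes.toBytes("c1"), Bytes.toBytes(1))),
            null, false, true));
        // last piece of the row
        partials.add(Result.create(
            Arrays.asList((Cell) new KeyValue(row, cf, Bytes.toBytes("c2"), Bytes.toBytes(2))),
            null, false, false));
        Result whole = Result.createCompleteResult(partials);
        System.out.println(whole.size()); // 2
      }
    }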
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
index fc5ba14..3fe43a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
@@ -17,14 +17,14 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.*;
+import static org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.stream.IntStream;
 
 import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -51,10 +51,6 @@ public class TestAllowPartialScanResultCache {
     resultCache = null;
   }
 
-  private static Cell createCell(int key, int cq) {
-    return new KeyValue(Bytes.toBytes(key), CF, Bytes.toBytes("cq" + cq), Bytes.toBytes(key));
-  }
-
   @Test
   public void test() throws IOException {
     assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
@@ -62,31 +58,34 @@ public class TestAllowPartialScanResultCache {
     assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
       resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
 
-    Cell[] cells1 = IntStream.range(0, 10).mapToObj(i -> createCell(1, i)).toArray(Cell[]::new);
-    Cell[] cells2 = IntStream.range(0, 10).mapToObj(i -> createCell(2, i)).toArray(Cell[]::new);
+    Cell[] cells1 = createCells(CF, 1, 10);
+    Cell[] cells2 = createCells(CF, 2, 10);
 
     Result[] results1 = resultCache.addAndGet(
       new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false);
     assertEquals(1, results1.length);
     assertEquals(1, Bytes.toInt(results1[0].getRow()));
     assertEquals(5, results1[0].rawCells().length);
-    IntStream.range(0, 5).forEach(
-      i -> assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i)))));
+    for (int i = 0; i < 5; i++) {
+      assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i))));
+    }
 
     Result[] results2 = resultCache.addAndGet(
       new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false);
     assertEquals(1, results2.length);
     assertEquals(1, Bytes.toInt(results2[0].getRow()));
     assertEquals(5, results2[0].rawCells().length);
-    IntStream.range(5, 10).forEach(
-      i -> assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i)))));
+    for (int i = 5; i < 10; i++) {
+      assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i))));
+    }
 
-    Result[] results3 = resultCache
-        .addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
+    Result[] results3 =
+        resultCache.addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
     assertEquals(1, results3.length);
     assertEquals(2, Bytes.toInt(results3[0].getRow()));
     assertEquals(10, results3[0].rawCells().length);
-    IntStream.range(0, 10).forEach(
-      i -> assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i)))));
+    for (int i = 0; i < 10; i++) {
+      assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i))));
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
new file mode 100644
index 0000000..31a4594
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestBatchScanResultCache {
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private BatchScanResultCache resultCache;
+
+  @Before
+  public void setUp() {
+    resultCache = new BatchScanResultCache(4);
+  }
+
+  @After
+  public void tearDown() {
+    resultCache.clear();
+    resultCache = null;
+  }
+
+  static Cell createCell(byte[] cf, int key, int cq) {
+    return new KeyValue(Bytes.toBytes(key), cf, Bytes.toBytes("cq" + cq), Bytes.toBytes(key));
+  }
+
+  static Cell[] createCells(byte[] cf, int key, int numCqs) {
+    Cell[] cells = new Cell[numCqs];
+    for (int i = 0; i < numCqs; i++) {
+      cells[i] = createCell(cf, key, i);
+    }
+    return cells;
+  }
+
+  private void assertResultEquals(Result result, int key, int start, int to) {
+    assertEquals(to - start, result.size());
+    for (int i = start; i < to; i++) {
+      assertEquals(key, Bytes.toInt(result.getValue(CF, Bytes.toBytes("cq" + i))));
+    }
+    assertEquals(to - start == 4, result.mayHaveMoreCellsInRow());
+  }
+
+  @Test
+  public void test() throws IOException {
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+    assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+      resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+
+    Cell[] cells1 = createCells(CF, 1, 10);
+    Cell[] cells2 = createCells(CF, 2, 10);
+    Cell[] cells3 = createCells(CF, 3, 10);
+    assertEquals(0, resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false).length);
+    Result[] results = resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, false, true),
+          Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) },
+      false);
+    assertEquals(2, results.length);
+    assertResultEquals(results[0], 1, 0, 4);
+    assertResultEquals(results[1], 1, 4, 8);
+    results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+    assertEquals(1, results.length);
+    assertResultEquals(results[0], 1, 8, 10);
+
+    results = resultCache.addAndGet(
+      new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, false, true),
+          Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true),
+          Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true),
+          Result.create(Arrays.copyOfRange(cells3, 0, 4), null, false, true),
+          Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true),
+          Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) },
+      false);
+    assertEquals(6, results.length);
+    assertResultEquals(results[0], 2, 0, 4);
+    assertResultEquals(results[1], 2, 4, 8);
+    assertResultEquals(results[2], 2, 8, 10);
+    assertResultEquals(results[3], 3, 0, 4);
+    assertResultEquals(results[4], 3, 4, 8);
+    assertResultEquals(results[5], 3, 8, 10);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
index a340e9f..8759593 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertSame;
 
 import java.io.IOException;
 import java.util.Arrays;
-import java.util.stream.IntStream;
 
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
@@ -70,9 +69,9 @@ public class TestCompleteResultScanResultCache {
       resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
     int count = 10;
     Result[] results = new Result[count];
-    IntStream.range(0, count).forEach(i -> {
+    for (int i = 0; i < count; i++) {
       results[i] = Result.create(Arrays.asList(createCell(i, CQ1)));
-    });
+    }
     assertSame(results, resultCache.addAndGet(results, false));
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
new file mode 100644
index 0000000..2a32206
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestRawAsyncTablePartialScan {
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("async");
+
+  private static byte[] FAMILY = Bytes.toBytes("cf");
+
+  private static byte[][] CQS =
+      new byte[][] { Bytes.toBytes("cq1"), Bytes.toBytes("cq2"), Bytes.toBytes("cq3") };
+
+  private static int COUNT = 100;
+
+  private static AsyncConnection CONN;
+
+  private static RawAsyncTable TABLE;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+    TEST_UTIL.waitTableAvailable(TABLE_NAME);
+    CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+    TABLE = CONN.getRawTable(TABLE_NAME);
+    TABLE
+        .putAll(IntStream.range(0, COUNT)
+            .mapToObj(i -> new Put(Bytes.toBytes(String.format("%02d", i)))
+                .addColumn(FAMILY, CQS[0], Bytes.toBytes(i))
+                .addColumn(FAMILY, CQS[1], Bytes.toBytes(2 * i))
+                .addColumn(FAMILY, CQS[2], Bytes.toBytes(3 * i)))
+            .collect(Collectors.toList()))
+        .get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    CONN.close();
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testBatchDoNotAllowPartial() throws InterruptedException, ExecutionException {
+    // we set batch to 2 and max result size to 1, so the server will only return one result per
+    // call, but we should still get 2 + 1 cells for every row.
+    List<Result> results = TABLE.scanAll(new Scan().setBatch(2).setMaxResultSize(1)).get();
+    assertEquals(2 * COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      Result firstTwo = results.get(2 * i);
+      assertEquals(String.format("%02d", i), Bytes.toString(firstTwo.getRow()));
+      assertEquals(2, firstTwo.size());
+      assertEquals(i, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[0])));
+      assertEquals(2 * i, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[1])));
+
+      Result secondOne = results.get(2 * i + 1);
+      assertEquals(String.format("%02d", i), Bytes.toString(secondOne.getRow()));
+      assertEquals(1, secondOne.size());
+      assertEquals(3 * i, Bytes.toInt(secondOne.getValue(FAMILY, CQS[2])));
+    }
+  }
+
+  @Test
+  public void testReversedBatchDoNotAllowPartial() throws InterruptedException, ExecutionException {
+    // we set batch to 2 and max result size to 1, so the server will only return one result per
+    // call, but we should still get 2 + 1 cells for every row.
+    List<Result> results =
+        TABLE.scanAll(new Scan().setBatch(2).setMaxResultSize(1).setReversed(true)).get();
+    assertEquals(2 * COUNT, results.size());
+    for (int i = 0; i < COUNT; i++) {
+      int row = COUNT - i - 1;
+      Result firstTwo = results.get(2 * i);
+      assertEquals(String.format("%02d", row), Bytes.toString(firstTwo.getRow()));
+      assertEquals(2, firstTwo.size());
+      assertEquals(row, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[0])));
+      assertEquals(2 * row, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[1])));
+
+      Result secondOne = results.get(2 * i + 1);
+      assertEquals(String.format("%02d", row), Bytes.toString(secondOne.getRow()));
+      assertEquals(1, secondOne.size());
+      assertEquals(3 * row, Bytes.toInt(secondOne.getValue(FAMILY, CQS[2])));
+    }
+  }
+}


[05/14] hbase git commit: HBASE-17747 Support both weak and soft object pool

Posted by sy...@apache.org.
HBASE-17747 Support both weak and soft object pool


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44b25588
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44b25588
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44b25588

Branch: refs/heads/hbase-12439
Commit: 44b255889cfb168aaac8adc162f740beb61a7221
Parents: 201c838
Author: Yu Li <li...@apache.org>
Authored: Tue Mar 14 11:07:52 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Tue Mar 14 11:07:52 2017 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/util/KeyLocker.java |   2 +-
 .../apache/hadoop/hbase/util/ObjectPool.java    | 174 +++++++++++++++++++
 .../hadoop/hbase/util/SoftObjectPool.java       |  81 +++++++++
 .../hadoop/hbase/util/WeakObjectPool.java       | 151 ++--------------
 .../hadoop/hbase/util/TestWeakObjectPool.java   |   4 +-
 .../hadoop/hbase/util/IdReadWriteLock.java      |   9 +-
 .../hadoop/hbase/util/TestIdReadWriteLock.java  |   5 +-
 7 files changed, 285 insertions(+), 141 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
index 6acf584..57e7bb0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
@@ -50,7 +50,7 @@ public class KeyLocker<K> {
 
   private final WeakObjectPool<K, ReentrantLock> lockPool =
       new WeakObjectPool<>(
-          new WeakObjectPool.ObjectFactory<K, ReentrantLock>() {
+          new ObjectPool.ObjectFactory<K, ReentrantLock>() {
             @Override
             public ReentrantLock createObject(K key) {
               return new ReentrantLock();
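
For illustration, a hedged sketch (not from this commit) of the keyed-lock sharing that KeyLocker builds on top of the pool; it assumes the pool keeps the get(key) accessor that the pre-existing WeakObjectPool exposed:

    import java.util.concurrent.locks.ReentrantLock;
    import org.apache.hadoop.hbase.util.ObjectPool;
    import org.apache.hadoop.hbase.util.WeakObjectPool;

    public class KeyedLockSketch {
      public static void main(String[] args) {
        WeakObjectPool<String, ReentrantLock> locks = new WeakObjectPool<>(
            new ObjectPool.ObjectFactory<String, ReentrantLock>() {
              @Override
              public ReentrantLock createObject(String key) {
                return new ReentrantLock();
              }
            });
        // assumption: get(key) lazily creates and shares one lock per key, as before
        ReentrantLock lock = locks.get("region-A");
        lock.lock();
        try {
          // critical section guarded per key
        } finally {
          lock.unlock();
        }
      }
    }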

http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java
new file mode 100644
index 0000000..f736922
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A thread-safe shared object pool in which object creation is expected to be lightweight, and the
+ * objects may be excessively created and discarded.
+ */
+@InterfaceAudience.Private
+public abstract class ObjectPool<K, V> {
+  /**
+   * An {@code ObjectFactory} object is used to create
+   * new shared objects on demand.
+   */
+  public interface ObjectFactory<K, V> {
+    /**
+     * Creates a new shared object associated with the given {@code key},
+     * identified by the {@code equals} method.
+     * This method may be simultaneously called by multiple threads
+     * with the same key, and the excessive objects are just discarded.
+     */
+    V createObject(K key);
+  }
+
+  protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();
+
+  private final ObjectFactory<K, V> objectFactory;
+
+  /** Does not permit null keys. */
+  protected final ConcurrentMap<K, Reference<V>> referenceCache;
+
+  /**
+   * The default initial capacity,
+   * used when not otherwise specified in a constructor.
+   */
+  public static final int DEFAULT_INITIAL_CAPACITY = 16;
+
+  /**
+   * The default concurrency level,
+   * used when not otherwise specified in a constructor.
+   */
+  public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
+
+  /**
+   * Creates a new pool with the default initial capacity (16)
+   * and the default concurrency level (16).
+   *
+   * @param objectFactory the factory to supply new objects on demand
+   *
+   * @throws NullPointerException if {@code objectFactory} is null
+   */
+  public ObjectPool(ObjectFactory<K, V> objectFactory) {
+    this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
+  }
+
+  /**
+   * Creates a new pool with the given initial capacity
+   * and the default concurrency level (16).
+   *
+   * @param objectFactory the factory to supply new objects on demand
+   * @param initialCapacity the initial capacity to keep objects in the pool
+   *
+   * @throws NullPointerException if {@code objectFactory} is null
+   * @throws IllegalArgumentException if {@code initialCapacity} is negative
+   */
+  public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
+    this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
+  }
+
+  /**
+   * Creates a new pool with the given initial capacity
+   * and the given concurrency level.
+   *
+   * @param objectFactory the factory to supply new objects on demand
+   * @param initialCapacity the initial capacity to keep objects in the pool
+   * @param concurrencyLevel the estimated count of concurrently accessing threads
+   *
+   * @throws NullPointerException if {@code objectFactory} is null
+   * @throws IllegalArgumentException if {@code initialCapacity} is negative or
+   *    {@code concurrencyLevel} is non-positive
+   */
+  public ObjectPool(
+      ObjectFactory<K, V> objectFactory,
+      int initialCapacity,
+      int concurrencyLevel) {
+
+    if (objectFactory == null) {
+      throw new NullPointerException("Given object factory instance is NULL");
+    }
+    this.objectFactory = objectFactory;
+
+    this.referenceCache =
+        new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel);
+  }
+
+  /**
+   * Removes stale references of shared objects from the pool.
+   * References newly becoming stale may still remain.
+   * The implementation of this method is expected to be lightweight
+   * when there is no stale reference.
+   */
+  public abstract void purge();
+
+  /**
+   * Create a reference associated with the given object
+   * @param key the key to store in the reference
+   * @param obj the object to associate with
+   * @return the reference instance
+   */
+  public abstract Reference<V> createReference(K key, V obj);
+
+  /**
+   * Returns a shared object associated with the given {@code key},
+   * which is identified by the {@code equals} method.
+   * @throws NullPointerException if {@code key} is null
+   */
+  public V get(K key) {
+    Reference<V> ref = referenceCache.get(key);
+    if (ref != null) {
+      V obj = ref.get();
+      if (obj != null) {
+        return obj;
+      }
+      referenceCache.remove(key, ref);
+    }
+
+    V newObj = objectFactory.createObject(key);
+    Reference<V> newRef = createReference(key, newObj);
+    while (true) {
+      Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef);
+      if (existingRef == null) {
+        return newObj;
+      }
+
+      V existingObject = existingRef.get();
+      if (existingObject != null) {
+        return existingObject;
+      }
+      referenceCache.remove(key, existingRef);
+    }
+  }
+
+  /**
+   * Returns an estimated count of objects kept in the pool.
+   * This also counts stale references,
+   * and you might want to call {@link #purge()} beforehand.
+   */
+  public int size() {
+    return referenceCache.size();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java
new file mode 100644
index 0000000..7f27f98
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.SoftReference;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ObjectPool.ObjectFactory;
+
+/**
+ * A {@code SoftReference} based shared object pool.
+ * The objects are kept in soft references and
+ * associated with keys which are identified by the {@code equals} method.
+ * The objects are created by {@link ObjectFactory} on demand.
+ * The object creation is expected to be lightweight,
+ * and the objects may be excessively created and discarded.
+ * Thread safe.
+ */
+@InterfaceAudience.Private
+public class SoftObjectPool<K, V> extends ObjectPool<K, V> {
+
+  public SoftObjectPool(ObjectFactory<K, V> objectFactory) {
+    super(objectFactory);
+  }
+
+  public SoftObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
+    super(objectFactory, initialCapacity);
+  }
+
+  public SoftObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity,
+      int concurrencyLevel) {
+    super(objectFactory, initialCapacity, concurrencyLevel);
+  }
+
+  @Override
+  public void purge() {
+    // This method is lightweight while there is no stale reference
+    // with the Oracle (Sun) implementation of {@code ReferenceQueue},
+    // because {@code ReferenceQueue.poll} just checks a volatile instance
+    // variable in {@code ReferenceQueue}.
+    while (true) {
+      @SuppressWarnings("unchecked")
+      SoftObjectReference ref = (SoftObjectReference) staleRefQueue.poll();
+      if (ref == null) {
+        break;
+      }
+      referenceCache.remove(ref.key, ref);
+    }
+  }
+
+  @Override
+  public Reference<V> createReference(K key, V obj) {
+    return new SoftObjectReference(key, obj);
+  }
+
+  private class SoftObjectReference extends SoftReference<V> {
+    final K key;
+
+    SoftObjectReference(K key, V obj) {
+      super(obj, staleRefQueue);
+      this.key = key;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
index 478864b..8529f01 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -15,15 +15,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.hadoop.hbase.util;
 
-import java.lang.ref.ReferenceQueue;
+import java.lang.ref.Reference;
 import java.lang.ref.WeakReference;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ObjectPool.ObjectFactory;
 
 /**
  * A {@code WeakReference} based shared object pool.
@@ -35,116 +33,30 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  * Thread safe.
  */
 @InterfaceAudience.Private
-public class WeakObjectPool<K, V> {
-  /**
-   * An {@code ObjectFactory} object is used to create
-   * new shared objects on demand.
-   */
-  public interface ObjectFactory<K, V> {
-    /**
-     * Creates a new shared object associated with the given {@code key},
-     * identified by the {@code equals} method.
-     * This method may be simultaneously called by multiple threads
-     * with the same key, and the excessive objects are just discarded.
-     */
-    V createObject(K key);
-  }
-
-  private final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();
-
-  private class ObjectReference extends WeakReference<V> {
-    final K key;
-
-    ObjectReference(K key, V obj) {
-      super(obj, staleRefQueue);
-      this.key = key;
-    }
-  }
-
-  private final ObjectFactory<K, V> objectFactory;
-
-  /** Does not permit null keys. */
-  private final ConcurrentMap<K, ObjectReference> referenceCache;
-
-  /**
-   * The default initial capacity,
-   * used when not otherwise specified in a constructor.
-   */
-  public static final int DEFAULT_INITIAL_CAPACITY = 16;
-
-  /**
-   * The default concurrency level,
-   * used when not otherwise specified in a constructor.
-   */
-  public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
+public class WeakObjectPool<K,V> extends ObjectPool<K,V> {
 
-  /**
-   * Creates a new pool with the default initial capacity (16)
-   * and the default concurrency level (16).
-   *
-   * @param objectFactory the factory to supply new objects on demand
-   *
-   * @throws NullPointerException if {@code objectFactory} is null
-   */
   public WeakObjectPool(ObjectFactory<K, V> objectFactory) {
-    this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
+    super(objectFactory);
   }
 
-  /**
-   * Creates a new pool with the given initial capacity
-   * and the default concurrency level (16).
-   *
-   * @param objectFactory the factory to supply new objects on demand
-   * @param initialCapacity the initial capacity to keep objects in the pool
-   *
-   * @throws NullPointerException if {@code objectFactory} is null
-   * @throws IllegalArgumentException if {@code initialCapacity} is negative
-   */
   public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
-    this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
+    super(objectFactory, initialCapacity);
   }
 
-  /**
-   * Creates a new pool with the given initial capacity
-   * and the given concurrency level.
-   *
-   * @param objectFactory the factory to supply new objects on demand
-   * @param initialCapacity the initial capacity to keep objects in the pool
-   * @param concurrencyLevel the estimated count of concurrently accessing threads
-   *
-   * @throws NullPointerException if {@code objectFactory} is null
-   * @throws IllegalArgumentException if {@code initialCapacity} is negative or
-   *    {@code concurrencyLevel} is non-positive
-   */
-  public WeakObjectPool(
-      ObjectFactory<K, V> objectFactory,
-      int initialCapacity,
+  public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity,
       int concurrencyLevel) {
-
-    if (objectFactory == null) {
-      throw new NullPointerException();
-    }
-    this.objectFactory = objectFactory;
-
-    this.referenceCache = new ConcurrentHashMap<>(initialCapacity, 0.75f, concurrencyLevel);
-    // 0.75f is the default load factor threshold of ConcurrentHashMap.
+    super(objectFactory, initialCapacity, concurrencyLevel);
   }
 
-  /**
-   * Removes stale references of shared objects from the pool.
-   * References newly becoming stale may still remain.
-   * The implementation of this method is expected to be lightweight
-   * when there is no stale reference.
-   */
+  @Override
   public void purge() {
     // This method is lightweight while there is no stale reference
     // with the Oracle (Sun) implementation of {@code ReferenceQueue},
     // because {@code ReferenceQueue.poll} just checks a volatile instance
     // variable in {@code ReferenceQueue}.
-
     while (true) {
       @SuppressWarnings("unchecked")
-      ObjectReference ref = (ObjectReference)staleRefQueue.poll();
+      WeakObjectReference ref = (WeakObjectReference) staleRefQueue.poll();
       if (ref == null) {
         break;
       }
@@ -152,43 +64,18 @@ public class WeakObjectPool<K, V> {
     }
   }
 
-  /**
-   * Returns a shared object associated with the given {@code key},
-   * which is identified by the {@code equals} method.
-   * @throws NullPointerException if {@code key} is null
-   */
-  public V get(K key) {
-    ObjectReference ref = referenceCache.get(key);
-    if (ref != null) {
-      V obj = ref.get();
-      if (obj != null) {
-        return obj;
-      }
-      referenceCache.remove(key, ref);
-    }
+  @Override
+  public Reference<V> createReference(K key, V obj) {
+    return new WeakObjectReference(key, obj);
+  }
 
-    V newObj = objectFactory.createObject(key);
-    ObjectReference newRef = new ObjectReference(key, newObj);
-    while (true) {
-      ObjectReference existingRef = referenceCache.putIfAbsent(key, newRef);
-      if (existingRef == null) {
-        return newObj;
-      }
+  private class WeakObjectReference extends WeakReference<V> {
+    final K key;
 
-      V existingObject = existingRef.get();
-      if (existingObject != null) {
-        return existingObject;
-      }
-      referenceCache.remove(key, existingRef);
+    WeakObjectReference(K key, V obj) {
+      super(obj, staleRefQueue);
+      this.key = key;
     }
   }
 
-  /**
-   * Returns an estimated count of objects kept in the pool.
-   * This also counts stale references,
-   * and you might want to call {@link #purge()} beforehand.
-   */
-  public int size() {
-    return referenceCache.size();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
index d9fefa2..9dbbbd0 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
@@ -31,12 +31,12 @@ import org.junit.experimental.categories.Category;
 
 @Category({MiscTests.class, SmallTests.class})
 public class TestWeakObjectPool {
-  WeakObjectPool<String, Object> pool;
+  ObjectPool<String, Object> pool;
 
   @Before
   public void setUp() {
     pool = new WeakObjectPool<>(
-        new WeakObjectPool.ObjectFactory<String, Object>() {
+        new ObjectPool.ObjectFactory<String, Object>() {
           @Override
           public Object createObject(String key) {
             return new Object();

http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
index caf3265..deb2265 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
@@ -44,10 +44,11 @@ import com.google.common.annotations.VisibleForTesting;
 public class IdReadWriteLock {
   // The number of lock we want to easily support. It's not a maximum.
   private static final int NB_CONCURRENT_LOCKS = 1000;
-  // The pool to get entry from, entries are mapped by weak reference to make it able to be
-  // garbage-collected asap
-  private final WeakObjectPool<Long, ReentrantReadWriteLock> lockPool = new WeakObjectPool<>(
-          new WeakObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+  // The pool to get entry from, entries are mapped by soft reference and will be
+  // automatically garbage-collected when JVM memory pressure is high
+  private final ObjectPool<Long, ReentrantReadWriteLock> lockPool =
+      new SoftObjectPool<>(
+          new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
             @Override
             public ReentrantReadWriteLock createObject(Long id) {
               return new ReentrantReadWriteLock();

http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
index 2ccfad8..295816f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
@@ -111,10 +111,11 @@ public class TestIdReadWriteLock {
         Future<Boolean> result = ecs.take();
         assertTrue(result.get());
       }
-      // make sure the entry pool will be cleared after GC and purge call
+      // make sure the entry pool won't be cleared when JVM memory is enough
+      // even after GC and purge call
       int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
       LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
-      assertEquals(0, entryPoolSize);
+      assertEquals(NUM_IDS, entryPoolSize);
     } finally {
       exec.shutdown();
       exec.awaitTermination(5000, TimeUnit.MILLISECONDS);


[08/14] hbase git commit: HBASE-17782 Extend IdReadWriteLock to support using both weak and soft reference

Posted by sy...@apache.org.
HBASE-17782 Extend IdReadWriteLock to support using both weak and soft reference


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aace02a2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aace02a2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aace02a2

Branch: refs/heads/hbase-12439
Commit: aace02a230a61cc7e91eb240598435c36c9af403
Parents: 14fb57c
Author: Yu Li <li...@apache.org>
Authored: Wed Mar 15 11:07:42 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Wed Mar 15 11:07:42 2017 +0800

----------------------------------------------------------------------
 .../hbase/io/hfile/bucket/BucketCache.java      |  5 +-
 .../hadoop/hbase/util/IdReadWriteLock.java      | 58 ++++++++++++++++----
 .../hbase/wal/RegionGroupingProvider.java       | 13 +++--
 .../hadoop/hbase/util/TestIdReadWriteLock.java  | 31 +++++++++--
 4 files changed, 86 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
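
The reference type is now chosen at construction time. A short sketch of the intended call
pattern (blockOffset is a made-up id; per the patch, SOFT suits a bounded, frequently reused
key set such as BucketCache offsets, while WEAK remains the default):

  import java.util.concurrent.locks.ReentrantReadWriteLock;

  import org.apache.hadoop.hbase.util.IdReadWriteLock;
  import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;

  public class IdReadWriteLockSketch {
    public static void main(String[] args) {
      IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT);
      long blockOffset = 1024L;  // stand-in for a real block offset or other id
      ReentrantReadWriteLock lock = offsetLock.getLock(blockOffset);
      lock.readLock().lock();
      try {
        // read whatever resource blockOffset identifies
      } finally {
        lock.readLock().unlock();
      }
    }
  }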


http://git-wip-us.apache.org/repos/asf/hbase/blob/aace02a2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index cb23ca9..3e9c376 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.HasThread;
 import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -185,9 +186,11 @@ public class BucketCache implements BlockCache, HeapSize {
   /**
    * A ReentrantReadWriteLock to lock on a particular block identified by offset.
    * The purpose of this is to avoid freeing the block which is being read.
+   * <p>
+   * Key set of offsets in BucketCache is limited so soft reference is the best choice here.
    */
   @VisibleForTesting
-  final IdReadWriteLock offsetLock = new IdReadWriteLock();
+  final IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT);
 
   private final NavigableSet<BlockCacheKey> blocksByHFile =
       new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() {

http://git-wip-us.apache.org/repos/asf/hbase/blob/aace02a2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
index deb2265..2a83029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.util;
 
+import java.lang.ref.Reference;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -44,16 +45,48 @@ import com.google.common.annotations.VisibleForTesting;
 public class IdReadWriteLock {
   // The number of lock we want to easily support. It's not a maximum.
   private static final int NB_CONCURRENT_LOCKS = 1000;
-  // The pool to get entry from, entries are mapped by soft reference and will be
-  // automatically garbage-collected when JVM memory pressure is high
-  private final ObjectPool<Long, ReentrantReadWriteLock> lockPool =
-      new SoftObjectPool<>(
-          new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
-            @Override
-            public ReentrantReadWriteLock createObject(Long id) {
-              return new ReentrantReadWriteLock();
-            }
-          }, NB_CONCURRENT_LOCKS);
+  /**
+   * The pool to get entry from, entries are mapped by {@link Reference} and will be automatically
+   * garbage-collected by JVM
+   */
+  private final ObjectPool<Long, ReentrantReadWriteLock> lockPool;
+  private final ReferenceType refType;
+
+  public IdReadWriteLock() {
+    this(ReferenceType.WEAK);
+  }
+
+  /**
+   * Constructor of IdReadWriteLock
+   * @param referenceType type of the reference used in lock pool, {@link ReferenceType#WEAK} by
+   *          default. Use {@link ReferenceType#SOFT} if the key set is limited and the locks will
+   *          be reused with a high frequency
+   */
+  public IdReadWriteLock(ReferenceType referenceType) {
+    this.refType = referenceType;
+    switch (referenceType) {
+    case SOFT:
+      lockPool = new SoftObjectPool<>(new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+        @Override
+        public ReentrantReadWriteLock createObject(Long id) {
+          return new ReentrantReadWriteLock();
+        }
+      }, NB_CONCURRENT_LOCKS);
+      break;
+    case WEAK:
+    default:
+      lockPool = new WeakObjectPool<>(new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+        @Override
+        public ReentrantReadWriteLock createObject(Long id) {
+          return new ReentrantReadWriteLock();
+        }
+      }, NB_CONCURRENT_LOCKS);
+    }
+  }
+
+  public static enum ReferenceType {
+    WEAK, SOFT
+  }
 
   /**
    * Get the ReentrantReadWriteLock corresponding to the given id
@@ -93,4 +126,9 @@ public class IdReadWriteLock {
       Thread.sleep(50);
     }
   }
+
+  @VisibleForTesting
+  public ReferenceType getReferenceType() {
+    return this.refType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/aace02a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index dee36e8..5a29731 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -27,7 +27,6 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.Lock;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -36,7 +35,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
 // imports for classes still in regionserver.wal
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.hadoop.hbase.util.IdLock;
 
 /**
  * A WAL Provider that returns a WAL per group of regions.
@@ -132,7 +131,7 @@ public class RegionGroupingProvider implements WALProvider {
   /** A group-provider mapping, make sure one-one rather than many-one mapping */
   private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
 
-  private final IdReadWriteLock createLock = new IdReadWriteLock();
+  private final IdLock createLock = new IdLock();
 
   private RegionGroupingStrategy strategy = null;
   private WALFactory factory = null;
@@ -181,16 +180,18 @@ public class RegionGroupingProvider implements WALProvider {
   private WAL getWAL(final String group) throws IOException {
     WALProvider provider = cached.get(group);
     if (provider == null) {
-      Lock lock = createLock.getLock(group.hashCode()).writeLock();
-      lock.lock();
+      IdLock.Entry lockEntry = null;
       try {
+        lockEntry = createLock.getLockEntry(group.hashCode());
         provider = cached.get(group);
         if (provider == null) {
           provider = createProvider(group);
           cached.put(group, provider);
         }
       } finally {
-        lock.unlock();
+        if (lockEntry != null) {
+          createLock.releaseLockEntry(lockEntry);
+        }
       }
     }
     return provider.getWAL(null, null);

http://git-wip-us.apache.org/repos/asf/hbase/blob/aace02a2/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
index 295816f..7dd2a63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 
+import java.util.Arrays;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.Callable;
@@ -38,9 +39,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+@RunWith(Parameterized.class)
 @Category({MiscTests.class, MediumTests.class})
 // Medium as it creates 100 threads; seems better to run it isolated
 public class TestIdReadWriteLock {
@@ -51,7 +56,14 @@ public class TestIdReadWriteLock {
   private static final int NUM_THREADS = 128;
   private static final int NUM_SECONDS = 15;
 
-  private IdReadWriteLock idLock = new IdReadWriteLock();
+  @Parameterized.Parameter
+  public IdReadWriteLock idLock;
+
+  @Parameterized.Parameters
+  public static Iterable<Object[]> data() {
+    return Arrays.asList(new Object[][] { { new IdReadWriteLock(ReferenceType.WEAK) },
+        { new IdReadWriteLock(ReferenceType.SOFT) } });
+  }
 
   private Map<Long, String> idOwner = new ConcurrentHashMap<>();
 
@@ -111,11 +123,22 @@ public class TestIdReadWriteLock {
         Future<Boolean> result = ecs.take();
         assertTrue(result.get());
       }
-      // make sure the entry pool won't be cleared when JVM memory is enough
-      // even after GC and purge call
       int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
       LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
-      assertEquals(NUM_IDS, entryPoolSize);
+      ReferenceType refType = idLock.getReferenceType();
+      switch (refType) {
+      case WEAK:
+        // make sure the entry pool will be cleared after GC and purge call
+        assertEquals(0, entryPoolSize);
+        break;
+      case SOFT:
+        // make sure the entry pool won't be cleared when JVM memory is enough
+        // even after GC and purge call
+        assertEquals(NUM_IDS, entryPoolSize);
+        break;
+      default:
+        break;
+      }
     } finally {
       exec.shutdown();
       exec.awaitTermination(5000, TimeUnit.MILLISECONDS);


[13/14] hbase git commit: HBASE-17790 Mark ReplicationAdmin's peerAdded and listReplicationPeers as Deprecated

Posted by sy...@apache.org.
HBASE-17790 Mark ReplicationAdmin's peerAdded and listReplicationPeers as Deprecated


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6a6fff10
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6a6fff10
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6a6fff10

Branch: refs/heads/hbase-12439
Commit: 6a6fff103e0fcadfd539fbbae5157a99643a033b
Parents: 53e9a1c
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Mar 16 16:55:18 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Mar 16 16:55:18 2017 +0800

----------------------------------------------------------------------
 .../client/replication/ReplicationAdmin.java    |  5 ++++
 .../replication/TestReplicationAdmin.java       | 25 ++++++++++----------
 .../TestReplicationAdminWithClusters.java       |  3 +--
 .../replication/TestNamespaceReplication.java   |  1 -
 .../hbase/replication/TestReplicationBase.java  |  4 +++-
 5 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
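
The replacement for both deprecated methods is the org.apache.hadoop.hbase.client.Admin API,
which the updated tests below switch to. A condensed sketch of that path (the peer id and
cluster key are placeholders, and getClusterKey() is assumed to be the usual
ReplicationPeerConfig accessor):

  import java.util.List;

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Admin;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
  import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;

  public class ReplicationPeerSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf);
           Admin admin = conn.getAdmin()) {
        ReplicationPeerConfig rpc = new ReplicationPeerConfig();
        rpc.setClusterKey("127.0.0.1:2181:/hbase2");  // placeholder peer cluster key
        admin.addReplicationPeer("1", rpc);           // replaces ReplicationAdmin#addPeer + peerAdded
        List<ReplicationPeerDescription> peers = admin.listReplicationPeers();
        for (ReplicationPeerDescription peer : peers) {
          System.out.println(peer.getPeerConfig().getClusterKey());
        }
        admin.removeReplicationPeer("1");
      }
    }
  }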


http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index c7f040e..0eae10b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -521,11 +521,16 @@ public class ReplicationAdmin implements Closeable {
   }
 
   @VisibleForTesting
+  @Deprecated
   public void peerAdded(String id) throws ReplicationException {
     this.replicationPeers.peerConnected(id);
   }
 
+  /**
+   * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
+   */
   @VisibleForTesting
+  @Deprecated
   List<ReplicationPeer> listReplicationPeers() throws IOException {
     Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
     if (peers == null || peers.size() <= 0) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index f092a48..a23b76a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -33,11 +33,13 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
 import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -74,6 +76,7 @@ public class TestReplicationAdmin {
   private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
 
   private static ReplicationAdmin admin;
+  private static Admin hbaseAdmin;
 
   @Rule
   public TestName name = new TestName();
@@ -87,6 +90,7 @@ public class TestReplicationAdmin {
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
     admin = new ReplicationAdmin(conf);
+    hbaseAdmin = TEST_UTIL.getAdmin();
   }
 
   @AfterClass
@@ -149,16 +153,16 @@ public class TestReplicationAdmin {
     config.setClusterKey(KEY_ONE);
     config.getConfiguration().put("key1", "value1");
     config.getConfiguration().put("key2", "value2");
-    admin.addPeer(ID_ONE, config, null);
+    hbaseAdmin.addReplicationPeer(ID_ONE, config);
 
-    List<ReplicationPeer> peers = admin.listReplicationPeers();
+    List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
     assertEquals(1, peers.size());
-    ReplicationPeer peerOne = peers.get(0);
+    ReplicationPeerDescription peerOne = peers.get(0);
     assertNotNull(peerOne);
-    assertEquals("value1", peerOne.getConfiguration().get("key1"));
-    assertEquals("value2", peerOne.getConfiguration().get("key2"));
+    assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
+    assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
 
-    admin.removePeer(ID_ONE);
+    hbaseAdmin.removeReplicationPeer(ID_ONE);
   }
 
   @Test
@@ -403,8 +407,7 @@ public class TestReplicationAdmin {
 
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(KEY_ONE);
-    admin.addPeer(ID_ONE, rpc);
-    admin.peerAdded(ID_ONE);
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
 
     rpc = admin.getPeerConfig(ID_ONE);
     Set<String> namespaces = new HashSet<>();
@@ -438,8 +441,7 @@ public class TestReplicationAdmin {
 
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(KEY_ONE);
-    admin.addPeer(ID_ONE, rpc);
-    admin.peerAdded(ID_ONE);
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
 
     rpc = admin.getPeerConfig(ID_ONE);
     Set<String> namespaces = new HashSet<String>();
@@ -482,8 +484,7 @@ public class TestReplicationAdmin {
   public void testPeerBandwidth() throws Exception {
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(KEY_ONE);
-    admin.addPeer(ID_ONE, rpc);
-    admin.peerAdded(ID_ONE);
+    hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
 
     rpc = admin.getPeerConfig(ID_ONE);
     assertEquals(0, rpc.getBandwidth());

http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index 312a90a..56f4141 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -229,8 +229,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
     rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
     rpc.getConfiguration().put("key1", "value1");
 
-    admin.addPeer(peerId, rpc);
-    admin.peerAdded(peerId);
+    admin1.addReplicationPeer(peerId, rpc);
 
     rpc.getConfiguration().put("key1", "value2");
     admin.updatePeerConfig(peerId, rpc);

http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
index e296f87..433a345 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
@@ -140,7 +140,6 @@ public class TestNamespaceReplication extends TestReplicationBase {
     Table htab1B = connection1.getTable(tabBName);
     Table htab2B = connection2.getTable(tabBName);
 
-    admin.peerAdded("2");
     // add ns1 to peer config which replicate to cluster2
     ReplicationPeerConfig rpc = admin.getPeerConfig("2");
     Set<String> namespaces = new HashSet<>();

http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index caad544..81fe629 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -63,6 +63,7 @@ public class TestReplicationBase {
   protected static ZooKeeperWatcher zkw2;
 
   protected static ReplicationAdmin admin;
+  private static Admin hbaseAdmin;
 
   protected static Table htable1;
   protected static Table htable2;
@@ -133,7 +134,8 @@ public class TestReplicationBase {
 
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(utility2.getClusterKey());
-    admin.addPeer("2", rpc, null);
+    hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
+    hbaseAdmin.addReplicationPeer("2", rpc);
 
     HTableDescriptor table = new HTableDescriptor(tableName);
     HColumnDescriptor fam = new HColumnDescriptor(famName);


[04/14] hbase git commit: guard against NPE while reading FileTrailer and HFileBlock

Posted by sy...@apache.org.
guard against NPE while reading FileTrailer and HFileBlock

guard against NPE from FSInputStream#seek

Signed-off-by: Michael Stack <st...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/201c8382
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/201c8382
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/201c8382

Branch: refs/heads/hbase-12439
Commit: 201c8382508da1266d11e04d3c7cbef42e0a256a
Parents: 35d7a0c
Author: James Moore <jc...@hubspot.com>
Authored: Fri Feb 24 10:26:12 2017 -0500
Committer: Michael Stack <st...@apache.org>
Committed: Mon Mar 13 14:53:35 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/io/hfile/FixedFileTrailer.java |  3 +-
 .../hadoop/hbase/io/hfile/HFileBlock.java       |  2 +-
 .../apache/hadoop/hbase/io/hfile/HFileUtil.java | 43 ++++++++++++++++++++
 3 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
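
The guard itself is just a seek with a fallback. Restated as a standalone sketch for readers
who only want the pattern (safeSeek is a made-up name; HFileUtil#seekOnMultipleSources in the
patch below is the real helper):

  import java.io.IOException;

  import org.apache.hadoop.fs.FSDataInputStream;

  public class SeekFallbackSketch {
    // Mirrors HFileUtil#seekOnMultipleSources: DFSInputStream#seek can throw an NPE when
    // its internal blockReader is null (HBASE-17501); retrying via seekToNewSource reads
    // the same offset from an alternate replica.
    static void safeSeek(FSDataInputStream in, long offset) throws IOException {
      try {
        in.seek(offset);
      } catch (NullPointerException e) {
        in.seekToNewSource(offset);
      }
    }
  }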


http://git-wip-us.apache.org/repos/asf/hbase/blob/201c8382/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
index 7eac9c6..1854236 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
@@ -388,7 +388,8 @@ public class FixedFileTrailer {
       bufferSize = (int) fileSize;
     }
 
-    istream.seek(seekPoint);
+    HFileUtil.seekOnMultipleSources(istream, seekPoint);
+
     ByteBuffer buf = ByteBuffer.allocate(bufferSize);
     istream.readFully(buf.array(), buf.arrayOffset(),
         buf.arrayOffset() + buf.limit());

http://git-wip-us.apache.org/repos/asf/hbase/blob/201c8382/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index fba15ba..0b140b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1512,7 +1512,7 @@ public class HFileBlock implements Cacheable {
       if (!pread && streamLock.tryLock()) {
         // Seek + read. Better for scanning.
         try {
-          istream.seek(fileOffset);
+          HFileUtil.seekOnMultipleSources(istream, fileOffset);
 
           long realOffset = istream.getPos();
           if (realOffset != fileOffset) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/201c8382/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileUtil.java
new file mode 100644
index 0000000..835450c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileUtil.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+public class HFileUtil {
+
+  /** guards against NullPointer
+   * utility which tries to seek on the DFSIS and will try an alternative source
+   * if the FSDataInputStream throws an NPE HBASE-17501
+   * @param istream
+   * @param offset
+   * @throws IOException
+   */
+  static public void seekOnMultipleSources(FSDataInputStream istream, long offset) throws IOException {
+    try {
+      // attempt to seek inside of current blockReader
+      istream.seek(offset);
+    } catch (NullPointerException e) {
+      // retry the seek on an alternate copy of the data
+      // this can occur if the blockReader on the DFSInputStream is null
+      istream.seekToNewSource(offset);
+    }
+  }
+}


[10/14] hbase git commit: HBASE-17723 ClientAsyncPrefetchScanner may end prematurely when the size of the cache is one

Posted by sy...@apache.org.
HBASE-17723 ClientAsyncPrefetchScanner may end prematurely when the size of the cache is one


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0ecb6782
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0ecb6782
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0ecb6782

Branch: refs/heads/hbase-12439
Commit: 0ecb6782593039af75a45c25481f1dbf7cbd6928
Parents: a49bc58
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Sun Mar 12 13:48:12 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Thu Mar 16 03:07:20 2017 +0800

----------------------------------------------------------------------
 .../client/ClientAsyncPrefetchScanner.java      | 61 +++++++++++-------
 .../client/TestScannersFromClientSide.java      | 66 ++++++++++++++------
 2 files changed, 88 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
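
From the client side, the failing combination was an async-prefetch scan whose cache holds a
single result. A sketch of that configuration (connection settings and table name are
placeholders):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ResultScanner;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.client.Table;

  public class AsyncPrefetchScanSketch {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      try (Connection conn = ConnectionFactory.createConnection(conf);
           Table table = conn.getTable(TableName.valueOf("t1"))) {
        Scan scan = new Scan();
        scan.setAsyncPrefetch(true);  // hands back a ClientAsyncPrefetchScanner
        scan.setCaching(1);           // the case this fix covers: cache size of one
        try (ResultScanner scanner = table.getScanner(scan)) {
          for (Result r = scanner.next(); r != null; r = scanner.next()) {
            // before this change, next() could return null here before the table was
            // exhausted if the background prefetch had not yet produced a result
          }
        }
      }
    }
  }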


http://git-wip-us.apache.org/repos/asf/hbase/blob/0ecb6782/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index b1fc2da..007e638 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import com.google.common.annotations.VisibleForTesting;
 import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
 
 import java.io.IOException;
@@ -26,6 +27,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.TableName;
@@ -62,6 +64,8 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
   private AtomicBoolean prefetchRunning;
   // an attribute for synchronizing close between scanner and prefetch threads
   private AtomicLong closingThreadId;
+  // used for testing
+  private Consumer<Boolean> prefetchListener;
   private static final int NO_THREAD = -1;
 
   public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
@@ -72,6 +76,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
         replicaCallTimeoutMicroSecondScan);
   }
 
+  @VisibleForTesting
+  void setPrefetchListener(Consumer<Boolean> prefetchListener) {
+    this.prefetchListener = prefetchListener;
+  }
+
   @Override
   protected void initCache() {
     // concurrent cache
@@ -88,34 +97,39 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
   public Result next() throws IOException {
 
     try {
-      handleException();
+      boolean hasExecutedPrefetch = false;
+      do {
+        handleException();
 
-      // If the scanner is closed and there's nothing left in the cache, next is a no-op.
-      if (getCacheCount() == 0 && this.closed) {
-        return null;
-      }
-      if (prefetchCondition()) {
-        // run prefetch in the background only if no prefetch is already running
-        if (!isPrefetchRunning()) {
-          if (prefetchRunning.compareAndSet(false, true)) {
-            getPool().execute(prefetchRunnable);
+        // If the scanner is closed and there's nothing left in the cache, next is a no-op.
+        if (getCacheCount() == 0 && this.closed) {
+          return null;
+        }
+
+        if (prefetchCondition()) {
+          // run prefetch in the background only if no prefetch is already running
+          if (!isPrefetchRunning()) {
+            if (prefetchRunning.compareAndSet(false, true)) {
+              getPool().execute(prefetchRunnable);
+              hasExecutedPrefetch = true;
+            }
+          }
+        }
+
+        while (isPrefetchRunning()) {
+          // prefetch running or still pending
+          if (getCacheCount() > 0) {
+            return pollCache();
+          } else {
+            // (busy) wait for a record - sleep
+            Threads.sleep(1);
           }
         }
-      }
 
-      while (isPrefetchRunning()) {
-        // prefetch running or still pending
         if (getCacheCount() > 0) {
           return pollCache();
-        } else {
-          // (busy) wait for a record - sleep
-          Threads.sleep(1);
         }
-      }
-
-      if (getCacheCount() > 0) {
-        return pollCache();
-      }
+      } while (!hasExecutedPrefetch);
 
       // if we exhausted this scanner before calling close, write out the scan metrics
       writeScanMetrics();
@@ -219,11 +233,16 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
 
     @Override
     public void run() {
+      boolean succeed = false;
       try {
         loadCache();
+        succeed = true;
       } catch (Exception e) {
         exceptionsQueue.add(e);
       } finally {
+        if (prefetchListener != null) {
+          prefetchListener.accept(succeed);
+        }
         prefetchRunning.set(false);
         if(closed) {
           if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/0ecb6782/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 6f40093..e5c19ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import java.util.stream.IntStream;
 
 import org.apache.commons.logging.Log;
@@ -656,7 +657,9 @@ public class TestScannersFromClientSide {
     testAsyncScanner(TableName.valueOf(name.getMethodName()),
       2,
       3,
-      10);
+      10,
+      -1,
+      null);
   }
 
   @Test
@@ -664,11 +667,28 @@ public class TestScannersFromClientSide {
     testAsyncScanner(TableName.valueOf(name.getMethodName()),
       30000,
       1,
-      1);
+      1,
+      -1,
+      null);
+  }
+
+  @Test
+  public void testAsyncScannerWithoutCaching() throws Exception {
+    testAsyncScanner(TableName.valueOf(name.getMethodName()),
+      5,
+      1,
+      1,
+      1,
+      (b) -> {
+        try {
+          TimeUnit.MILLISECONDS.sleep(500);
+        } catch (InterruptedException ex) {
+        }
+      });
   }
 
   private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
-      int qualifierNumber) throws Exception {
+      int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception {
     assert rowNumber > 0;
     assert familyNumber > 0;
     assert qualifierNumber > 0;
@@ -707,23 +727,33 @@ public class TestScannersFromClientSide {
 
     Scan scan = new Scan();
     scan.setAsyncPrefetch(true);
-    ResultScanner scanner = ht.getScanner(scan);
-    List<Cell> kvListScan = new ArrayList<>();
-    Result result;
-    boolean first = true;
-    while ((result = scanner.next()) != null) {
-      // waiting for cache. see HBASE-17376
-      if (first) {
-        TimeUnit.SECONDS.sleep(1);
-        first = false;
-      }
-      for (Cell kv : result.listCells()) {
-        kvListScan.add(kv);
+    if (caching > 0) {
+      scan.setCaching(caching);
+    }
+    try (ResultScanner scanner = ht.getScanner(scan)) {
+      assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
+      ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
+      List<Cell> kvListScan = new ArrayList<>();
+      Result result;
+      boolean first = true;
+      int actualRows = 0;
+      while ((result = scanner.next()) != null) {
+        ++actualRows;
+        // waiting for cache. see HBASE-17376
+        if (first) {
+          TimeUnit.SECONDS.sleep(1);
+          first = false;
+        }
+        for (Cell kv : result.listCells()) {
+          kvListScan.add(kv);
+        }
       }
+      assertEquals(rowNumber, actualRows);
+      // These cells may have different rows but it is ok. The Result#getRow
+      // isn't used in the verifyResult()
+      result = Result.create(kvListScan);
+      verifyResult(result, kvListExp, toLog, "Testing async scan");
     }
-    result = Result.create(kvListScan);
-    assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
-    verifyResult(result, kvListExp, toLog, "Testing async scan");
     TEST_UTIL.deleteTable(table);
   }
 


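A minimal usage sketch of the prefetch-listener hook added above, assuming only what the diff shows (Scan#setAsyncPrefetch, the cast asserted in the test, and ClientAsyncPrefetchScanner#setPrefetchListener taking a Consumer<Boolean>); the table handle, LOG and the meaning of the Boolean argument are illustrative assumptions, not part of the patch:

  // Sketch only: mirrors the pattern exercised by testAsyncScannerWithoutCaching.
  Scan scan = new Scan();
  scan.setAsyncPrefetch(true);   // request the asynchronous prefetching scanner
  scan.setCaching(1);            // small caching value so the prefetch path is hit often
  try (ResultScanner scanner = table.getScanner(scan)) {
    // The cast matches the assertion in the test above; the listener is invoked when a
    // prefetch completes (what the Boolean flag conveys is an assumption here).
    ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(
        flag -> LOG.debug("prefetch completed, flag=" + flag));
    for (Result r : scanner) {
      LOG.debug("row=" + Bytes.toStringBinary(r.getRow()));  // consume results
    }
  }
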
[06/14] hbase git commit: HBASE-17779 disable_table_replication returns misleading message and does not turn off replication (Janos Gub)

Posted by sy...@apache.org.
HBASE-17779 disable_table_replication returns misleading message and does not turn off replication (Janos Gub)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/777fea55
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/777fea55
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/777fea55

Branch: refs/heads/hbase-12439
Commit: 777fea552eab3262e95053b2fc757fc49dfad96d
Parents: 44b2558
Author: tedyu <yu...@gmail.com>
Authored: Tue Mar 14 12:13:34 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Mar 14 12:13:34 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hbase/client/HBaseAdmin.java  | 35 +++++++++++++++-----
 .../TestReplicationAdminWithClusters.java       | 17 ++++++++++
 2 files changed, 44 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/777fea55/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 1f143b5..6918184 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -4228,14 +4228,16 @@ public class HBaseAdmin implements Admin {
   /**
    * Set the table's replication switch if the table's replication switch is already not set.
    * @param tableName name of the table
-   * @param isRepEnabled is replication switch enable or disable
+   * @param enableRep whether replication should be enabled or disabled for the table
    * @throws IOException if a remote or network exception occurs
    */
-  private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
+  private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
     HTableDescriptor htd = getTableDescriptor(tableName);
-    if (isTableRepEnabled(htd) ^ isRepEnabled) {
+    ReplicationState currentReplicationState = getTableReplicationState(htd);
+    if (enableRep && currentReplicationState != ReplicationState.ENABLED
+        || !enableRep && currentReplicationState != ReplicationState.DISABLED) {
       for (HColumnDescriptor hcd : htd.getFamilies()) {
-        hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
+        hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL
             : HConstants.REPLICATION_SCOPE_LOCAL);
       }
       modifyTable(tableName, htd);
@@ -4243,17 +4245,34 @@ public class HBaseAdmin implements Admin {
   }
 
   /**
+   * This enum indicates the current state of the replication for a given table.
+   */
+  private enum ReplicationState {
+    ENABLED, // all column families enabled
+    MIXED, // some column families enabled, some disabled
+    DISABLED // all column families disabled
+  }
+
+  /**
    * @param htd table descriptor details for the table to check
-   * @return true if table's replication switch is enabled
+   * @return ReplicationState the current state of the table.
    */
-  private boolean isTableRepEnabled(HTableDescriptor htd) {
+  private ReplicationState getTableReplicationState(HTableDescriptor htd) {
+    boolean hasEnabled = false;
+    boolean hasDisabled = false;
+
     for (HColumnDescriptor hcd : htd.getFamilies()) {
       if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
           && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
-        return false;
+        hasDisabled = true;
+      } else {
+        hasEnabled = true;
       }
     }
-    return true;
+
+    if (hasEnabled && hasDisabled) return ReplicationState.MIXED;
+    if (hasEnabled) return ReplicationState.ENABLED;
+    return ReplicationState.DISABLED;
   }
 
   /**

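To make the behavioural change concrete, here is a standalone sketch of the tri-state check the patch introduces. ReplicationState, HColumnDescriptor and the scope constants are taken from the diff above; the helper itself (stateOf) is illustrative, not the committed code:

  // Illustrative restatement of HBaseAdmin#getTableReplicationState's logic.
  static ReplicationState stateOf(Collection<HColumnDescriptor> families) {
    boolean hasEnabled = false;
    boolean hasDisabled = false;
    for (HColumnDescriptor hcd : families) {
      if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
          && hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
        hasDisabled = true;   // this family is not replicated
      } else {
        hasEnabled = true;    // this family is replicated
      }
    }
    if (hasEnabled && hasDisabled) return ReplicationState.MIXED;
    return hasEnabled ? ReplicationState.ENABLED : ReplicationState.DISABLED;
  }

With the old boolean check, a table with one GLOBAL and one LOCAL family evaluated to "not enabled", so disableTableReplication took no action while still reporting success; with the tri-state check, MIXED is distinct from DISABLED, so the scopes are rewritten to REPLICATION_SCOPE_LOCAL, as the new disableNotFullReplication test below verifies.
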
http://git-wip-us.apache.org/repos/asf/hbase/blob/777fea55/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index b44ecbf..312a90a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -76,6 +76,23 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
   }
 
   @Test(timeout = 300000)
+  public void disableNotFullReplication() throws Exception {
+    HTableDescriptor table = admin2.getTableDescriptor(tableName);
+    HColumnDescriptor f = new HColumnDescriptor("notReplicatedFamily");
+    table.addFamily(f);
+    admin1.disableTable(tableName);
+    admin1.modifyTable(tableName, table);
+    admin1.enableTable(tableName);
+
+
+    admin1.disableTableReplication(tableName);
+    table = admin1.getTableDescriptor(tableName);
+    for (HColumnDescriptor fam : table.getColumnFamilies()) {
+      assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL);
+    }
+  }
+
+  @Test(timeout = 300000)
   public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
     admin1.disableTableReplication(tableName);
     admin2.disableTable(tableName);


[07/14] hbase git commit: HBASE-17780 BoundedByteBufferPool "At capacity" messages are not actionable

Posted by sy...@apache.org.
HBASE-17780 BoundedByteBufferPool "At capacity" messages are not actionable


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/14fb57ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/14fb57ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/14fb57ca

Branch: refs/heads/hbase-12439
Commit: 14fb57cab2fd8c0117d59669018b09e29bd6e387
Parents: 777fea5
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue Mar 14 13:23:11 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Mar 14 13:23:11 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/14fb57ca/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
index 079a277..7bce0e5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
@@ -155,8 +155,8 @@ public class BoundedByteBufferPool {
       long prevState = stateRef.get();
       countOfBuffers = toCountOfBuffers(prevState);
       if (countOfBuffers >= maxToCache) {
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("At capacity: " + countOfBuffers);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("At capacity: " + countOfBuffers);
         }
         return;
       }


[12/14] hbase git commit: HBASE-17707 New More Accurate Table Skew cost function/generator - re-enable with test fix

Posted by sy...@apache.org.
HBASE-17707 New More Accurate Table Skew cost function/generator - re-enable with test fix

This reverts commit 9214ad69af486109cc4dd31f60a82ac7ad8d3427.

Signed-off-by: tedyu <yu...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/53e9a1c4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/53e9a1c4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/53e9a1c4

Branch: refs/heads/hbase-12439
Commit: 53e9a1c43a3861c59d6fc5198982973a1678b65e
Parents: 1849e8a
Author: Kahlil Oppenheimer <ka...@gmail.com>
Authored: Wed Mar 15 11:43:18 2017 -0400
Committer: tedyu <yu...@gmail.com>
Committed: Wed Mar 15 20:42:40 2017 -0700

----------------------------------------------------------------------
 .../hbase/master/balancer/BaseLoadBalancer.java |  74 ++++
 .../master/balancer/StochasticLoadBalancer.java | 441 ++++++++++++++++++-
 .../balancer/TestStochasticLoadBalancer.java    |  35 +-
 .../balancer/TestStochasticLoadBalancer2.java   |   4 +
 4 files changed, 549 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/53e9a1c4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 0f1b1a2..b0e088c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
 import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -140,6 +141,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
     int[]   initialRegionIndexToServerIndex;    //regionIndex -> serverIndex (initial cluster state)
     int[]   regionIndexToTableIndex;     //regionIndex -> tableIndex
     int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
+    int[]   numRegionsPerTable;          // tableIndex -> number of regions that table has
     int[]   numMaxRegionsPerTable;       //tableIndex -> max number of regions in a single RS
     int[]   regionIndexToPrimaryIndex;   //regionIndex -> regionIndex of the primary
     boolean hasRegionReplicas = false;   //whether there is regions with replicas
@@ -330,6 +332,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
       numTables = tables.size();
       numRegionsPerServerPerTable = new int[numServers][numTables];
+      numRegionsPerTable = new int[numTables];
 
       for (int i = 0; i < numServers; i++) {
         for (int j = 0; j < numTables; j++) {
@@ -339,6 +342,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
 
       for (int i=0; i < regionIndexToServerIndex.length; i++) {
         if (regionIndexToServerIndex[i] >= 0) {
+          numRegionsPerTable[regionIndexToTableIndex[i]]++;
           numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
         }
       }
@@ -470,6 +474,76 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
       }
     }
 
+    /**
+     * Returns the minimum number of regions of a table T each server would store if T were
+     * perfectly distributed (i.e. round-robin-ed) across the cluster
+     */
+    public int minRegionsIfEvenlyDistributed(int table) {
+      return numRegionsPerTable[table] / numServers;
+    }
+
+    /**
+     * Returns the maximum number of regions of a table T each server would store if T were
+     * perfectly distributed (i.e. round-robin-ed) across the cluster
+     */
+    public int maxRegionsIfEvenlyDistributed(int table) {
+      int min = minRegionsIfEvenlyDistributed(table);
+      return numRegionsPerTable[table] % numServers == 0 ? min : min + 1;
+    }
+
+    /**
+     * Returns the number of servers that should hold maxRegionsIfEvenlyDistributed for a given
+     * table. A special case here is if maxRegionsIfEvenlyDistributed == minRegionsIfEvenlyDistributed,
+     * in which case all servers should hold the max
+     */
+    public int numServersWithMaxRegionsIfEvenlyDistributed(int table) {
+      int numWithMax = numRegionsPerTable[table] % numServers;
+      if (numWithMax == 0) {
+        return numServers;
+      } else {
+        return numWithMax;
+      }
+    }
+
+    /**
+     * Returns true iff at least one server in the cluster holds more than the max or fewer than
+     * the min number of regions per server that would result if all regions were evenly distributed across the cluster
+     */
+    public boolean hasUnevenRegionDistribution() {
+      int minLoad = numRegions / numServers;
+      int maxLoad = numRegions % numServers == 0 ? minLoad : minLoad + 1;
+      for (int server = 0; server < numServers; server++) {
+        int numRegions = getNumRegions(server);
+        if (numRegions > maxLoad || numRegions < minLoad) {
+          return true;
+        }
+      }
+      return false;
+    }
+
+    /**
+     * Returns a pair where the first server is that with the least number of regions across the
+     * cluster and the second server is that with the most number of regions across the cluster
+     */
+    public Pair<Integer, Integer> findLeastAndMostLoadedServers() {
+      int minServer = 0;
+      int maxServer = 0;
+      int minLoad = getNumRegions(minServer);
+      int maxLoad = minLoad;
+      for (int server = 1; server < numServers; server++) {
+        int numRegions = getNumRegions(server);
+        if (numRegions < minLoad) {
+          minServer = server;
+          minLoad = numRegions;
+        }
+        if (numRegions > maxLoad) {
+          maxServer = server;
+          maxLoad = numRegions;
+        }
+      }
+      return Pair.newPair(minServer, maxServer);
+    }
+
     /** An action to move or swap a region */
     public static class Action {
       public static enum Type {

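A quick worked example of the arithmetic behind the new Cluster helpers above (the region and server counts are illustrative only):

  // Illustrative only: the same arithmetic minRegionsIfEvenlyDistributed,
  // maxRegionsIfEvenlyDistributed and numServersWithMaxRegionsIfEvenlyDistributed perform.
  int numRegionsOfTable = 11, numServers = 4;
  int min = numRegionsOfTable / numServers;                           // 2
  int max = numRegionsOfTable % numServers == 0 ? min : min + 1;      // 3
  int serversWithMax = numRegionsOfTable % numServers == 0
      ? numServers : numRegionsOfTable % numServers;                  // 3
  // i.e. a perfectly round-robin-ed placement of this table is {3, 3, 3, 2}.
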
http://git-wip-us.apache.org/repos/asf/hbase/blob/53e9a1c4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 5c92973..8cbdd1e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -18,10 +18,14 @@
 package org.apache.hadoop.hbase.master.balancer;
 
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -30,7 +34,6 @@ import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -40,6 +43,7 @@ import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
@@ -49,6 +53,10 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegi
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
 
 /**
  * <p>This is a best effort load balancer. Given a Cost function F(C) =&gt; x It will
@@ -920,6 +928,225 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   }
 
   /**
+   * Generates candidate actions to minimize the TableSkew cost function.
+   *
+   * For efficiency reasons, the cluster must be passed in when this generator is
+   * constructed. Every move generated is applied to the cost function
+   * (i.e. it is assumed that every action we generate is applied to the cluster).
+   * This means we can adjust our cost incrementally for the cluster, rather than
+   * recomputing at each iteration.
+   */
+  static class TableSkewCandidateGenerator extends CandidateGenerator {
+
+    // Mapping of table -> true iff too many servers in the cluster store at least
+    // cluster.maxRegionsIfEvenlyDistributed(table)
+    boolean[] tablesWithEnoughServersWithMaxRegions = null;
+
+    @Override
+    Action generate(Cluster cluster) {
+      if (tablesWithEnoughServersWithMaxRegions == null || tablesWithEnoughServersWithMaxRegions.length != cluster.numTables) {
+        tablesWithEnoughServersWithMaxRegions = new boolean[cluster.numTables];
+      }
+      if (cluster.hasUnevenRegionDistribution()) {
+        Pair<Integer, Integer> leastAndMostLoadedServers = cluster.findLeastAndMostLoadedServers();
+        return moveFromTableWithEnoughRegions(cluster, leastAndMostLoadedServers.getSecond(), leastAndMostLoadedServers.getFirst());
+      } else {
+        Optional<TableAndServer> tableServer = findSkewedTableServer(cluster);
+        if (!tableServer.isPresent()) {
+          return Cluster.NullAction;
+        }
+        return findBestActionForTableServer(cluster, tableServer.get());
+      }
+    }
+
+    /**
+     * Returns a move fromServer -> toServer such that, after the move, fromServer still holds at least
+     * the minimum # of regions per table used in the table skew calculation
+     */
+    private Action moveFromTableWithEnoughRegions(Cluster cluster, int fromServer, int toServer) {
+      for (int table : getShuffledRangeOfInts(0, cluster.numTables)) {
+        int min = cluster.minRegionsIfEvenlyDistributed(table);
+        if (cluster.numRegionsPerServerPerTable[fromServer][table] > min) {
+          return getAction(fromServer, pickRandomRegionFromTableOnServer(cluster, fromServer, table), toServer, -1);
+        }
+      }
+      return Cluster.NullAction;
+    }
+
+    /**
+     * Picks a random subset of tables, then for each table T checks across the cluster and returns the first
+     * server (if any) that holds too many regions from T. Returns Optional.absent() if no servers
+     * are found that hold too many regions.
+     */
+    private Optional<TableAndServer> findSkewedTableServer(Cluster cluster) {
+      Optional<TableAndServer> tableServer = Optional.absent();
+      List<Integer> servers = getShuffledRangeOfInts(0, cluster.numServers);
+      Iterator<Integer> tableIter = getShuffledRangeOfInts(0, cluster.numTables).iterator();
+      while (tableIter.hasNext() && !tableServer.isPresent()) {
+        int table = tableIter.next();
+        int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
+        int numShouldHaveMaxRegions = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table);
+        int numWithMaxRegions = 0;
+        for (int server : servers) {
+          int numRegions = cluster.numRegionsPerServerPerTable[server][table];
+          // if more than max, server clearly has too many regions
+          if (numRegions > maxRegions) {
+            tableServer = Optional.of(new TableAndServer(table, server));
+            break;
+          }
+          // if equal to max, check to see if we are within acceptable limit
+          if (numRegions == maxRegions) {
+            numWithMaxRegions++;
+          }
+        }
+
+        tablesWithEnoughServersWithMaxRegions[table] = numWithMaxRegions >= numShouldHaveMaxRegions;
+        // If we have found a table with more than max, we are done
+        if (tableServer.isPresent()) {
+          break;
+        }
+
+        // Otherwise, check to see if there are too many servers with maxRegions
+        if (numWithMaxRegions > numShouldHaveMaxRegions) {
+          for (int server : servers) {
+            int numRegions = cluster.numRegionsPerServerPerTable[server][table];
+            if (numRegions == maxRegions) {
+              tableServer = Optional.of(new TableAndServer(table, server));
+              break;
+            }
+          }
+        }
+      }
+
+      return tableServer;
+    }
+
+    /**
+     * Returns a list of integers that stores [upper - lower] unique integers in random order
+     * s.t. for each integer i lower <= i < upper
+     */
+    private List<Integer> getShuffledRangeOfInts(int lower, int upper) {
+      Preconditions.checkArgument(lower < upper);
+      ArrayList<Integer> arr = new ArrayList<Integer>(upper - lower);
+      for (int i = lower; i < upper; i++) {
+        arr.add(i);
+      }
+      Collections.shuffle(arr);
+      return arr;
+    }
+
+    /**
+     * Pick a random region from the specified server and table. Returns -1 if no regions from
+     * the given table lie on the given server
+     */
+    protected int pickRandomRegionFromTableOnServer(Cluster cluster, int server, int table) {
+      if (server < 0 || table < 0) {
+        return -1;
+      }
+      List<Integer> regionsFromTable = new ArrayList<>();
+      for (int region : cluster.regionsPerServer[server]) {
+        if (cluster.regionIndexToTableIndex[region] == table) {
+          regionsFromTable.add(region);
+        }
+      }
+      return regionsFromTable.get(RANDOM.nextInt(regionsFromTable.size()));
+    }
+
+    /**
+     * Returns servers in the cluster that store fewer than k regions of the given table (sorted so that
+     * servers with the fewest regions of givenTable come first)
+     */
+    public List<Integer> getServersWithFewerThanKRegionsFromTable(final Cluster cluster, final int givenTable, int k) {
+      List<Integer> serversWithFewerThanK = new ArrayList<>();
+      for (int server = 0; server < cluster.numServers; server++) {
+        if (cluster.numRegionsPerServerPerTable[server][givenTable] < k) {
+          serversWithFewerThanK.add(server);
+        }
+      }
+      Collections.sort(serversWithFewerThanK, new Comparator<Integer>() {
+        @Override
+        public int compare(Integer o1, Integer o2) {
+          return cluster.numRegionsPerServerPerTable[o1.intValue()][givenTable] - cluster.numRegionsPerServerPerTable[o2.intValue()][givenTable];
+        }
+      });
+      return serversWithFewerThanK;
+    }
+
+    /**
+     * Given a table T for which server S stores too many regions, attempts to find a
+     * SWAP operation that will better balance the cluster
+     */
+    public Action findBestActionForTableServer(Cluster cluster, TableAndServer tableServer) {
+      int fromTable = tableServer.getTable();
+      int fromServer = tableServer.getServer();
+
+      int minNumRegions = cluster.minRegionsIfEvenlyDistributed(fromTable);
+      int maxNumRegions = cluster.maxRegionsIfEvenlyDistributed(fromTable);
+      List<Integer> servers;
+      if (tablesWithEnoughServersWithMaxRegions[fromTable]) {
+        servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, minNumRegions);
+      } else {
+        servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, maxNumRegions);
+      }
+
+      if (servers.isEmpty()) {
+        return Cluster.NullAction;
+      }
+
+      Optional<Action> swap = trySwap(cluster, fromServer, fromTable, servers);
+      if (swap.isPresent()) {
+        return swap.get();
+      }
+
+      // If we cannot perform a swap, we should do nothing
+      return Cluster.NullAction;
+    }
+
+    /**
+     * Given server1, table1, we try to find server2 and table2 such that
+     * at least 3 of the following 4 criteria are met
+     *
+     * 1) server1 has too many regions of table1
+     * 2) server1 has too few regions of table2
+     * 3) server2 has too many regions of table2
+     * 4) server2 has too few regions of table1
+     *
+     * We consider N regions from table T
+     *    too few if: N < cluster.minRegionsIfEvenlyDistributed(T)
+     *    too many if: N > cluster.maxRegionsIfEvenlyDistributed(T)
+     *
+     * Because (1) and (4) are true a priori, we only need to check for (2) and (3).
+     *
+     * If 3 of the 4 criteria are met, we return a swap operation between
+     * randomly selected regions from table1 on server1 and from table2 on server2.
+     *
+     * Optional.absent() is returned if we could not find such a SWAP.
+     */
+    private Optional<Action> trySwap(Cluster cluster, int server1, int table1, List<Integer> candidateServers) {
+      // Because conditions (1) and (4) are true a priori, we only need to meet one of conditions (2) or (3)
+      List<Integer> tables = getShuffledRangeOfInts(0, cluster.numTables);
+      for (int table2 : tables) {
+        int minRegions = cluster.minRegionsIfEvenlyDistributed(table2);
+        int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table2);
+        for (int server2 : candidateServers) {
+          int numRegions1 = cluster.numRegionsPerServerPerTable[server1][table2];
+          int numRegions2 = cluster.numRegionsPerServerPerTable[server2][table2];
+          if (numRegions2 == 0) {
+            continue;
+          }
+          if ((numRegions1 < minRegions || numRegions2 > maxRegions) ||
+              (minRegions != maxRegions && numRegions1 == minRegions && numRegions2 == maxRegions)) {
+            int region1 = pickRandomRegionFromTableOnServer(cluster, server1, table1);
+            int region2 = pickRandomRegionFromTableOnServer(cluster, server2, table2);
+            return Optional.of(getAction(server1, region1, server2, region2));
+          }
+        }
+      }
+      return Optional.absent();
+    }
+  }
+
+  /**
    * Base class of StochasticLoadBalancer's Cost Functions.
    */
   abstract static class CostFunction {
@@ -966,8 +1193,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
         break;
       case SWAP_REGIONS:
         SwapRegionsAction a = (SwapRegionsAction) action;
-        regionMoved(a.fromRegion, a.fromServer, a.toServer);
-        regionMoved(a.toRegion, a.toServer, a.fromServer);
+        regionSwapped(a.fromRegion, a.fromServer, a.toRegion, a.toServer);
         break;
       default:
         throw new RuntimeException("Uknown action:" + action.type);
@@ -977,6 +1203,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
     protected void regionMoved(int region, int oldServer, int newServer) {
     }
 
+    protected void regionSwapped(int region1, int server1, int region2, int server2) {
+      regionMoved(region1, server1, server2);
+      regionMoved(region2, server2, server1);
+    }
+
     abstract double cost();
 
     /**
@@ -1170,9 +1401,188 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
         "hbase.master.balancer.stochastic.tableSkewCost";
     private static final float DEFAULT_TABLE_SKEW_COST = 35;
 
+    /**
+     * Ranges from 0.0 to 1.0 and is the weight given to the most skewed table
+     * (as opposed to the average skew across all tables) in the TableSkew cost
+     */
+    private static final String MAX_TABLE_SKEW_WEIGHT_KEY =
+        "hbase.master.balancer.stochastic.maxTableSkewWeight";
+    private float DEFAULT_MAX_TABLE_SKEW_WEIGHT = 0.0f;
+
+    private final float maxTableSkewWeight;
+    private final float avgTableSkewWeight;
+
+    // Number of moves for each table required to bring the cluster to a perfectly balanced
+    // state (i.e. as if you had round-robin-ed regions across cluster)
+    private int[] numMovesPerTable;
+
     TableSkewCostFunction(Configuration conf) {
       super(conf);
       this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
+      maxTableSkewWeight = conf.getFloat(MAX_TABLE_SKEW_WEIGHT_KEY, DEFAULT_MAX_TABLE_SKEW_WEIGHT);
+      Preconditions.checkArgument(0.0 <= maxTableSkewWeight && maxTableSkewWeight <= 1.0);
+      avgTableSkewWeight = 1 - maxTableSkewWeight;
+    }
+
+    /**
+     * Computes cost by:
+     *
+     * 1) Computing a skew score for each table (based on the number of regions
+     * from that table that would have to be moved to reach an evenly balanced state)
+     *
+     * 2) Taking a weighted average of the highest skew score with the average skew score
+     *
+     * 3) Square rooting that value to more evenly distribute the values between 0-1
+     * (since we have observed they are generally very small).
+     *
+     * @return the table skew cost for the cluster
+     */
+    @Override
+    double cost() {
+      double[] skewPerTable = computeSkewPerTable();
+      if (skewPerTable.length == 0) {
+        return 0;
+      }
+      double maxTableSkew = max(skewPerTable);
+      double avgTableSkew = average(skewPerTable);
+
+      return Math.sqrt(maxTableSkewWeight * maxTableSkew + avgTableSkewWeight * avgTableSkew);
+    }
+
+    @Override
+    void init(Cluster cluster) {
+      super.init(cluster);
+      numMovesPerTable = computeNumMovesPerTable();
+    }
+
+    /**
+     * Adjusts computed number of moves after two regions have been swapped
+     */
+    @Override
+    protected void regionSwapped(int region1, int server1, int region2, int server2) {
+      // If different tables, simply perform two moves
+      if (cluster.regionIndexToTableIndex[region1] != cluster.regionIndexToTableIndex[region2]) {
+        super.regionSwapped(region1, server1, region2, server2);
+        return;
+      }
+      // If same table, do nothing
+    }
+
+    /**
+     * Adjusts computed number of moves per table after a region has been moved
+     */
+    @Override
+    protected void regionMoved(int region, int oldServer, int newServer) {
+      int table = cluster.regionIndexToTableIndex[region];
+      numMovesPerTable[table] = computeNumMovesForTable(table);
+    }
+
+    /**
+     * Returns a mapping of table -> numMoves, where numMoves is the number of region moves required to bring
+     * each table to a fully balanced state (i.e. as if its regions had been round-robin-ed across the cluster).
+     */
+    private int[] computeNumMovesPerTable() {
+      // Determine # region moves required for each table to have regions perfectly distributed across cluster
+      int[] numMovesPerTable = new int[cluster.numTables];
+      for (int table = 0; table < cluster.numTables; table++) {
+        numMovesPerTable[table] = computeNumMovesForTable(table);
+      }
+      return numMovesPerTable;
+    }
+
+    /**
+     * Computes the number of moves required across all servers to bring the given table to a balanced state
+     * (i.e. as if its regions had been round-robin-ed across the cluster). We only consider moves as # of regions
+     * that need to be sent, not received, so that we do not double count region moves.
+     */
+    private int computeNumMovesForTable(int table) {
+      int numMinRegions = cluster.minRegionsIfEvenlyDistributed(table);
+      int numMaxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
+      int numMaxServersRemaining = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table);
+      int numMoves = 0;
+
+      for (int server = 0; server < cluster.numServers; server++) {
+        int numRegions = cluster.numRegionsPerServerPerTable[server][table];
+        if (numRegions >= numMaxRegions && numMaxServersRemaining > 0) {
+          numMoves += numRegions - numMaxRegions;
+          numMaxServersRemaining--;
+        } else if (numRegions > numMinRegions) {
+          numMoves += numRegions - numMinRegions;
+        }
+      }
+      return numMoves;
+    }
+
+    /**
+     * Returns a mapping of tableIndex -> tableSkewScore, where tableSkewScore is a double between 0 and 1 with
+     * 0 indicating no table skew (i.e. perfect distribution of regions among servers), and 1 representing
+     * pathological table skew (i.e. all of a server's regions belonging to one table).
+     */
+    private double[] computeSkewPerTable() {
+      if (numMovesPerTable == null) {
+        numMovesPerTable = computeNumMovesPerTable();
+      }
+      double[] scaledSkewPerTable = new double[numMovesPerTable.length];
+      for (int table = 0; table < numMovesPerTable.length; table++) {
+        int numTotalRegions = cluster.numRegionsPerTable[table];
+        int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
+        int pathologicalNumMoves = numTotalRegions - maxRegions;
+        scaledSkewPerTable[table] = pathologicalNumMoves == 0 ? 0 : (double) numMovesPerTable[table] / pathologicalNumMoves;
+      }
+      return scaledSkewPerTable;
+    }
+
+    /**
+     * Returns the max of the values in the passed array
+     */
+    private double max(double[] arr) {
+      double max = arr[0];
+      for (double d : arr) {
+        if (d > max) {
+          max = d;
+        }
+      }
+      return max;
+    }
+
+    /**
+     * Returns the average of the values in the passed array
+     */
+    private double average(double[] arr) {
+      double sum = 0;
+      for (double d : arr) {
+        sum += d;
+      }
+      return sum / arr.length;
+    }
+  }
+
+  /**
+   * Compute the cost of a potential cluster configuration based upon how evenly
+   * distributed tables are.
+   *
+   * @deprecated replaced by TableSkewCostFunction
+   * This function only considers the maximum # of regions of each table stored
+   * on any one server. This, however, neglects a number of cases. Consider the case
+   * where N servers store 1 more region than they would if the regions had been round-robin-ed
+   * across the cluster, while K other servers store 0 regions of the table. The maximum
+   * # regions stored would not properly reflect the table-skew of the cluster.
+   *
+   * Furthermore, this relies upon the cluster.numMaxRegionsPerTable field, which is not
+   * properly updated. The values per table only increase as the cluster shifts (i.e.
+   * as new maxima are found), but they do not go down when the maximum skew decreases
+   * for a particular table.
+   */
+  @Deprecated
+  static class OldTableSkewCostFunction extends CostFunction {
+
+    private static final String TABLE_SKEW_COST_KEY =
+        "hbase.master.balancer.stochastic.tableSkewCost";
+    private static final float DEFAULT_TABLE_SKEW_COST = 35;
+
+    OldTableSkewCostFunction(Configuration conf) {
+      super(conf);
+      this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
     }
 
     @Override
@@ -1450,7 +1860,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
       for (int i = 0 ; i < costsPerGroup.length; i++) {
         totalCost += costsPerGroup[i];
       }
-      return scale(0, maxCost, totalCost);
+      // Still return high cost for single region replicas being cohosted even as cluster scales
+      return Math.sqrt(scale(0, maxCost, totalCost));
     }
 
     /**
@@ -1589,9 +2000,31 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
   }
 
   /**
+   * Data structure that holds table and server indexes
+   */
+  static class TableAndServer {
+    private final int table;
+    private final int server;
+
+    public TableAndServer(int table, int server) {
+      this.table = table;
+      this.server = server;
+    }
+
+    public int getTable() {
+      return table;
+    }
+
+    public int getServer() {
+      return server;
+    }
+  }
+
+  /**
    * A helper function to compose the attribute name from tablename and costfunction name
    */
   public static String composeAttributeName(String tableName, String costFunctionName) {
     return tableName + TABLE_FUNCTION_SEP + costFunctionName;
   }
+
 }

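To summarize the new TableSkewCostFunction above, a minimal standalone sketch of the cost it computes. The per-table skew scores and the weight below are made-up inputs; in the committed code the scores come from numMovesPerTable divided by the pathological number of moves, and the weight from hbase.master.balancer.stochastic.maxTableSkewWeight (default 0.0):

  // Illustrative recomputation of TableSkewCostFunction#cost() for a toy cluster.
  double[] skewPerTable = {0.25, 0.0, 0.5};   // assumed per-table skew scores in [0, 1]
  double maxWeight = 0.0;                     // maxTableSkewWeight (default)
  double avgWeight = 1.0 - maxWeight;

  double maxSkew = 0.0, sum = 0.0;
  for (double s : skewPerTable) {
    maxSkew = Math.max(maxSkew, s);
    sum += s;
  }
  double avgSkew = sum / skewPerTable.length;
  // The square root spreads the typically small raw values more evenly across [0, 1].
  double cost = Math.sqrt(maxWeight * maxSkew + avgWeight * avgSkew);   // sqrt(0.25) = 0.5
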
http://git-wip-us.apache.org/repos/asf/hbase/blob/53e9a1c4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
index 9d193d2..37ff35f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.master.MockNoopMasterServices;
 import org.apache.hadoop.hbase.master.RackManager;
 import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
+import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator;
+import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.TableSkewCandidateGenerator;
 import org.apache.hadoop.hbase.testclassification.FlakeyTests;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -118,7 +120,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
    */
   @Test
   public void testBalanceCluster() throws Exception {
-
+    float oldMinCostNeedBalance = conf.getFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.05f);
+    conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.02f);
+    loadBalancer.setConf(conf);
     for (int[] mockCluster : clusterStateMocks) {
       Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
       List<ServerAndLoad> list = convertToList(servers);
@@ -134,6 +138,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
         returnServer(entry.getKey());
       }
     }
+    // reset config
+    conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, oldMinCostNeedBalance);
+    loadBalancer.setConf(conf);
   }
 
   @Test
@@ -252,6 +259,32 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
     double result = storeFileCostFunction.getRegionLoadCost(regionLoads);
     // storefile size cost is simply an average of it's value over time
     assertEquals(2.5, result, 0.01);
+ }
+
+  @Test (timeout=60000)
+  public void testTableSkewCandidateGeneratorConvergesToZero() {
+    int replication = 1;
+    StochasticLoadBalancer.CostFunction
+        costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
+    CandidateGenerator generator = new TableSkewCandidateGenerator();
+    for (int i = 0; i < 100; i++) {
+      int numNodes = 1 + rand.nextInt(5 * i + 1);
+      int numTables = 1 + rand.nextInt(5 * i + 1);
+      int numRegions = rand.nextInt(numTables * 99) + Math.max(numTables, numNodes); // num regions between max(numTables, numNodes) and roughly numTables*100
+      int numRegionsPerServer = rand.nextInt(numRegions / numNodes) + 1; // num regions per server (except one) between 1 and numRegions / numNodes
+
+      Map<ServerName, List<HRegionInfo>> serverMap = createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
+      BaseLoadBalancer.Cluster cluster = new Cluster(serverMap, null, null, null);
+      costFunction.init(cluster);
+      double cost = costFunction.cost();
+      while (cost > 0) {
+        Cluster.Action action = generator.generate(cluster);
+        cluster.doAction(action);
+        costFunction.postAction(action);
+        cost = costFunction.cost();
+      }
+      assertEquals(0, cost, .000000000001);
+    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hbase/blob/53e9a1c4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
index 2f315de..03d2ef2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
@@ -35,6 +35,7 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
     conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
     conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+
     conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
     conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
     loadBalancer.setConf(conf);
@@ -70,6 +71,7 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
   public void testRegionReplicasOnMidClusterHighReplication() {
     conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
     conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+    conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 4);
     loadBalancer.setConf(conf);
     int numNodes = 80;
     int numRegions = 6 * numNodes;
@@ -77,6 +79,8 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
     int numRegionsPerServer = 5;
     int numTables = 10;
     testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true);
+    // reset config
+    conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 35);
   }
 
   @Test (timeout = 800000)