Posted to commits@hbase.apache.org by sy...@apache.org on 2017/03/16 17:12:29 UTC
[01/14] hbase git commit: HBASE-17746 TestSimpleRpcScheduler.testCoDelScheduling is broken
Repository: hbase
Updated Branches:
refs/heads/hbase-12439 4b541d638 -> e67eb6c42
HBASE-17746 TestSimpleRpcScheduler.testCoDelScheduling is broken
Signed-off-by: Guanghao Zhang <zg...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/31bc94ae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/31bc94ae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/31bc94ae
Branch: refs/heads/hbase-12439
Commit: 31bc94ae6094d02a73d347549e23bcbaff97838f
Parents: 4b541d6
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Fri Mar 10 18:42:27 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Mon Mar 13 10:13:42 2017 +0800
----------------------------------------------------------------------
.../main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java | 2 +-
.../java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java | 3 +--
2 files changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/31bc94ae/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index 616f741..d51d83b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -85,7 +85,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
callExecutor = new RWQueueRpcExecutor("deafult.RWQ", Math.max(2, handlerCount),
maxQueueLength, priority, conf, server);
} else {
- if (RpcExecutor.isFifoQueueType(callQueueType)) {
+ if (RpcExecutor.isFifoQueueType(callQueueType) || RpcExecutor.isCodelQueueType(callQueueType)) {
callExecutor = new FastPathBalancedQueueRpcExecutor("deafult.FPBQ", handlerCount,
maxQueueLength, priority, conf, server);
} else {
http://git-wip-us.apache.org/repos/asf/hbase/blob/31bc94ae/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index 04ac519..5e4520d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -434,8 +434,7 @@ public class TestSimpleRpcScheduler {/*
@Test
public void testCoDelScheduling() throws Exception {
CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
- envEdge.threadNamePrefixs.add("RpcServer.CodelFPBQ.default.handler");
- envEdge.threadNamePrefixs.add("RpcServer.CodelRWQ.default.handler");
+ envEdge.threadNamePrefixs.add("RpcServer.deafult.FPBQ.Codel.handler");
Configuration schedConf = HBaseConfiguration.create();
schedConf.setInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, 250);
schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
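For illustration only (not part of the commit), a minimal stand-alone sketch of selecting the CoDel call queue type that the fixed test exercises; the literal key strings below are assumptions standing in for RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY and RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class CodelQueueConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // "codel" now routes through the FastPathBalancedQueueRpcExecutor created above,
    // so handler threads carry the "RpcServer.deafult.FPBQ.Codel.handler" prefix the
    // updated test expects (the "deafult" spelling comes from the scheduler code).
    conf.set("hbase.ipc.server.callqueue.type", "codel");
    conf.setInt("hbase.ipc.server.max.callqueue.length", 250);
    System.out.println(conf.get("hbase.ipc.server.callqueue.type"));
  }
}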
[02/14] hbase git commit: HBASE-17773 VerifyReplication tool wrongly emits Invalid arguments error
Posted by sy...@apache.org.
HBASE-17773 VerifyReplication tool wrongly emits Invalid arguments error
Signed-off-by: Guanghao Zhang <zg...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fee67bcf
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fee67bcf
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fee67bcf
Branch: refs/heads/hbase-12439
Commit: fee67bcf1432cd16720fb97a0135bd67b0d2b064
Parents: 31bc94a
Author: Tom Tsuruhara <to...@linecorp.com>
Authored: Sat Mar 11 18:20:06 2017 +0900
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Mon Mar 13 17:46:58 2017 +0800
----------------------------------------------------------------------
.../hbase/mapreduce/replication/VerifyReplication.java | 9 +++++----
1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fee67bcf/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
index 145fede..ba5966b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
@@ -463,10 +463,6 @@ public class VerifyReplication extends Configured implements Tool {
continue;
}
- if (cmd.startsWith("--")) {
- printUsage("Invalid argument '" + cmd + "'");
- }
-
final String delimiterArgKey = "--delimiter=";
if (cmd.startsWith(delimiterArgKey)) {
delimiter = cmd.substring(delimiterArgKey.length());
@@ -483,6 +479,11 @@ public class VerifyReplication extends Configured implements Tool {
verbose = true;
continue;
}
+
+ if (cmd.startsWith("--")) {
+ printUsage("Invalid argument '" + cmd + "'");
+ }
+
if (i == args.length-2) {
peerId = cmd;
}
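For illustration only, a hedged, self-contained sketch of the corrected parsing order: every recognized "--" option is consumed before the catch-all check, so options such as "--delimiter=" and "--verbose" are no longer rejected. The option names mirror the hunk above; everything else is illustrative:

public class ArgOrderSketch {
  static void parse(String[] args) {
    for (String cmd : args) {
      final String delimiterArgKey = "--delimiter=";
      if (cmd.startsWith(delimiterArgKey)) {
        System.out.println("delimiter=" + cmd.substring(delimiterArgKey.length()));
        continue;
      }
      if (cmd.equals("--verbose")) {
        continue;
      }
      // The catch-all now runs only after every recognized option above,
      // so valid "--" options no longer trigger the "Invalid argument" error.
      if (cmd.startsWith("--")) {
        throw new IllegalArgumentException("Invalid argument '" + cmd + "'");
      }
      // remaining arguments are treated as positional (e.g. peer id, table name)
    }
  }

  public static void main(String[] args) {
    parse(new String[] { "--verbose", "--delimiter=|", "1", "someTable" });
  }
}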
[09/14] hbase git commit: HBASE-17584 Expose ScanMetrics with ResultScanner rather than Scan
Posted by sy...@apache.org.
HBASE-17584 Expose ScanMetrics with ResultScanner rather than Scan
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/a49bc58a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/a49bc58a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/a49bc58a
Branch: refs/heads/hbase-12439
Commit: a49bc58a5456c2974552c7b6ffab9ea39393ca78
Parents: aace02a
Author: zhangduo <zh...@apache.org>
Authored: Fri Feb 24 14:08:10 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Wed Mar 15 17:48:58 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AbstractClientScanner.java | 8 +++-----
.../hbase/client/AsyncTableResultScanner.java | 6 ++++++
.../apache/hadoop/hbase/client/ClientScanner.java | 11 ++++++-----
.../apache/hadoop/hbase/client/ResultScanner.java | 6 ++++++
.../java/org/apache/hadoop/hbase/client/Scan.java | 6 +++++-
.../client/metrics/ServerSideScanMetrics.java | 13 +++++++++++--
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 17 ++++++-----------
.../hadoop/hbase/rest/client/RemoteHTable.java | 6 ++++++
.../hbase/client/ClientSideRegionScanner.java | 1 -
.../hbase/mapreduce/TableRecordReaderImpl.java | 4 ++--
.../TestServerSideScanMetricsFromClientSide.java | 14 +++++++-------
.../hadoop/hbase/regionserver/RegionAsTable.java | 6 ++++++
12 files changed, 64 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
index 87304c3..ffb2fa1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractClientScanner.java
@@ -38,13 +38,11 @@ public abstract class AbstractClientScanner implements ResultScanner {
}
/**
- * Used internally accumulating metrics on scan. To
- * enable collection of metrics on a Scanner, call {@link Scan#setScanMetricsEnabled(boolean)}.
- * These metrics are cleared at key transition points. Metrics are accumulated in the
- * {@link Scan} object itself.
- * @see Scan#getScanMetrics()
+ * Used internally accumulating metrics on scan. To enable collection of metrics on a Scanner,
+ * call {@link Scan#setScanMetricsEnabled(boolean)}.
* @return Returns the running {@link ScanMetrics} instance or null if scan metrics not enabled.
*/
+ @Override
public ScanMetrics getScanMetrics() {
return scanMetrics;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
index 38d4b2c..eef797c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableResultScanner.java
@@ -30,6 +30,7 @@ import java.util.Queue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
/**
* The {@link ResultScanner} implementation for {@link AsyncTable}. It will fetch data automatically
@@ -164,4 +165,9 @@ class AsyncTableResultScanner implements ResultScanner, RawScanResultConsumer {
synchronized boolean isSuspended() {
return resumer != null;
}
+
+ @Override
+ public ScanMetrics getScanMetrics() {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 53e6dd8..bd3d4ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.exceptions.ScannerResetException;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.util.Bytes;
/**
@@ -301,15 +300,17 @@ public abstract class ClientScanner extends AbstractClientScanner {
* for scan/map reduce scenarios, we will have multiple scans running at the same time. By
* default, scan metrics are disabled; if the application wants to collect them, this behavior can
* be turned on by calling calling {@link Scan#setScanMetricsEnabled(boolean)}
- * <p>
- * This invocation clears the scan metrics. Metrics are aggregated in the Scan instance.
*/
protected void writeScanMetrics() {
if (this.scanMetrics == null || scanMetricsPublished) {
return;
}
- MapReduceProtos.ScanMetrics pScanMetrics = ProtobufUtil.toScanMetrics(scanMetrics);
- scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA, pScanMetrics.toByteArray());
+ // Publish ScanMetrics to the Scan Object.
+ // As we have claimed in the comment of Scan.getScanMetrics, this relies on that user will not
+ // call ResultScanner.getScanMetrics and reset the ScanMetrics. Otherwise the metrics published
+ // to Scan will be messed up.
+ scan.setAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA,
+ ProtobufUtil.toScanMetrics(scanMetrics, false).toByteArray());
scanMetricsPublished = true;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
index e9cb476..8951e84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultScanner.java
@@ -27,6 +27,7 @@ import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
/**
* Interface for client-side scanning. Go to {@link Table} to obtain instances.
@@ -116,4 +117,9 @@ public interface ResultScanner extends Closeable, Iterable<Result> {
* @return true if the lease was successfully renewed, false otherwise.
*/
boolean renewLease();
+
+ /**
+ * @return the scan metrics, or {@code null} if we do not enable metrics.
+ */
+ ScanMetrics getScanMetrics();
}
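For illustration only, a hedged usage sketch of the new ResultScanner#getScanMetrics() accessor; the connection, table name and counter constant are placeholders or assumptions drawn from the test changes further down:

import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;

public class ScanMetricsFromScannerSketch {
  public static void scanAndReport(Connection conn) throws Exception {
    Scan scan = new Scan();
    scan.setScanMetricsEnabled(true); // still required so metrics are collected at all
    try (Table table = conn.getTable(TableName.valueOf("someTable"));
         ResultScanner scanner = table.getScanner(scan)) {
      while (scanner.next() != null) {
        // drain the scanner; metrics accumulate as results come back
      }
      // Read metrics from the scanner instead of the (now deprecated) Scan accessor.
      ScanMetrics metrics = scanner.getScanMetrics();
      if (metrics != null
          && metrics.hasCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME)) {
        System.out.println("rows scanned: " + metrics
            .getCounter(ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME).get());
      }
    }
  }
}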
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index a7d81af..03c692c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -1081,9 +1081,13 @@ public class Scan extends Query {
/**
* @return Metrics on this Scan, if metrics were enabled.
* @see #setScanMetricsEnabled(boolean)
+ * @deprecated Use {@link ResultScanner#getScanMetrics()} instead. And notice that, please do not
+ * use this method and {@link ResultScanner#getScanMetrics()} together, the metrics
+ * will be messed up.
*/
+ @Deprecated
public ScanMetrics getScanMetrics() {
- byte [] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
+ byte[] bytes = getAttribute(Scan.SCAN_ATTRIBUTES_METRICS_DATA);
if (bytes == null) return null;
return ProtobufUtil.toScanMetrics(bytes);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
index 7171a94..b14938b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/metrics/ServerSideScanMetrics.java
@@ -106,11 +106,20 @@ public class ServerSideScanMetrics {
* @return A Map of String -> Long for metrics
*/
public Map<String, Long> getMetricsMap() {
+ return getMetricsMap(true);
+ }
+
+ /**
+ * Get all of the values. If reset is true, we will reset the all AtomicLongs back to 0.
+ * @param reset whether to reset the AtomicLongs to 0.
+ * @return A Map of String -> Long for metrics
+ */
+ public Map<String, Long> getMetricsMap(boolean reset) {
// Create a builder
ImmutableMap.Builder<String, Long> builder = ImmutableMap.builder();
- // For every entry add the value and reset the AtomicLong back to zero
for (Map.Entry<String, AtomicLong> e : this.counters.entrySet()) {
- builder.put(e.getKey(), e.getValue().getAndSet(0));
+ long value = reset ? e.getValue().getAndSet(0) : e.getValue().get();
+ builder.put(e.getKey(), value);
}
// Build the immutable map so that people can't mess around with it.
return builder.build();
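A short hedged sketch of the reset flag added above: reading the map with reset=false leaves the counters intact, while the existing no-arg overload keeps its reset-to-zero behaviour. The public countOfRowsScanned counter used here is an assumption; any counter would do:

import java.util.Map;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

public class MetricsMapResetSketch {
  public static void main(String[] args) {
    ScanMetrics metrics = new ScanMetrics();
    metrics.countOfRowsScanned.addAndGet(5);          // assumed public counter field

    Map<String, Long> peek = metrics.getMetricsMap(false);  // non-destructive read
    Map<String, Long> again = metrics.getMetricsMap(false); // counters still intact
    System.out.println(peek.equals(again));                 // true

    Map<String, Long> drained = metrics.getMetricsMap();    // no-arg overload still resets
    System.out.println(drained);
    System.out.println(metrics.getMetricsMap(false));       // all counters now back at zero
  }
}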
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 10827c3..f44979c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -83,7 +83,6 @@ import org.apache.hadoop.hbase.io.LimitInputStream;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.protobuf.ProtobufMagic;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.quotas.QuotaScope;
import org.apache.hadoop.hbase.quotas.QuotaType;
import org.apache.hadoop.hbase.quotas.ThrottleType;
@@ -95,7 +94,6 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.ByteString;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.CodedInputStream;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Message;
-import org.apache.hadoop.hbase.shaded.com.google.protobuf.Parser;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcChannel;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
import org.apache.hadoop.hbase.shaded.com.google.protobuf.Service;
@@ -164,6 +162,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescript
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.RegionEventDescriptor.EventType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.StoreDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Addressing;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.DynamicClassLoader;
@@ -2043,12 +2042,11 @@ public final class ProtobufUtil {
}
public static ScanMetrics toScanMetrics(final byte[] bytes) {
- Parser<MapReduceProtos.ScanMetrics> parser = MapReduceProtos.ScanMetrics.PARSER;
MapReduceProtos.ScanMetrics pScanMetrics = null;
try {
- pScanMetrics = parser.parseFrom(bytes);
+ pScanMetrics = MapReduceProtos.ScanMetrics.parseFrom(bytes);
} catch (InvalidProtocolBufferException e) {
- //Ignored there are just no key values to add.
+ // Ignored there are just no key values to add.
}
ScanMetrics scanMetrics = new ScanMetrics();
if (pScanMetrics != null) {
@@ -2061,15 +2059,12 @@ public final class ProtobufUtil {
return scanMetrics;
}
- public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics) {
+ public static MapReduceProtos.ScanMetrics toScanMetrics(ScanMetrics scanMetrics, boolean reset) {
MapReduceProtos.ScanMetrics.Builder builder = MapReduceProtos.ScanMetrics.newBuilder();
- Map<String, Long> metrics = scanMetrics.getMetricsMap();
+ Map<String, Long> metrics = scanMetrics.getMetricsMap(reset);
for (Entry<String, Long> e : metrics.entrySet()) {
HBaseProtos.NameInt64Pair nameInt64Pair =
- HBaseProtos.NameInt64Pair.newBuilder()
- .setName(e.getKey())
- .setValue(e.getValue())
- .build();
+ HBaseProtos.NameInt64Pair.newBuilder().setName(e.getKey()).setValue(e.getValue()).build();
builder.addMetrics(nameInt64Pair);
}
return builder.build();
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index e762c31..9cc3198 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -57,6 +57,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.TimeRange;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -641,6 +642,11 @@ public class RemoteHTable implements Table {
public boolean renewLease() {
throw new RuntimeException("renewLease() not supported");
}
+
+ @Override
+ public ScanMetrics getScanMetrics() {
+ throw new RuntimeException("getScanMetrics() not supported");
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
index 8ff118e..7ae0537 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClientSideRegionScanner.java
@@ -52,7 +52,6 @@ public class ClientSideRegionScanner extends AbstractClientScanner {
public ClientSideRegionScanner(Configuration conf, FileSystem fs,
Path rootDir, HTableDescriptor htd, HRegionInfo hri, Scan scan, ScanMetrics scanMetrics)
throws IOException {
-
// region is immutable, set isolation level
scan.setIsolationLevel(IsolationLevel.READ_UNCOMMITTED);
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
index 6f1d140..a8ed5f1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TableRecordReaderImpl.java
@@ -81,7 +81,7 @@ public class TableRecordReaderImpl {
*/
public void restart(byte[] firstRow) throws IOException {
currentScan = new Scan(scan);
- currentScan.setStartRow(firstRow);
+ currentScan.withStartRow(firstRow);
currentScan.setScanMetricsEnabled(true);
if (this.scanner != null) {
if (logScannerActivity) {
@@ -273,7 +273,7 @@ public class TableRecordReaderImpl {
* @throws IOException
*/
private void updateCounters() throws IOException {
- ScanMetrics scanMetrics = currentScan.getScanMetrics();
+ ScanMetrics scanMetrics = scanner.getScanMetrics();
if (scanMetrics == null) {
return;
}
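For reference, a hedged sketch of the withStartRow/withStopRow builders that replace the deprecated setStartRow/setStopRow calls in the hunks above and in the test below; the row keys are placeholders:

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class WithStartStopRowSketch {
  public static Scan buildScan() {
    Scan scan = new Scan();
    scan.withStartRow(Bytes.toBytes("row-000")); // inclusive start, replaces setStartRow
    scan.withStopRow(Bytes.toBytes("row-100"));  // exclusive stop, replaces setStopRow
    scan.setScanMetricsEnabled(true);            // so ResultScanner.getScanMetrics() has data
    return scan;
  }
}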
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
index ad63cc8..370d3d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java
@@ -192,15 +192,15 @@ public class TestServerSideScanMetricsFromClientSide {
for (int i = 0; i < ROWS.length - 1; i++) {
scan = new Scan(baseScan);
- scan.setStartRow(ROWS[0]);
- scan.setStopRow(ROWS[i + 1]);
+ scan.withStartRow(ROWS[0]);
+ scan.withStopRow(ROWS[i + 1]);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, i + 1);
}
for (int i = ROWS.length - 1; i > 0; i--) {
scan = new Scan(baseScan);
- scan.setStartRow(ROWS[i - 1]);
- scan.setStopRow(ROWS[ROWS.length - 1]);
+ scan.withStartRow(ROWS[i - 1]);
+ scan.withStopRow(ROWS[ROWS.length - 1]);
testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_SCANNED_KEY_METRIC_NAME, ROWS.length - i);
}
@@ -318,12 +318,12 @@ public class TestServerSideScanMetricsFromClientSide {
public void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
ResultScanner scanner = TABLE.getScanner(scan);
-
// Iterate through all the results
- for (Result r : scanner) {
+ while (scanner.next() != null) {
+
}
scanner.close();
- ScanMetrics metrics = scan.getScanMetrics();
+ ScanMetrics metrics = scanner.getScanMetrics();
assertTrue("Metrics are null", metrics != null);
assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
final long actualMetricValue = metrics.getCounter(metricKey).get();
http://git-wip-us.apache.org/repos/asf/hbase/blob/a49bc58a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
index cfae7cb..9b96ff2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
@@ -42,6 +42,7 @@ import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
@@ -172,6 +173,11 @@ public class RegionAsTable implements Table {
public boolean renewLease() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public ScanMetrics getScanMetrics() {
+ throw new UnsupportedOperationException();
+ }
};
@Override
[14/14] hbase git commit: HBASE-15314 Allow more than one backing file in bucketcache (Chunhui Shen)
Posted by sy...@apache.org.
HBASE-15314 Allow more than one backing file in bucketcache (Chunhui Shen)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e67eb6c4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e67eb6c4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e67eb6c4
Branch: refs/heads/hbase-12439
Commit: e67eb6c424d76ee259f5076c277454a73e3a2bf4
Parents: 6a6fff1
Author: Ramkrishna <ra...@intel.com>
Authored: Thu Mar 16 16:11:35 2017 +0530
Committer: Ramkrishna <ra...@intel.com>
Committed: Thu Mar 16 16:28:58 2017 +0530
----------------------------------------------------------------------
.../hbase/io/hfile/bucket/BucketCache.java | 9 +-
.../hbase/io/hfile/bucket/FileIOEngine.java | 184 +++++++++++++++----
.../hbase/io/hfile/bucket/TestFileIOEngine.java | 47 ++++-
3 files changed, 190 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67eb6c4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index 3e9c376..3c27f14 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -314,8 +314,13 @@ public class BucketCache implements BlockCache, HeapSize {
*/
private IOEngine getIOEngineFromName(String ioEngineName, long capacity)
throws IOException {
- if (ioEngineName.startsWith("file:")) {
- return new FileIOEngine(ioEngineName.substring(5), capacity);
+ if (ioEngineName.startsWith("file:") || ioEngineName.startsWith("files:")) {
+ // In order to make the usage simple, we only need the prefix 'files:' in
+ // document whether one or multiple file(s), but also support 'file:' for
+ // the compatibility
+ String[] filePaths = ioEngineName.substring(ioEngineName.indexOf(":") + 1)
+ .split(FileIOEngine.FILE_DELIMITER);
+ return new FileIOEngine(capacity, filePaths);
} else if (ioEngineName.startsWith("offheap")) {
return new ByteBufferIOEngine(capacity, true);
} else if (ioEngineName.startsWith("heap")) {
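For illustration only, a hedged configuration sketch of the new multi-file IOEngine: the "files:" prefix takes a comma-separated path list split on FileIOEngine.FILE_DELIMITER, with the total capacity divided evenly across the files; the paths and size below are placeholders:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class MultiFileBucketCacheConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // "files:" accepts one or many paths; the single-path "file:" prefix is kept for compatibility.
    conf.set("hbase.bucketcache.ioengine",
        "files:/mnt/disk1/bucketcache,/mnt/disk2/bucketcache,/mnt/disk3/bucketcache");
    conf.set("hbase.bucketcache.size", "98304"); // total capacity, split evenly per file
    System.out.println(conf.get("hbase.bucketcache.ioengine"));
  }
}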
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67eb6c4/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
index aaf5cf9..7586d57 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/FileIOEngine.java
@@ -18,10 +18,12 @@
*/
package org.apache.hadoop.hbase.io.hfile.bucket;
+import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.Arrays;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -39,38 +41,52 @@ import org.apache.hadoop.util.StringUtils;
@InterfaceAudience.Private
public class FileIOEngine implements IOEngine {
private static final Log LOG = LogFactory.getLog(FileIOEngine.class);
- private final RandomAccessFile raf;
- private final FileChannel fileChannel;
- private final String path;
- private long size;
-
- public FileIOEngine(String filePath, long fileSize) throws IOException {
- this.path = filePath;
- this.size = fileSize;
- try {
- raf = new RandomAccessFile(filePath, "rw");
- } catch (java.io.FileNotFoundException fex) {
- LOG.error("Can't create bucket cache file " + filePath, fex);
- throw fex;
- }
+ public static final String FILE_DELIMITER = ",";
+ private final String[] filePaths;
+ private final FileChannel[] fileChannels;
+ private final RandomAccessFile[] rafs;
- try {
- raf.setLength(fileSize);
- } catch (IOException ioex) {
- LOG.error("Can't extend bucket cache file; insufficient space for "
- + StringUtils.byteDesc(fileSize), ioex);
- raf.close();
- throw ioex;
- }
+ private final long sizePerFile;
+ private final long capacity;
- fileChannel = raf.getChannel();
- LOG.info("Allocating " + StringUtils.byteDesc(fileSize) + ", on the path:" + filePath);
+ private FileReadAccessor readAccessor = new FileReadAccessor();
+ private FileWriteAccessor writeAccessor = new FileWriteAccessor();
+
+ public FileIOEngine(long capacity, String... filePaths) throws IOException {
+ this.sizePerFile = capacity / filePaths.length;
+ this.capacity = this.sizePerFile * filePaths.length;
+ this.filePaths = filePaths;
+ this.fileChannels = new FileChannel[filePaths.length];
+ this.rafs = new RandomAccessFile[filePaths.length];
+ for (int i = 0; i < filePaths.length; i++) {
+ String filePath = filePaths[i];
+ try {
+ rafs[i] = new RandomAccessFile(filePath, "rw");
+ long totalSpace = new File(filePath).getTotalSpace();
+ if (totalSpace < sizePerFile) {
+ // The next setting length will throw exception,logging this message
+ // is just used for the detail reason of exception，
+ String msg = "Only " + StringUtils.byteDesc(totalSpace)
+ + " total space under " + filePath + ", not enough for requested "
+ + StringUtils.byteDesc(sizePerFile);
+ LOG.warn(msg);
+ }
+ rafs[i].setLength(sizePerFile);
+ fileChannels[i] = rafs[i].getChannel();
+ LOG.info("Allocating cache " + StringUtils.byteDesc(sizePerFile)
+ + ", on the path:" + filePath);
+ } catch (IOException fex) {
+ LOG.error("Failed allocating cache on " + filePath, fex);
+ shutdown();
+ throw fex;
+ }
+ }
}
@Override
public String toString() {
- return "ioengine=" + this.getClass().getSimpleName() + ", path=" + this.path +
- ", size=" + String.format("%,d", this.size);
+ return "ioengine=" + this.getClass().getSimpleName() + ", paths="
+ + Arrays.asList(filePaths) + ", capacity=" + String.format("%,d", this.capacity);
}
/**
@@ -94,7 +110,7 @@ public class FileIOEngine implements IOEngine {
public Cacheable read(long offset, int length, CacheableDeserializer<Cacheable> deserializer)
throws IOException {
ByteBuffer dstBuffer = ByteBuffer.allocate(length);
- fileChannel.read(dstBuffer, offset);
+ accessFile(readAccessor, dstBuffer, offset);
// The buffer created out of the fileChannel is formed by copying the data from the file
// Hence in this case there is no shared memory that we point to. Even if the BucketCache evicts
// this buffer from the file the data is already copied and there is no need to ensure that
@@ -114,7 +130,7 @@ public class FileIOEngine implements IOEngine {
*/
@Override
public void write(ByteBuffer srcBuffer, long offset) throws IOException {
- fileChannel.write(srcBuffer, offset);
+ accessFile(writeAccessor, srcBuffer, offset);
}
/**
@@ -123,7 +139,16 @@ public class FileIOEngine implements IOEngine {
*/
@Override
public void sync() throws IOException {
- fileChannel.force(true);
+ for (int i = 0; i < fileChannels.length; i++) {
+ try {
+ if (fileChannels[i] != null) {
+ fileChannels[i].force(true);
+ }
+ } catch (IOException ie) {
+ LOG.warn("Failed syncing data to " + this.filePaths[i]);
+ throw ie;
+ }
+ }
}
/**
@@ -131,15 +156,17 @@ public class FileIOEngine implements IOEngine {
*/
@Override
public void shutdown() {
- try {
- fileChannel.close();
- } catch (IOException ex) {
- LOG.error("Can't shutdown cleanly", ex);
- }
- try {
- raf.close();
- } catch (IOException ex) {
- LOG.error("Can't shutdown cleanly", ex);
+ for (int i = 0; i < filePaths.length; i++) {
+ try {
+ if (fileChannels[i] != null) {
+ fileChannels[i].close();
+ }
+ if (rafs[i] != null) {
+ rafs[i].close();
+ }
+ } catch (IOException ex) {
+ LOG.error("Failed closing " + filePaths[i] + " when shudown the IOEngine", ex);
+ }
}
}
@@ -147,7 +174,84 @@ public class FileIOEngine implements IOEngine {
public void write(ByteBuff srcBuffer, long offset) throws IOException {
// When caching block into BucketCache there will be single buffer backing for this HFileBlock.
assert srcBuffer.hasArray();
- fileChannel.write(
- ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(), srcBuffer.remaining()), offset);
+ write(ByteBuffer.wrap(srcBuffer.array(), srcBuffer.arrayOffset(),
+ srcBuffer.remaining()), offset);
+ }
+
+ private void accessFile(FileAccessor accessor, ByteBuffer buffer,
+ long globalOffset) throws IOException {
+ int startFileNum = getFileNum(globalOffset);
+ int remainingAccessDataLen = buffer.remaining();
+ int endFileNum = getFileNum(globalOffset + remainingAccessDataLen - 1);
+ int accessFileNum = startFileNum;
+ long accessOffset = getAbsoluteOffsetInFile(accessFileNum, globalOffset);
+ int bufLimit = buffer.limit();
+ while (true) {
+ FileChannel fileChannel = fileChannels[accessFileNum];
+ if (endFileNum > accessFileNum) {
+ // short the limit;
+ buffer.limit((int) (buffer.limit() - remainingAccessDataLen
+ + sizePerFile - accessOffset));
+ }
+ int accessLen = accessor.access(fileChannel, buffer, accessOffset);
+ // recover the limit
+ buffer.limit(bufLimit);
+ if (accessLen < remainingAccessDataLen) {
+ remainingAccessDataLen -= accessLen;
+ accessFileNum++;
+ accessOffset = 0;
+ } else {
+ break;
+ }
+ if (accessFileNum >= fileChannels.length) {
+ throw new IOException("Required data len "
+ + StringUtils.byteDesc(buffer.remaining())
+ + " exceed the engine's capacity " + StringUtils.byteDesc(capacity)
+ + " where offset=" + globalOffset);
+ }
+ }
+ }
+
+ /**
+ * Get the absolute offset in given file with the relative global offset.
+ * @param fileNum
+ * @param globalOffset
+ * @return the absolute offset
+ */
+ private long getAbsoluteOffsetInFile(int fileNum, long globalOffset) {
+ return globalOffset - fileNum * sizePerFile;
+ }
+
+ private int getFileNum(long offset) {
+ if (offset < 0) {
+ throw new IllegalArgumentException("Unexpected offset " + offset);
+ }
+ int fileNum = (int) (offset / sizePerFile);
+ if (fileNum >= fileChannels.length) {
+ throw new RuntimeException("Not expected offset " + offset
+ + " where capacity=" + capacity);
+ }
+ return fileNum;
+ }
+
+ private static interface FileAccessor {
+ int access(FileChannel fileChannel, ByteBuffer byteBuffer, long accessOffset)
+ throws IOException;
+ }
+
+ private static class FileReadAccessor implements FileAccessor {
+ @Override
+ public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+ long accessOffset) throws IOException {
+ return fileChannel.read(byteBuffer, accessOffset);
+ }
+ }
+
+ private static class FileWriteAccessor implements FileAccessor {
+ @Override
+ public int access(FileChannel fileChannel, ByteBuffer byteBuffer,
+ long accessOffset) throws IOException {
+ return fileChannel.write(byteBuffer, accessOffset);
+ }
}
}
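A small hedged sketch of the offset arithmetic behind accessFile(): a global offset maps to a file index by integer division on sizePerFile, the in-file offset is the remainder, and an access that crosses a file boundary is split across channels, which the boundary positions in the test below exercise. The capacity and file count are arbitrary example numbers:

public class FileStripingSketch {
  public static void main(String[] args) {
    long capacity = 6L * 1024 * 1024;            // e.g. 6 MB spread over 3 files
    int files = 3;
    long sizePerFile = capacity / files;         // 2 MB per file

    long globalOffset = 2 * sizePerFile + 100;   // lands in the third file
    int fileNum = (int) (globalOffset / sizePerFile);         // mirrors getFileNum()
    long offsetInFile = globalOffset - fileNum * sizePerFile; // mirrors getAbsoluteOffsetInFile()
    System.out.println("file #" + fileNum + ", offset " + offsetInFile); // file #2, offset 100

    // An access starting just before a boundary is served by two files:
    long nearBoundary = sizePerFile - 10;        // last 10 bytes of file 0, rest from file 1
    System.out.println("starts in file #" + (nearBoundary / sizePerFile));
  }
}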
http://git-wip-us.apache.org/repos/asf/hbase/blob/e67eb6c4/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
index 93f4cf7..d1f3dfe 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/io/hfile/bucket/TestFileIOEngine.java
@@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.hbase.io.hfile.bucket.TestByteBufferIOEngine.BufferGrabbingDeserializer;
import org.apache.hadoop.hbase.nio.ByteBuff;
@@ -38,13 +40,39 @@ import org.junit.experimental.categories.Category;
public class TestFileIOEngine {
@Test
public void testFileIOEngine() throws IOException {
- int size = 2 * 1024 * 1024; // 2 MB
- String filePath = "testFileIOEngine";
+ long totalCapacity = 6 * 1024 * 1024; // 6 MB
+ String[] filePaths = { "testFileIOEngine1", "testFileIOEngine2",
+ "testFileIOEngine3" };
+ long sizePerFile = totalCapacity / filePaths.length; // 2 MB per File
+ List<Long> boundaryStartPositions = new ArrayList<Long>();
+ boundaryStartPositions.add(0L);
+ for (int i = 1; i < filePaths.length; i++) {
+ boundaryStartPositions.add(sizePerFile * i - 1);
+ boundaryStartPositions.add(sizePerFile * i);
+ boundaryStartPositions.add(sizePerFile * i + 1);
+ }
+ List<Long> boundaryStopPositions = new ArrayList<Long>();
+ for (int i = 1; i < filePaths.length; i++) {
+ boundaryStopPositions.add(sizePerFile * i - 1);
+ boundaryStopPositions.add(sizePerFile * i);
+ boundaryStopPositions.add(sizePerFile * i + 1);
+ }
+ boundaryStopPositions.add(sizePerFile * filePaths.length - 1);
+ FileIOEngine fileIOEngine = new FileIOEngine(totalCapacity, filePaths);
try {
- FileIOEngine fileIOEngine = new FileIOEngine(filePath, size);
- for (int i = 0; i < 50; i++) {
+ for (int i = 0; i < 500; i++) {
int len = (int) Math.floor(Math.random() * 100);
- long offset = (long) Math.floor(Math.random() * size % (size - len));
+ long offset = (long) Math.floor(Math.random() * totalCapacity % (totalCapacity - len));
+ if (i < boundaryStartPositions.size()) {
+ // make the boundary start positon
+ offset = boundaryStartPositions.get(i);
+ } else if ((i - boundaryStartPositions.size()) < boundaryStopPositions.size()) {
+ // make the boundary stop positon
+ offset = boundaryStopPositions.get(i - boundaryStartPositions.size()) - len + 1;
+ } else if (i % 2 == 0) {
+ // make the cross-files block writing/reading
+ offset = Math.max(1, i % filePaths.length) * sizePerFile - len / 2;
+ }
byte[] data1 = new byte[len];
for (int j = 0; j < data1.length; ++j) {
data1[j] = (byte) (Math.random() * 255);
@@ -58,9 +86,12 @@ public class TestFileIOEngine {
}
}
} finally {
- File file = new File(filePath);
- if (file.exists()) {
- file.delete();
+ fileIOEngine.shutdown();
+ for (String filePath : filePaths) {
+ File file = new File(filePath);
+ if (file.exists()) {
+ file.delete();
+ }
}
}
[03/14] hbase git commit: HBASE-15597 Clean up configuration keys used in hbase-spark module (Yi Liang)
Posted by sy...@apache.org.
HBASE-15597 Clean up configuration keys used in hbase-spark module (Yi Liang)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/35d7a0cd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/35d7a0cd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/35d7a0cd
Branch: refs/heads/hbase-12439
Commit: 35d7a0cd0798cabe7df5766fcc993512eca6c92e
Parents: fee67bc
Author: Jerry He <je...@apache.org>
Authored: Mon Mar 13 12:02:07 2017 -0700
Committer: Jerry He <je...@apache.org>
Committed: Mon Mar 13 12:02:07 2017 -0700
----------------------------------------------------------------------
.../hadoop/hbase/spark/DefaultSource.scala | 28 ++++-----
.../hbase/spark/HBaseConnectionCache.scala | 2 +-
.../spark/datasources/HBaseSparkConf.scala | 62 ++++++++++++--------
.../hadoop/hbase/spark/DefaultSourceSuite.scala | 16 ++---
.../spark/DynamicLogicExpressionSuite.scala | 2 +-
.../hadoop/hbase/spark/HBaseTestSource.scala | 13 ++--
.../hbase/spark/PartitionFilterSuite.scala | 6 +-
7 files changed, 69 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
index a8b2ab8..b2b646a 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/DefaultSource.scala
@@ -97,36 +97,36 @@ case class HBaseRelation (
)(@transient val sqlContext: SQLContext)
extends BaseRelation with PrunedFilteredScan with InsertableRelation with Logging {
val timestamp = parameters.get(HBaseSparkConf.TIMESTAMP).map(_.toLong)
- val minTimestamp = parameters.get(HBaseSparkConf.MIN_TIMESTAMP).map(_.toLong)
- val maxTimestamp = parameters.get(HBaseSparkConf.MAX_TIMESTAMP).map(_.toLong)
+ val minTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_START).map(_.toLong)
+ val maxTimestamp = parameters.get(HBaseSparkConf.TIMERANGE_END).map(_.toLong)
val maxVersions = parameters.get(HBaseSparkConf.MAX_VERSIONS).map(_.toInt)
- val encoderClsName = parameters.get(HBaseSparkConf.ENCODER).getOrElse(HBaseSparkConf.defaultEncoder)
+ val encoderClsName = parameters.get(HBaseSparkConf.QUERY_ENCODER).getOrElse(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
@transient val encoder = JavaBytesEncoder.create(encoderClsName)
val catalog = HBaseTableCatalog(parameters)
def tableName = catalog.name
- val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_RESOURCES_LOCATIONS, "")
- val useHBaseContext = parameters.get(HBaseSparkConf.USE_HBASE_CONTEXT).map(_.toBoolean).getOrElse(true)
- val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER)
- .map(_.toBoolean).getOrElse(true)
+ val configResources = parameters.getOrElse(HBaseSparkConf.HBASE_CONFIG_LOCATION, "")
+ val useHBaseContext = parameters.get(HBaseSparkConf.USE_HBASECONTEXT).map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_USE_HBASECONTEXT)
+ val usePushDownColumnFilter = parameters.get(HBaseSparkConf.PUSHDOWN_COLUMN_FILTER)
+ .map(_.toBoolean).getOrElse(HBaseSparkConf.DEFAULT_PUSHDOWN_COLUMN_FILTER)
// The user supplied per table parameter will overwrite global ones in SparkConf
- val blockCacheEnable = parameters.get(HBaseSparkConf.BLOCK_CACHE_ENABLE).map(_.toBoolean)
+ val blockCacheEnable = parameters.get(HBaseSparkConf.QUERY_CACHEBLOCKS).map(_.toBoolean)
.getOrElse(
sqlContext.sparkContext.getConf.getBoolean(
- HBaseSparkConf.BLOCK_CACHE_ENABLE, HBaseSparkConf.defaultBlockCacheEnable))
- val cacheSize = parameters.get(HBaseSparkConf.CACHE_SIZE).map(_.toInt)
+ HBaseSparkConf.QUERY_CACHEBLOCKS, HBaseSparkConf.DEFAULT_QUERY_CACHEBLOCKS))
+ val cacheSize = parameters.get(HBaseSparkConf.QUERY_CACHEDROWS).map(_.toInt)
.getOrElse(
sqlContext.sparkContext.getConf.getInt(
- HBaseSparkConf.CACHE_SIZE, HBaseSparkConf.defaultCachingSize))
- val batchNum = parameters.get(HBaseSparkConf.BATCH_NUM).map(_.toInt)
+ HBaseSparkConf.QUERY_CACHEDROWS, -1))
+ val batchNum = parameters.get(HBaseSparkConf.QUERY_BATCHSIZE).map(_.toInt)
.getOrElse(sqlContext.sparkContext.getConf.getInt(
- HBaseSparkConf.BATCH_NUM, HBaseSparkConf.defaultBatchNum))
+ HBaseSparkConf.QUERY_BATCHSIZE, -1))
val bulkGetSize = parameters.get(HBaseSparkConf.BULKGET_SIZE).map(_.toInt)
.getOrElse(sqlContext.sparkContext.getConf.getInt(
- HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.defaultBulkGetSize))
+ HBaseSparkConf.BULKGET_SIZE, HBaseSparkConf.DEFAULT_BULKGET_SIZE))
//create or get latest HBaseContext
val hbaseContext:HBaseContext = if (useHBaseContext) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
index fb5833e..2858da8 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
@@ -37,7 +37,7 @@ private[spark] object HBaseConnectionCache extends Logging {
val cacheStat = HBaseConnectionCacheStat(0, 0, 0)
// in milliseconds
- private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay
+ private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.DEFAULT_CONNECTION_CLOSE_DELAY
private var timeout = DEFAULT_TIME_OUT
private var closed: Boolean = false
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
index 0f20d1d..8c1cb35 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseSparkConf.scala
@@ -20,35 +20,45 @@ package org.apache.hadoop.hbase.spark.datasources
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
+/**
+ * This is the hbase configuration. User can either set them in SparkConf, which
+ * will take effect globally, or configure it per table, which will overwrite the value
+ * set in SparkConf. If not set, the default value will take effect.
+ */
@InterfaceAudience.Public
@InterfaceStability.Evolving
object HBaseSparkConf{
- // This is the hbase configuration. User can either set them in SparkConf, which
- // will take effect globally, or configure it per table, which will overwrite the value
- // set in SparkConf. If not setted, the default value will take effect.
- val BLOCK_CACHE_ENABLE = "spark.hbase.blockcache.enable"
- // default block cache is set to true by default following hbase convention, but note that
- // this potentially may slow down the system
- val defaultBlockCacheEnable = true
- val CACHE_SIZE = "spark.hbase.cacheSize"
- val defaultCachingSize = 1000
- val BATCH_NUM = "spark.hbase.batchNum"
- val defaultBatchNum = 1000
- val BULKGET_SIZE = "spark.hbase.bulkGetSize"
- val defaultBulkGetSize = 1000
-
- val HBASE_CONFIG_RESOURCES_LOCATIONS = "hbase.config.resources"
- val USE_HBASE_CONTEXT = "hbase.use.hbase.context"
- val PUSH_DOWN_COLUMN_FILTER = "hbase.pushdown.column.filter"
- val defaultPushDownColumnFilter = true
-
+ /** Set to false to disable server-side caching of blocks for this scan,
+ * false by default, since full table scans generate too much BC churn.
+ */
+ val QUERY_CACHEBLOCKS = "hbase.spark.query.cacheblocks"
+ val DEFAULT_QUERY_CACHEBLOCKS = false
+ /** The number of rows for caching that will be passed to scan. */
+ val QUERY_CACHEDROWS = "hbase.spark.query.cachedrows"
+ /** Set the maximum number of values to return for each call to next() in scan. */
+ val QUERY_BATCHSIZE = "hbase.spark.query.batchsize"
+ /** The number of BulkGets send to HBase. */
+ val BULKGET_SIZE = "hbase.spark.bulkget.size"
+ val DEFAULT_BULKGET_SIZE = 1000
+ /** Set to specify the location of hbase configuration file. */
+ val HBASE_CONFIG_LOCATION = "hbase.spark.config.location"
+ /** Set to specify whether create or use latest cached HBaseContext*/
+ val USE_HBASECONTEXT = "hbase.spark.use.hbasecontext"
+ val DEFAULT_USE_HBASECONTEXT = true
+ /** Pushdown the filter to data source engine to increase the performance of queries. */
+ val PUSHDOWN_COLUMN_FILTER = "hbase.spark.pushdown.columnfilter"
+ val DEFAULT_PUSHDOWN_COLUMN_FILTER= true
+ /** Class name of the encoder, which encode data types from Spark to HBase bytes. */
+ val QUERY_ENCODER = "hbase.spark.query.encoder"
+ val DEFAULT_QUERY_ENCODER = classOf[NaiveEncoder].getCanonicalName
+ /** The timestamp used to filter columns with a specific timestamp. */
val TIMESTAMP = "hbase.spark.query.timestamp"
- val MIN_TIMESTAMP = "hbase.spark.query.minTimestamp"
- val MAX_TIMESTAMP = "hbase.spark.query.maxTimestamp"
+ /** The starting timestamp used to filter columns with a specific range of versions. */
+ val TIMERANGE_START = "hbase.spark.query.timerange.start"
+ /** The ending timestamp used to filter columns with a specific range of versions. */
+ val TIMERANGE_END = "hbase.spark.query.timerange.end"
+ /** The maximum number of version to return. */
val MAX_VERSIONS = "hbase.spark.query.maxVersions"
- val ENCODER = "hbase.spark.query.encoder"
- val defaultEncoder = classOf[NaiveEncoder].getCanonicalName
-
- // in milliseconds
- val connectionCloseDelay = 10 * 60 * 1000
+ /** Delayed time to close hbase-spark connection when no reference to this connection, in milliseconds. */
+ val DEFAULT_CONNECTION_CLOSE_DELAY = 10 * 60 * 1000
}
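For illustration only, a hedged sketch of setting the renamed per-query keys globally on a SparkConf, written against the Java Spark API; the master and app name are placeholders, and the literal strings are the new values introduced above:

import org.apache.spark.SparkConf;

public class HBaseSparkConfSketch {
  public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf()
        .setMaster("local")
        .setAppName("hbase-spark-conf-sketch")
        .set("hbase.spark.query.cacheblocks", "true") // was spark.hbase.blockcache.enable
        .set("hbase.spark.query.batchsize", "100")    // was spark.hbase.batchNum
        .set("hbase.spark.query.cachedrows", "100");  // was spark.hbase.cacheSize
    System.out.println(sparkConf.get("hbase.spark.query.cacheblocks"));
  }
}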
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
index 7b8b844..3bce041 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DefaultSourceSuite.scala
@@ -116,9 +116,9 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
TEST_UTIL.createTable(TableName.valueOf(t2TableName), Bytes.toBytes(columnFamily))
logInfo(" - created table")
val sparkConf = new SparkConf
- sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
- sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
- sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+ sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+ sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+ sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
sc = new SparkContext("local", "test", sparkConf)
@@ -791,7 +791,7 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
|}""".stripMargin
df = sqlContext.load("org.apache.hadoop.hbase.spark",
Map(HBaseTableCatalog.tableCatalog->catalog,
- HBaseSparkConf.PUSH_DOWN_COLUMN_FILTER -> "false"))
+ HBaseSparkConf.PUSHDOWN_COLUMN_FILTER -> "false"))
df.registerTempTable("hbaseNoPushDownTmp")
@@ -913,8 +913,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
// Test Getting old stuff -- Full Scan, TimeRange
val oldRange = sqlContext.read
- .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
- HBaseSparkConf.MAX_TIMESTAMP -> (oldMs + 100).toString))
+ .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+ HBaseSparkConf.TIMERANGE_END -> (oldMs + 100).toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(oldRange.count() == 101)
@@ -924,8 +924,8 @@ BeforeAndAfterEach with BeforeAndAfterAll with Logging {
// Test Getting middle stuff -- Full Scan, TimeRange
val middleRange = sqlContext.read
- .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.MIN_TIMESTAMP -> "0",
- HBaseSparkConf.MAX_TIMESTAMP -> (startMs + 100).toString))
+ .options(Map(HBaseTableCatalog.tableCatalog -> writeCatalog, HBaseSparkConf.TIMERANGE_START -> "0",
+ HBaseSparkConf.TIMERANGE_END -> (startMs + 100).toString))
.format("org.apache.hadoop.hbase.spark")
.load()
assert(middleRange.count() == 256)
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
index b9c15ce..bc833e8 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/DynamicLogicExpressionSuite.scala
@@ -28,7 +28,7 @@ import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach, FunSuite}
class DynamicLogicExpressionSuite extends FunSuite with
BeforeAndAfterEach with BeforeAndAfterAll with Logging {
- val encoder = JavaBytesEncoder.create(HBaseSparkConf.defaultEncoder)
+ val encoder = JavaBytesEncoder.create(HBaseSparkConf.DEFAULT_QUERY_ENCODER)
test("Basic And Test") {
val leftLogic = new LessThanLogicExpression("Col1", 0)
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
index 83465d9..ccb4625 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseTestSource.scala
@@ -49,13 +49,12 @@ case class DummyScan(
override def buildScan(): RDD[Row] = sqlContext.sparkContext.parallelize(0 until rowNum)
.map(Row(_))
.map{ x =>
- if (sparkConf.getInt(HBaseSparkConf.BATCH_NUM,
- HBaseSparkConf.defaultBatchNum) != batchNum ||
- sparkConf.getInt(HBaseSparkConf.CACHE_SIZE,
- HBaseSparkConf.defaultCachingSize) != cacheSize ||
- sparkConf.getBoolean(HBaseSparkConf.BLOCK_CACHE_ENABLE,
- HBaseSparkConf.defaultBlockCacheEnable)
- != blockCachingEnable) {
+ if (sparkConf.getInt(HBaseSparkConf.QUERY_BATCHSIZE,
+ -1) != batchNum ||
+ sparkConf.getInt(HBaseSparkConf.QUERY_CACHEDROWS,
+ -1) != cacheSize ||
+ sparkConf.getBoolean(HBaseSparkConf.QUERY_CACHEBLOCKS,
+ false) != blockCachingEnable) {
throw new Exception("HBase Spark configuration cannot be set properly")
}
x
http://git-wip-us.apache.org/repos/asf/hbase/blob/35d7a0cd/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
index d33ced9..f47a319 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/PartitionFilterSuite.scala
@@ -69,9 +69,9 @@ class PartitionFilterSuite extends FunSuite with
TEST_UTIL.startMiniCluster
val sparkConf = new SparkConf
- sparkConf.set(HBaseSparkConf.BLOCK_CACHE_ENABLE, "true")
- sparkConf.set(HBaseSparkConf.BATCH_NUM, "100")
- sparkConf.set(HBaseSparkConf.CACHE_SIZE, "100")
+ sparkConf.set(HBaseSparkConf.QUERY_CACHEBLOCKS, "true")
+ sparkConf.set(HBaseSparkConf.QUERY_BATCHSIZE, "100")
+ sparkConf.set(HBaseSparkConf.QUERY_CACHEDROWS, "100")
sc = new SparkContext("local", "test", sparkConf)
new HBaseContext(sc, TEST_UTIL.getConfiguration)
[11/14] hbase git commit: HBASE-17740 Correct the semantic of batch and partial for async client
Posted by sy...@apache.org.
HBASE-17740 Correct the semantic of batch and partial for async client
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1849e8a5
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1849e8a5
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1849e8a5
Branch: refs/heads/hbase-12439
Commit: 1849e8a5a77373b5fb8e354c3f20214a80eb8c1a
Parents: 0ecb678
Author: zhangduo <zh...@apache.org>
Authored: Wed Mar 15 18:26:51 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Mar 16 09:44:23 2017 +0800
----------------------------------------------------------------------
.../client/AllowPartialScanResultCache.java | 31 ++-
.../hadoop/hbase/client/AsyncClientScanner.java | 4 +-
.../hbase/client/BatchScanResultCache.java | 142 +++++++++++
.../hadoop/hbase/client/ClientScanner.java | 253 +------------------
.../hadoop/hbase/client/ConnectionUtils.java | 14 +
.../org/apache/hadoop/hbase/client/Result.java | 72 +++---
.../client/TestAllowPartialScanResultCache.java | 33 ++-
.../hbase/client/TestBatchScanResultCache.java | 113 +++++++++
.../TestCompleteResultScanResultCache.java | 5 +-
.../client/TestRawAsyncTablePartialScan.java | 119 +++++++++
10 files changed, 471 insertions(+), 315 deletions(-)
----------------------------------------------------------------------
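As a rough illustration of what the corrected semantics mean for a caller (this sketch is not part of the commit; the table name "demo" and the surrounding setup are assumptions): with a batch size set and allow-partial-results left off, every Result handed back now carries at most `batch` cells, cells of different rows are never mixed into one Result, and mayHaveMoreCellsInRow() flags the Results that are followed by more cells of the same row.

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchScanSemanticsSketch {
  public static void main(String[] args) throws Exception {
    try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        Table table = conn.getTable(TableName.valueOf("demo"));
        ResultScanner scanner = table.getScanner(new Scan().setBatch(2))) {
      for (Result r : scanner) {
        // Each Result now carries at most 2 cells and never mixes cells of different rows;
        // mayHaveMoreCellsInRow() marks the Results that are not the last chunk of their row.
        System.out.println(Bytes.toString(r.getRow()) + ": " + r.rawCells().length
            + " cells, moreInRow=" + r.mayHaveMoreCellsInRow());
      }
    }
  }
}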
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
index caecfd4..82f1ea0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AllowPartialScanResultCache.java
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
import java.io.IOException;
import java.util.Arrays;
@@ -36,10 +38,6 @@ class AllowPartialScanResultCache implements ScanResultCache {
// beginning of a row when retry.
private Cell lastCell;
- private Result filterCells(Result result) {
- return lastCell == null ? result : ConnectionUtils.filterCells(result, lastCell);
- }
-
private void updateLastCell(Result result) {
lastCell = result.rawCells()[result.rawCells().length - 1];
}
@@ -49,22 +47,23 @@ class AllowPartialScanResultCache implements ScanResultCache {
if (results.length == 0) {
return EMPTY_RESULT_ARRAY;
}
- Result first = filterCells(results[0]);
- if (results.length == 1) {
- if (first == null) {
- // do not update last cell if we filter out all cells
- return EMPTY_RESULT_ARRAY;
+ int i;
+ for (i = 0; i < results.length; i++) {
+ Result r = filterCells(results[i], lastCell);
+ if (r != null) {
+ results[i] = r;
+ break;
}
- updateLastCell(results[0]);
- results[0] = first;
- return results;
+ }
+ if (i == results.length) {
+ return EMPTY_RESULT_ARRAY;
}
updateLastCell(results[results.length - 1]);
- if (first == null) {
- return Arrays.copyOfRange(results, 1, results.length);
+ if (i > 0) {
+ return Arrays.copyOfRange(results, i, results.length);
+ } else {
+ return results;
}
- results[0] = first;
- return results;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index 2215d36..fa7aa81 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
import java.io.IOException;
@@ -86,8 +87,7 @@ class AsyncClientScanner {
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
- this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0
- ? new AllowPartialScanResultCache() : new CompleteScanResultCache();
+ this.resultCache = createScanResultCache(scan);
}
private static final class OpenScannerResponse {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
new file mode 100644
index 0000000..9ab959b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BatchScanResultCache.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.filterCells;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * A scan result cache for batched scans, i.e.,
+ * {@code scan.getBatch() > 0 && !scan.getAllowPartialResults()}.
+ * <p>
+ * If the user calls setBatch(5) and an rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1
+ * cells to the user. Note that setBatch does not imply setAllowPartialResults(true).
+ */
+@InterfaceAudience.Private
+public class BatchScanResultCache implements ScanResultCache {
+
+ private final int batch;
+
+ // used to filter out cells that have already been returned to the user, as we always start from
+ // the beginning of a row when retrying.
+ private Cell lastCell;
+
+ private final Deque<Result> partialResults = new ArrayDeque<>();
+
+ private int numCellsOfPartialResults;
+
+ public BatchScanResultCache(int batch) {
+ this.batch = batch;
+ }
+
+ private void updateLastCell(Result result) {
+ lastCell = result.rawCells()[result.rawCells().length - 1];
+ }
+
+ private Result createCompletedResult() throws IOException {
+ Result result = Result.createCompleteResult(partialResults);
+ partialResults.clear();
+ numCellsOfPartialResults = 0;
+ return result;
+ }
+
+ // Add the new result to the partial list and return a batched Result if the cached size exceeds
+ // the batch limit. As the RS also respects scan.getBatch, we can be sure that at most one Result
+ // will be returned (or null, which means we do not have enough cells yet).
+ private Result regroupResults(Result result) {
+ partialResults.addLast(result);
+ numCellsOfPartialResults += result.size();
+ if (numCellsOfPartialResults < batch) {
+ return null;
+ }
+ Cell[] cells = new Cell[batch];
+ int cellCount = 0;
+ boolean stale = false;
+ for (;;) {
+ Result r = partialResults.pollFirst();
+ stale = stale || r.isStale();
+ int newCellCount = cellCount + r.size();
+ if (newCellCount > batch) {
+ // We have more cells than expected, so split the current result
+ int len = batch - cellCount;
+ System.arraycopy(r.rawCells(), 0, cells, cellCount, len);
+ Cell[] remainingCells = new Cell[r.size() - len];
+ System.arraycopy(r.rawCells(), len, remainingCells, 0, r.size() - len);
+ partialResults.addFirst(
+ Result.create(remainingCells, r.getExists(), r.isStale(), r.mayHaveMoreCellsInRow()));
+ break;
+ }
+ System.arraycopy(r.rawCells(), 0, cells, cellCount, r.size());
+ if (newCellCount == batch) {
+ break;
+ }
+ cellCount = newCellCount;
+ }
+ numCellsOfPartialResults -= batch;
+ return Result.create(cells, null, stale,
+ result.mayHaveMoreCellsInRow() || !partialResults.isEmpty());
+ }
+
+ @Override
+ public Result[] addAndGet(Result[] results, boolean isHeartbeatMessage) throws IOException {
+ if (results.length == 0) {
+ if (!partialResults.isEmpty() && !isHeartbeatMessage) {
+ return new Result[] { createCompletedResult() };
+ }
+ return EMPTY_RESULT_ARRAY;
+ }
+ List<Result> regroupedResults = new ArrayList<>();
+ for (Result result : results) {
+ result = filterCells(result, lastCell);
+ if (result == null) {
+ continue;
+ }
+ // check if we have a row change
+ if (!partialResults.isEmpty() &&
+ !Bytes.equals(partialResults.peek().getRow(), result.getRow())) {
+ regroupedResults.add(createCompletedResult());
+ }
+ Result regroupedResult = regroupResults(result);
+ if (regroupedResult != null) {
+ regroupedResults.add(regroupedResult);
+ // only update the last cell when we actually return it to the user.
+ updateLastCell(regroupedResult);
+ }
+ if (!result.mayHaveMoreCellsInRow() && !partialResults.isEmpty()) {
+ // We are done for this row
+ regroupedResults.add(createCompletedResult());
+ }
+ }
+ return regroupedResults.toArray(new Result[0]);
+ }
+
+ @Override
+ public void clear() {
+ partialResults.clear();
+ numCellsOfPartialResults = 0;
+ }
+}
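To make the regrouping concrete, here is a minimal sketch that drives the new cache directly, much like the unit test added later in this commit (the row and family names are hypothetical): feeding 3 cells and then 5 more cells of the same row into a cache with batch = 5 returns nothing on the first call and exactly one 5-cell Result on the second, with the remaining 3 cells held back until the row completes.

import java.util.Arrays;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.BatchScanResultCache;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class BatchRegroupSketch {
  public static void main(String[] args) throws Exception {
    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("cf");
    Cell[] cells = new Cell[8];
    for (int i = 0; i < 8; i++) {
      cells[i] = new KeyValue(row, cf, Bytes.toBytes("cq" + i), Bytes.toBytes(i));
    }
    BatchScanResultCache cache = new BatchScanResultCache(5);
    // First rpc delivers only 3 cells of the row: below the batch limit, nothing comes back yet.
    Result[] out = cache.addAndGet(
        new Result[] { Result.create(Arrays.copyOf(cells, 3), null, false, true) }, false);
    System.out.println(out.length); // 0
    // Second rpc delivers 5 more cells: 3 + 5 >= 5, so exactly one 5-cell Result is regrouped
    // and the remaining 3 cells stay queued in the cache.
    out = cache.addAndGet(
        new Result[] { Result.create(Arrays.copyOfRange(cells, 3, 8), null, false, true) }, false);
    System.out.println(out.length + " result(s), " + out[0].rawCells().length + " cells"); // 1, 5
  }
}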
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index bd3d4ef..a8b029f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -18,16 +18,15 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
import static org.apache.hadoop.hbase.client.ConnectionUtils.numberOfIndividualRows;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
-import java.util.List;
import java.util.Queue;
import java.util.concurrent.ExecutorService;
@@ -35,8 +34,6 @@ import org.apache.commons.lang.mutable.MutableBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -69,24 +66,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
protected HRegionInfo currentRegion = null;
protected ScannerCallableWithReplicas callable = null;
protected Queue<Result> cache;
- /**
- * A list of partial results that have been returned from the server. This list should only
- * contain results if this scanner does not have enough partial results to form the complete
- * result.
- */
- protected int partialResultsCellSizes = 0;
- protected final LinkedList<Result> partialResults = new LinkedList<>();
-
- /**
- * The row for which we are accumulating partial Results (i.e. the row of the Results stored
- * inside partialResults). Changes to partialResultsRow and partialResults are kept in sync via
- * the methods {@link #regroupResults(Result)} and {@link #clearPartialResults()}
- */
- protected byte[] partialResultsRow = null;
- /**
- * The last cell from a not full Row which is added to cache
- */
- protected Cell lastCellLoadedToCache = null;
+ private final ScanResultCache scanResultCache;
protected final int caching;
protected long lastNext;
// Keep lastResult returned successfully in case we have to reset scanner.
@@ -159,6 +139,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
this.rpcControllerFactory = controllerFactory;
this.conf = conf;
+
+ this.scanResultCache = createScanResultCache(scan);
initCache();
}
@@ -356,14 +338,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
private void closeScannerIfExhausted(boolean exhausted) throws IOException {
if (exhausted) {
- if (!partialResults.isEmpty()) {
- // XXX: continue if there are partial results. But in fact server should not set
- // hasMoreResults to false if there are partial results.
- LOG.warn("Server tells us there is no more results for this region but we still have" +
- " partialResults, this should not happen, retry on the current scanner anyway");
- } else {
- closeScanner();
- }
+ closeScanner();
}
}
@@ -371,7 +346,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
MutableBoolean retryAfterOutOfOrderException, int retriesLeft) throws DoNotRetryIOException {
// An exception was thrown which makes any partial results that we were collecting
// invalid. The scanner will need to be reset to the beginning of a row.
- clearPartialResults();
+ scanResultCache.clear();
// Unfortunately, DNRIOE is used in two different semantics.
// (1) The first is to close the client scanner and bubble up the exception all the way
@@ -465,7 +440,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
if (callable.switchedToADifferentReplica()) {
// Any accumulated partial results are no longer valid since the callable will
// openScanner with the correct startkey and we must pick up from there
- clearPartialResults();
+ scanResultCache.clear();
this.currentRegion = callable.getHRegionInfo();
}
retryAfterOutOfOrderException.setValue(true);
@@ -485,29 +460,19 @@ public abstract class ClientScanner extends AbstractClientScanner {
// Groom the array of Results that we received back from the server before adding that
// Results to the scanner's cache. If partial results are not allowed to be seen by the
// caller, all book keeping will be performed within this method.
- List<Result> resultsToAddToCache =
- getResultsToAddToCache(values, callable.isHeartbeatMessage());
- if (!resultsToAddToCache.isEmpty()) {
+ Result[] resultsToAddToCache = scanResultCache.addAndGet(values, callable.isHeartbeatMessage());
+ if (resultsToAddToCache.length > 0) {
for (Result rs : resultsToAddToCache) {
- rs = filterLoadedCell(rs);
- if (rs == null) {
- continue;
- }
-
cache.add(rs);
long estimatedHeapSizeOfResult = calcEstimatedSize(rs);
countdown--;
remainingResultSize -= estimatedHeapSizeOfResult;
addEstimatedSize(estimatedHeapSizeOfResult);
this.lastResult = rs;
- if (this.lastResult.mayHaveMoreCellsInRow()) {
- updateLastCellLoadedToCache(this.lastResult);
- } else {
- this.lastCellLoadedToCache = null;
- }
}
- if (scan.getLimit() > 0 && !resultsToAddToCache.isEmpty()) {
- int newLimit = scan.getLimit() - numberOfIndividualRows(resultsToAddToCache);
+ if (scan.getLimit() > 0) {
+ int newLimit =
+ scan.getLimit() - numberOfIndividualRows(Arrays.asList(resultsToAddToCache));
assert newLimit >= 0;
scan.setLimit(newLimit);
}
@@ -550,13 +515,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
}
// we are done with the current region
if (regionExhausted) {
- if (!partialResults.isEmpty()) {
- // XXX: continue if there are partial results. But in fact server should not set
- // hasMoreResults to false if there are partial results.
- LOG.warn("Server tells us there is no more results for this region but we still have" +
- " partialResults, this should not happen, retry on the current scanner anyway");
- continue;
- }
if (!moveToNextRegion()) {
break;
}
@@ -573,142 +531,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
return cache != null ? cache.size() : 0;
}
- /**
- * This method ensures all of our book keeping regarding partial results is kept up to date. This
- * method should be called once we know that the results we received back from the RPC request do
- * not contain errors. We return a list of results that should be added to the cache. In general,
- * this list will contain all NON-partial results from the input array (unless the client has
- * specified that they are okay with receiving partial results)
- * @param resultsFromServer The array of {@link Result}s returned from the server
- * @param heartbeatMessage Flag indicating whether or not the response received from the server
- * represented a complete response, or a heartbeat message that was sent to keep the
- * client-server connection alive
- * @return the list of results that should be added to the cache.
- * @throws IOException
- */
- protected List<Result> getResultsToAddToCache(Result[] resultsFromServer,
- boolean heartbeatMessage) throws IOException {
- int resultSize = resultsFromServer != null ? resultsFromServer.length : 0;
- List<Result> resultsToAddToCache = new ArrayList<>(resultSize);
-
- // If the caller has indicated in their scan that they are okay with seeing partial results,
- // then simply add all results to the list. Note allowPartial and setBatch are not same, we can
- // return here if allow partials and we will handle batching later.
- if (scan.getAllowPartialResults()) {
- addResultsToList(resultsToAddToCache, resultsFromServer, 0,
- (null == resultsFromServer ? 0 : resultsFromServer.length));
- return resultsToAddToCache;
- }
-
- // If no results were returned it indicates that either we have the all the partial results
- // necessary to construct the complete result or the server had to send a heartbeat message
- // to the client to keep the client-server connection alive
- if (resultsFromServer == null || resultsFromServer.length == 0) {
- // If this response was an empty heartbeat message, then we have not exhausted the region
- // and thus there may be more partials server side that still need to be added to the partial
- // list before we form the complete Result
- if (!partialResults.isEmpty() && !heartbeatMessage) {
- resultsToAddToCache.add(Result.createCompleteResult(partialResults));
- clearPartialResults();
- }
-
- return resultsToAddToCache;
- }
-
- for(Result result : resultsFromServer) {
- if (partialResultsRow != null && Bytes.compareTo(result.getRow(), partialResultsRow) != 0) {
- // We have a new row, complete the previous row.
- resultsToAddToCache.add(Result.createCompleteResult(partialResults));
- clearPartialResults();
- }
- Result res = regroupResults(result);
- if (res != null) {
- resultsToAddToCache.add(res);
- }
- if (!result.mayHaveMoreCellsInRow()) {
- // We are done for this row
- if (partialResultsCellSizes > 0) {
- resultsToAddToCache.add(Result.createCompleteResult(partialResults));
- }
- clearPartialResults();
- }
- }
-
-
- return resultsToAddToCache;
- }
-
- /**
- * Add new result to the partial list and return a batched Result if caching size exceed
- * batching limit.
- * If user setBatch(5) and rpc returns 3+5+5+5+3 cells, we should return 5+5+5+5+1 to user.
- * setBatch doesn't mean setAllowPartialResult(true)
- * @param result The result that we want to add to our list of partial Results
- * @return the result if we have batch limit and there is one Result can be returned to user, or
- * null if we have not.
- * @throws IOException
- */
- private Result regroupResults(final Result result) throws IOException {
- partialResultsRow = result.getRow();
- partialResults.add(result);
- partialResultsCellSizes += result.size();
- if (scan.getBatch() > 0 && partialResultsCellSizes >= scan.getBatch()) {
- Cell[] cells = new Cell[scan.getBatch()];
- int count = 0;
- boolean stale = false;
- while (count < scan.getBatch()) {
- Result res = partialResults.poll();
- stale = stale || res.isStale();
- if (res.size() + count <= scan.getBatch()) {
- System.arraycopy(res.rawCells(), 0, cells, count, res.size());
- count += res.size();
- } else {
- int len = scan.getBatch() - count;
- System.arraycopy(res.rawCells(), 0, cells, count, len);
- Cell[] remainingCells = new Cell[res.size() - len];
- System.arraycopy(res.rawCells(), len, remainingCells, 0, res.size() - len);
- Result remainingRes = Result.create(remainingCells, res.getExists(), res.isStale(),
- res.mayHaveMoreCellsInRow());
- partialResults.addFirst(remainingRes);
- count = scan.getBatch();
- }
- }
- partialResultsCellSizes -= scan.getBatch();
- if (partialResultsCellSizes == 0) {
- // We have nothing in partialResults, clear the flags to prevent returning empty Result
- // when next result belongs to the next row.
- clearPartialResults();
- }
- return Result.create(cells, null, stale,
- partialResultsCellSizes > 0 || result.mayHaveMoreCellsInRow());
- }
- return null;
- }
-
- /**
- * Convenience method for clearing the list of partials and resetting the partialResultsRow.
- */
- private void clearPartialResults() {
- partialResults.clear();
- partialResultsCellSizes = 0;
- partialResultsRow = null;
- }
-
- /**
- * Helper method for adding results between the indices [start, end) to the outputList
- * @param outputList the list that results will be added to
- * @param inputArray the array that results are taken from
- * @param start beginning index (inclusive)
- * @param end ending index (exclusive)
- */
- private void addResultsToList(List<Result> outputList, Result[] inputArray, int start, int end) {
- if (inputArray == null || start < 0 || end > inputArray.length) return;
-
- for (int i = start; i < end; i++) {
- outputList.add(inputArray[i]);
- }
- }
-
@Override
public void close() {
if (!scanMetricsPublished) writeScanMetrics();
@@ -749,57 +571,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
return false;
}
- protected void updateLastCellLoadedToCache(Result result) {
- if (result.rawCells().length == 0) {
- return;
- }
- this.lastCellLoadedToCache = result.rawCells()[result.rawCells().length - 1];
- }
-
- /**
- * Compare two Cells considering reversed scanner. ReversedScanner only reverses rows, not
- * columns.
- */
- private int compare(Cell a, Cell b) {
- CellComparator comparator = currentRegion != null && currentRegion.isMetaRegion()
- ? CellComparator.META_COMPARATOR : CellComparator.COMPARATOR;
- int r = comparator.compareRows(a, b);
- if (r != 0) {
- return this.scan.isReversed() ? -r : r;
- }
- return CellComparator.compareWithoutRow(a, b);
- }
-
- private Result filterLoadedCell(Result result) {
- // we only filter result when last result is partial
- // so lastCellLoadedToCache and result should have same row key.
- // However, if 1) read some cells; 1.1) delete this row at the same time 2) move region;
- // 3) read more cell. lastCellLoadedToCache and result will be not at same row.
- if (lastCellLoadedToCache == null || result.rawCells().length == 0) {
- return result;
- }
- if (compare(this.lastCellLoadedToCache, result.rawCells()[0]) < 0) {
- // The first cell of this result is larger than the last cell of loadcache.
- // If user do not allow partial result, it must be true.
- return result;
- }
- if (compare(this.lastCellLoadedToCache, result.rawCells()[result.rawCells().length - 1]) >= 0) {
- // The last cell of this result is smaller than the last cell of loadcache, skip all.
- return null;
- }
-
- // The first one must not in filtered result, we start at the second.
- int index = 1;
- while (index < result.rawCells().length) {
- if (compare(this.lastCellLoadedToCache, result.rawCells()[index]) < 0) {
- break;
- }
- index++;
- }
- Cell[] list = Arrays.copyOfRange(result.rawCells(), index, result.rawCells().length);
- return Result.create(list, result.getExists(), result.isStale(), result.mayHaveMoreCellsInRow());
- }
-
protected void initCache() {
initSyncCache();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 2b75836..3e7cd00 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -316,6 +316,10 @@ public final class ConnectionUtils {
}
static Result filterCells(Result result, Cell keepCellsAfter) {
+ if (keepCellsAfter == null) {
+ // do not need to filter
+ return result;
+ }
// not the same row
if (!CellUtil.matchingRow(keepCellsAfter, result.getRow(), 0, result.getRow().length)) {
return result;
@@ -410,4 +414,14 @@ public final class ConnectionUtils {
public static int numberOfIndividualRows(List<Result> results) {
return (int) results.stream().filter(r -> !r.mayHaveMoreCellsInRow()).count();
}
+
+ public static ScanResultCache createScanResultCache(Scan scan) {
+ if (scan.getAllowPartialResults()) {
+ return new AllowPartialScanResultCache();
+ } else if (scan.getBatch() > 0) {
+ return new BatchScanResultCache(scan.getBatch());
+ } else {
+ return new CompleteScanResultCache();
+ }
+ }
}
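A small sketch of which cache each scan configuration now maps to; since ScanResultCache and most of the concrete caches are not public, this would have to live in the org.apache.hadoop.hbase.client package itself (the class name is hypothetical).

package org.apache.hadoop.hbase.client;

public class ScanResultCacheChoiceSketch {
  public static void main(String[] args) {
    // allowPartialResults wins: the caller accepts raw partial Results as they arrive.
    ScanResultCache a = ConnectionUtils.createScanResultCache(
        new Scan().setAllowPartialResults(true).setBatch(5));
    System.out.println(a.getClass().getSimpleName()); // AllowPartialScanResultCache

    // batch > 0 without allowPartialResults: Results are regrouped to the batch size.
    ScanResultCache b = ConnectionUtils.createScanResultCache(new Scan().setBatch(5));
    System.out.println(b.getClass().getSimpleName()); // BatchScanResultCache

    // Neither set: only whole rows are handed back.
    ScanResultCache c = ConnectionUtils.createScanResultCache(new Scan());
    System.out.println(c.getClass().getSimpleName()); // CompleteScanResultCache
  }
}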
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
index 4752d70..f8682ec 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Result.java
@@ -24,7 +24,9 @@ import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -145,11 +147,11 @@ public class Result implements CellScannable, CellScanner {
}
public static Result create(List<Cell> cells, Boolean exists, boolean stale,
- boolean hasMoreCellsInRow) {
+ boolean mayHaveMoreCellsInRow) {
if (exists != null){
- return new Result(null, exists, stale, hasMoreCellsInRow);
+ return new Result(null, exists, stale, mayHaveMoreCellsInRow);
}
- return new Result(cells.toArray(new Cell[cells.size()]), null, stale, hasMoreCellsInRow);
+ return new Result(cells.toArray(new Cell[cells.size()]), null, stale, mayHaveMoreCellsInRow);
}
/**
@@ -792,44 +794,42 @@ public class Result implements CellScannable, CellScanner {
* @throws IOException A complete result cannot be formed because the results in the partial list
* come from different rows
*/
- public static Result createCompleteResult(List<Result> partialResults)
+ public static Result createCompleteResult(Iterable<Result> partialResults)
throws IOException {
+ if (partialResults == null) {
+ return Result.create(Collections.emptyList(), null, false);
+ }
List<Cell> cells = new ArrayList<>();
boolean stale = false;
byte[] prevRow = null;
byte[] currentRow = null;
-
- if (partialResults != null && !partialResults.isEmpty()) {
- for (int i = 0; i < partialResults.size(); i++) {
- Result r = partialResults.get(i);
- currentRow = r.getRow();
- if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
- throw new IOException(
- "Cannot form complete result. Rows of partial results do not match." +
- " Partial Results: " + partialResults);
- }
-
- // Ensure that all Results except the last one are marked as partials. The last result
- // may not be marked as a partial because Results are only marked as partials when
- // the scan on the server side must be stopped due to reaching the maxResultSize.
- // Visualizing it makes it easier to understand:
- // maxResultSize: 2 cells
- // (-x-) represents cell number x in a row
- // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
- // How row1 will be returned by the server as partial Results:
- // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
- // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
- // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
- if (i != (partialResults.size() - 1) && !r.mayHaveMoreCellsInRow()) {
- throw new IOException(
- "Cannot form complete result. Result is missing partial flag. " +
- "Partial Results: " + partialResults);
- }
- prevRow = currentRow;
- stale = stale || r.isStale();
- for (Cell c : r.rawCells()) {
- cells.add(c);
- }
+ for (Iterator<Result> iter = partialResults.iterator(); iter.hasNext();) {
+ Result r = iter.next();
+ currentRow = r.getRow();
+ if (prevRow != null && !Bytes.equals(prevRow, currentRow)) {
+ throw new IOException(
+ "Cannot form complete result. Rows of partial results do not match." +
+ " Partial Results: " + partialResults);
+ }
+ // Ensure that all Results except the last one are marked as partials. The last result
+ // may not be marked as a partial because Results are only marked as partials when
+ // the scan on the server side must be stopped due to reaching the maxResultSize.
+ // Visualizing it makes it easier to understand:
+ // maxResultSize: 2 cells
+ // (-x-) represents cell number x in a row
+ // Example: row1: -1- -2- -3- -4- -5- (5 cells total)
+ // How row1 will be returned by the server as partial Results:
+ // Result1: -1- -2- (2 cells, size limit reached, mark as partial)
+ // Result2: -3- -4- (2 cells, size limit reached, mark as partial)
+ // Result3: -5- (1 cell, size limit NOT reached, NOT marked as partial)
+ if (iter.hasNext() && !r.mayHaveMoreCellsInRow()) {
+ throw new IOException("Cannot form complete result. Result is missing partial flag. " +
+ "Partial Results: " + partialResults);
+ }
+ prevRow = currentRow;
+ stale = stale || r.isStale();
+ for (Cell c : r.rawCells()) {
+ cells.add(c);
}
}
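The maxResultSize visualization in the comment above can be replayed against the reworked method; a minimal sketch (row and family names are hypothetical) shows three server-side partials of 2+2+1 cells being stitched back into one 5-cell Result, and that the method now accepts any Iterable of Results rather than only a List.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class CompleteResultSketch {
  public static void main(String[] args) throws Exception {
    byte[] row = Bytes.toBytes("row1");
    byte[] cf = Bytes.toBytes("cf");
    List<Cell> cells = new ArrayList<>();
    for (int i = 1; i <= 5; i++) {
      cells.add(new KeyValue(row, cf, Bytes.toBytes("c" + i), Bytes.toBytes(i)));
    }
    // Server-side view with a maxResultSize of 2 cells: two partial Results plus a final one.
    Result r1 = Result.create(cells.subList(0, 2), null, false, true);
    Result r2 = Result.create(cells.subList(2, 4), null, false, true);
    Result r3 = Result.create(cells.subList(4, 5), null, false, false);
    // createCompleteResult now takes any Iterable<Result>, e.g. the Deque kept by
    // BatchScanResultCache, not only a List.
    Result whole = Result.createCompleteResult(Arrays.asList(r1, r2, r3));
    System.out.println(whole.rawCells().length); // 5
  }
}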
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
index fc5ba14..3fe43a5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAllowPartialScanResultCache.java
@@ -17,14 +17,14 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.*;
+import static org.apache.hadoop.hbase.client.TestBatchScanResultCache.createCells;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
import java.io.IOException;
import java.util.Arrays;
-import java.util.stream.IntStream;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -51,10 +51,6 @@ public class TestAllowPartialScanResultCache {
resultCache = null;
}
- private static Cell createCell(int key, int cq) {
- return new KeyValue(Bytes.toBytes(key), CF, Bytes.toBytes("cq" + cq), Bytes.toBytes(key));
- }
-
@Test
public void test() throws IOException {
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
@@ -62,31 +58,34 @@ public class TestAllowPartialScanResultCache {
assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
- Cell[] cells1 = IntStream.range(0, 10).mapToObj(i -> createCell(1, i)).toArray(Cell[]::new);
- Cell[] cells2 = IntStream.range(0, 10).mapToObj(i -> createCell(2, i)).toArray(Cell[]::new);
+ Cell[] cells1 = createCells(CF, 1, 10);
+ Cell[] cells2 = createCells(CF, 2, 10);
Result[] results1 = resultCache.addAndGet(
new Result[] { Result.create(Arrays.copyOf(cells1, 5), null, false, true) }, false);
assertEquals(1, results1.length);
assertEquals(1, Bytes.toInt(results1[0].getRow()));
assertEquals(5, results1[0].rawCells().length);
- IntStream.range(0, 5).forEach(
- i -> assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i)))));
+ for (int i = 0; i < 5; i++) {
+ assertEquals(1, Bytes.toInt(results1[0].getValue(CF, Bytes.toBytes("cq" + i))));
+ }
Result[] results2 = resultCache.addAndGet(
new Result[] { Result.create(Arrays.copyOfRange(cells1, 1, 10), null, false, true) }, false);
assertEquals(1, results2.length);
assertEquals(1, Bytes.toInt(results2[0].getRow()));
assertEquals(5, results2[0].rawCells().length);
- IntStream.range(5, 10).forEach(
- i -> assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i)))));
+ for (int i = 5; i < 10; i++) {
+ assertEquals(1, Bytes.toInt(results2[0].getValue(CF, Bytes.toBytes("cq" + i))));
+ }
- Result[] results3 = resultCache
- .addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
+ Result[] results3 =
+ resultCache.addAndGet(new Result[] { Result.create(cells1), Result.create(cells2) }, false);
assertEquals(1, results3.length);
assertEquals(2, Bytes.toInt(results3[0].getRow()));
assertEquals(10, results3[0].rawCells().length);
- IntStream.range(0, 10).forEach(
- i -> assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i)))));
+ for (int i = 0; i < 10; i++) {
+ assertEquals(2, Bytes.toInt(results3[0].getValue(CF, Bytes.toBytes("cq" + i))));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
new file mode 100644
index 0000000..31a4594
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBatchScanResultCache.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertSame;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestBatchScanResultCache {
+
+ private static byte[] CF = Bytes.toBytes("cf");
+
+ private BatchScanResultCache resultCache;
+
+ @Before
+ public void setUp() {
+ resultCache = new BatchScanResultCache(4);
+ }
+
+ @After
+ public void tearDown() {
+ resultCache.clear();
+ resultCache = null;
+ }
+
+ static Cell createCell(byte[] cf, int key, int cq) {
+ return new KeyValue(Bytes.toBytes(key), cf, Bytes.toBytes("cq" + cq), Bytes.toBytes(key));
+ }
+
+ static Cell[] createCells(byte[] cf, int key, int numCqs) {
+ Cell[] cells = new Cell[numCqs];
+ for (int i = 0; i < numCqs; i++) {
+ cells[i] = createCell(cf, key, i);
+ }
+ return cells;
+ }
+
+ private void assertResultEquals(Result result, int key, int start, int to) {
+ assertEquals(to - start, result.size());
+ for (int i = start; i < to; i++) {
+ assertEquals(key, Bytes.toInt(result.getValue(CF, Bytes.toBytes("cq" + i))));
+ }
+ assertEquals(to - start == 4, result.mayHaveMoreCellsInRow());
+ }
+
+ @Test
+ public void test() throws IOException {
+ assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+ resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false));
+ assertSame(ScanResultCache.EMPTY_RESULT_ARRAY,
+ resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
+
+ Cell[] cells1 = createCells(CF, 1, 10);
+ Cell[] cells2 = createCells(CF, 2, 10);
+ Cell[] cells3 = createCells(CF, 3, 10);
+ assertEquals(0, resultCache.addAndGet(
+ new Result[] { Result.create(Arrays.copyOf(cells1, 3), null, false, true) }, false).length);
+ Result[] results = resultCache.addAndGet(
+ new Result[] { Result.create(Arrays.copyOfRange(cells1, 3, 7), null, false, true),
+ Result.create(Arrays.copyOfRange(cells1, 7, 10), null, false, true) },
+ false);
+ assertEquals(2, results.length);
+ assertResultEquals(results[0], 1, 0, 4);
+ assertResultEquals(results[1], 1, 4, 8);
+ results = resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, false);
+ assertEquals(1, results.length);
+ assertResultEquals(results[0], 1, 8, 10);
+
+ results = resultCache.addAndGet(
+ new Result[] { Result.create(Arrays.copyOfRange(cells2, 0, 4), null, false, true),
+ Result.create(Arrays.copyOfRange(cells2, 4, 8), null, false, true),
+ Result.create(Arrays.copyOfRange(cells2, 8, 10), null, false, true),
+ Result.create(Arrays.copyOfRange(cells3, 0, 4), null, false, true),
+ Result.create(Arrays.copyOfRange(cells3, 4, 8), null, false, true),
+ Result.create(Arrays.copyOfRange(cells3, 8, 10), null, false, false) },
+ false);
+ assertEquals(6, results.length);
+ assertResultEquals(results[0], 2, 0, 4);
+ assertResultEquals(results[1], 2, 4, 8);
+ assertResultEquals(results[2], 2, 8, 10);
+ assertResultEquals(results[3], 3, 0, 4);
+ assertResultEquals(results[4], 3, 4, 8);
+ assertResultEquals(results[5], 3, 8, 10);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
index a340e9f..8759593 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCompleteResultScanResultCache.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertSame;
import java.io.IOException;
import java.util.Arrays;
-import java.util.stream.IntStream;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
@@ -70,9 +69,9 @@ public class TestCompleteResultScanResultCache {
resultCache.addAndGet(ScanResultCache.EMPTY_RESULT_ARRAY, true));
int count = 10;
Result[] results = new Result[count];
- IntStream.range(0, count).forEach(i -> {
+ for (int i = 0; i < count; i++) {
results[i] = Result.create(Arrays.asList(createCell(i, CQ1)));
- });
+ }
assertSame(results, resultCache.addAndGet(results, false));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1849e8a5/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
new file mode 100644
index 0000000..2a32206
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTablePartialScan.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestRawAsyncTablePartialScan {
+
+ private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+ private static TableName TABLE_NAME = TableName.valueOf("async");
+
+ private static byte[] FAMILY = Bytes.toBytes("cf");
+
+ private static byte[][] CQS =
+ new byte[][] { Bytes.toBytes("cq1"), Bytes.toBytes("cq2"), Bytes.toBytes("cq3") };
+
+ private static int COUNT = 100;
+
+ private static AsyncConnection CONN;
+
+ private static RawAsyncTable TABLE;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ TABLE = CONN.getRawTable(TABLE_NAME);
+ TABLE
+ .putAll(IntStream.range(0, COUNT)
+ .mapToObj(i -> new Put(Bytes.toBytes(String.format("%02d", i)))
+ .addColumn(FAMILY, CQS[0], Bytes.toBytes(i))
+ .addColumn(FAMILY, CQS[1], Bytes.toBytes(2 * i))
+ .addColumn(FAMILY, CQS[2], Bytes.toBytes(3 * i)))
+ .collect(Collectors.toList()))
+ .get();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ CONN.close();
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testBatchDoNotAllowPartial() throws InterruptedException, ExecutionException {
+ // We set batch to 2 and max result size to 1, so the server will only return one Result per
+ // call, but we should still get 2 + 1 cells back for every row.
+ List<Result> results = TABLE.scanAll(new Scan().setBatch(2).setMaxResultSize(1)).get();
+ assertEquals(2 * COUNT, results.size());
+ for (int i = 0; i < COUNT; i++) {
+ Result firstTwo = results.get(2 * i);
+ assertEquals(String.format("%02d", i), Bytes.toString(firstTwo.getRow()));
+ assertEquals(2, firstTwo.size());
+ assertEquals(i, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[0])));
+ assertEquals(2 * i, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[1])));
+
+ Result secondOne = results.get(2 * i + 1);
+ assertEquals(String.format("%02d", i), Bytes.toString(secondOne.getRow()));
+ assertEquals(1, secondOne.size());
+ assertEquals(3 * i, Bytes.toInt(secondOne.getValue(FAMILY, CQS[2])));
+ }
+ }
+
+ @Test
+ public void testReversedBatchDoNotAllowPartial() throws InterruptedException, ExecutionException {
+ // We set batch to 2 and max result size to 1, so the server will only return one Result per
+ // call, but we should still get 2 + 1 cells back for every row.
+ List<Result> results =
+ TABLE.scanAll(new Scan().setBatch(2).setMaxResultSize(1).setReversed(true)).get();
+ assertEquals(2 * COUNT, results.size());
+ for (int i = 0; i < COUNT; i++) {
+ int row = COUNT - i - 1;
+ Result firstTwo = results.get(2 * i);
+ assertEquals(String.format("%02d", row), Bytes.toString(firstTwo.getRow()));
+ assertEquals(2, firstTwo.size());
+ assertEquals(row, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[0])));
+ assertEquals(2 * row, Bytes.toInt(firstTwo.getValue(FAMILY, CQS[1])));
+
+ Result secondOne = results.get(2 * i + 1);
+ assertEquals(String.format("%02d", row), Bytes.toString(secondOne.getRow()));
+ assertEquals(1, secondOne.size());
+ assertEquals(3 * row, Bytes.toInt(secondOne.getValue(FAMILY, CQS[2])));
+ }
+ }
+}
[05/14] hbase git commit: HBASE-17747 Support both weak and soft object pool
Posted by sy...@apache.org.
HBASE-17747 Support both weak and soft object pool
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/44b25588
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/44b25588
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/44b25588
Branch: refs/heads/hbase-12439
Commit: 44b255889cfb168aaac8adc162f740beb61a7221
Parents: 201c838
Author: Yu Li <li...@apache.org>
Authored: Tue Mar 14 11:07:52 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Tue Mar 14 11:07:52 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/util/KeyLocker.java | 2 +-
.../apache/hadoop/hbase/util/ObjectPool.java | 174 +++++++++++++++++++
.../hadoop/hbase/util/SoftObjectPool.java | 81 +++++++++
.../hadoop/hbase/util/WeakObjectPool.java | 151 ++--------------
.../hadoop/hbase/util/TestWeakObjectPool.java | 4 +-
.../hadoop/hbase/util/IdReadWriteLock.java | 9 +-
.../hadoop/hbase/util/TestIdReadWriteLock.java | 5 +-
7 files changed, 285 insertions(+), 141 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
index 6acf584..57e7bb0 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
@@ -50,7 +50,7 @@ public class KeyLocker<K> {
private final WeakObjectPool<K, ReentrantLock> lockPool =
new WeakObjectPool<>(
- new WeakObjectPool.ObjectFactory<K, ReentrantLock>() {
+ new ObjectPool.ObjectFactory<K, ReentrantLock>() {
@Override
public ReentrantLock createObject(K key) {
return new ReentrantLock();
http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java
new file mode 100644
index 0000000..f736922
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ObjectPool.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * A thread-safe shared object pool in which object creation is expected to be lightweight, and the
+ * objects may be excessively created and discarded.
+ */
+@InterfaceAudience.Private
+public abstract class ObjectPool<K, V> {
+ /**
+ * An {@code ObjectFactory} object is used to create
+ * new shared objects on demand.
+ */
+ public interface ObjectFactory<K, V> {
+ /**
+ * Creates a new shared object associated with the given {@code key},
+ * identified by the {@code equals} method.
+ * This method may be simultaneously called by multiple threads
+ * with the same key, and the excessive objects are just discarded.
+ */
+ V createObject(K key);
+ }
+
+ protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();
+
+ private final ObjectFactory<K, V> objectFactory;
+
+ /** Does not permit null keys. */
+ protected final ConcurrentMap<K, Reference<V>> referenceCache;
+
+ /**
+ * The default initial capacity,
+ * used when not otherwise specified in a constructor.
+ */
+ public static final int DEFAULT_INITIAL_CAPACITY = 16;
+
+ /**
+ * The default concurrency level,
+ * used when not otherwise specified in a constructor.
+ */
+ public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
+
+ /**
+ * Creates a new pool with the default initial capacity (16)
+ * and the default concurrency level (16).
+ *
+ * @param objectFactory the factory to supply new objects on demand
+ *
+ * @throws NullPointerException if {@code objectFactory} is null
+ */
+ public ObjectPool(ObjectFactory<K, V> objectFactory) {
+ this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
+ }
+
+ /**
+ * Creates a new pool with the given initial capacity
+ * and the default concurrency level (16).
+ *
+ * @param objectFactory the factory to supply new objects on demand
+ * @param initialCapacity the initial capacity to keep objects in the pool
+ *
+ * @throws NullPointerException if {@code objectFactory} is null
+ * @throws IllegalArgumentException if {@code initialCapacity} is negative
+ */
+ public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
+ this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
+ }
+
+ /**
+ * Creates a new pool with the given initial capacity
+ * and the given concurrency level.
+ *
+ * @param objectFactory the factory to supply new objects on demand
+ * @param initialCapacity the initial capacity to keep objects in the pool
+ * @param concurrencyLevel the estimated count of concurrently accessing threads
+ *
+ * @throws NullPointerException if {@code objectFactory} is null
+ * @throws IllegalArgumentException if {@code initialCapacity} is negative or
+ * {@code concurrencyLevel} is non-positive
+ */
+ public ObjectPool(
+ ObjectFactory<K, V> objectFactory,
+ int initialCapacity,
+ int concurrencyLevel) {
+
+ if (objectFactory == null) {
+ throw new NullPointerException("Given object factory instance is NULL");
+ }
+ this.objectFactory = objectFactory;
+
+ this.referenceCache =
+ new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel);
+ }
+
+ /**
+ * Removes stale references of shared objects from the pool.
+ * References newly becoming stale may still remain.
+ * The implementation of this method is expected to be lightweight
+ * when there is no stale reference.
+ */
+ public abstract void purge();
+
+ /**
+ * Create a reference associated with the given object
+ * @param key the key to store in the reference
+ * @param obj the object to associate with
+ * @return the reference instance
+ */
+ public abstract Reference<V> createReference(K key, V obj);
+
+ /**
+ * Returns a shared object associated with the given {@code key},
+ * which is identified by the {@code equals} method.
+ * @throws NullPointerException if {@code key} is null
+ */
+ public V get(K key) {
+ Reference<V> ref = referenceCache.get(key);
+ if (ref != null) {
+ V obj = ref.get();
+ if (obj != null) {
+ return obj;
+ }
+ referenceCache.remove(key, ref);
+ }
+
+ V newObj = objectFactory.createObject(key);
+ Reference<V> newRef = createReference(key, newObj);
+ while (true) {
+ Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef);
+ if (existingRef == null) {
+ return newObj;
+ }
+
+ V existingObject = existingRef.get();
+ if (existingObject != null) {
+ return existingObject;
+ }
+ referenceCache.remove(key, existingRef);
+ }
+ }
+
+ /**
+ * Returns an estimated count of objects kept in the pool.
+ * This also counts stale references,
+ * and you might want to call {@link #purge()} beforehand.
+ */
+ public int size() {
+ return referenceCache.size();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java
new file mode 100644
index 0000000..7f27f98
--- /dev/null
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/SoftObjectPool.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import java.lang.ref.Reference;
+import java.lang.ref.SoftReference;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ObjectPool.ObjectFactory;
+
+/**
+ * A {@code SoftReference} based shared object pool.
+ * The objects are kept in soft references and
+ * associated with keys which are identified by the {@code equals} method.
+ * The objects are created by {@link ObjectFactory} on demand.
+ * The object creation is expected to be lightweight,
+ * and the objects may be excessively created and discarded.
+ * Thread safe.
+ */
+@InterfaceAudience.Private
+public class SoftObjectPool<K, V> extends ObjectPool<K, V> {
+
+ public SoftObjectPool(ObjectFactory<K, V> objectFactory) {
+ super(objectFactory);
+ }
+
+ public SoftObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
+ super(objectFactory, initialCapacity);
+ }
+
+ public SoftObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity,
+ int concurrencyLevel) {
+ super(objectFactory, initialCapacity, concurrencyLevel);
+ }
+
+ @Override
+ public void purge() {
+ // This method is lightweight while there is no stale reference
+ // with the Oracle (Sun) implementation of {@code ReferenceQueue},
+ // because {@code ReferenceQueue.poll} just checks a volatile instance
+ // variable in {@code ReferenceQueue}.
+ while (true) {
+ @SuppressWarnings("unchecked")
+ SoftObjectReference ref = (SoftObjectReference) staleRefQueue.poll();
+ if (ref == null) {
+ break;
+ }
+ referenceCache.remove(ref.key, ref);
+ }
+ }
+
+ @Override
+ public Reference<V> createReference(K key, V obj) {
+ return new SoftObjectReference(key, obj);
+ }
+
+ private class SoftObjectReference extends SoftReference<V> {
+ final K key;
+
+ SoftObjectReference(K key, V obj) {
+ super(obj, staleRefQueue);
+ this.key = key;
+ }
+ }
+
+}
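
A rough sketch of the behavioral difference between the two concrete pools, assuming the usual GC semantics of the reference types (the key/value types below are illustrative): a weakly referenced entry becomes collectable as soon as nothing holds the value strongly, while a softly referenced entry normally survives until the JVM is under memory pressure.

    import org.apache.hadoop.hbase.util.ObjectPool;
    import org.apache.hadoop.hbase.util.ObjectPool.ObjectFactory;
    import org.apache.hadoop.hbase.util.SoftObjectPool;
    import org.apache.hadoop.hbase.util.WeakObjectPool;

    public class ReferenceStrengthSketch {
      public static void main(String[] args) {
        ObjectFactory<String, Object> factory = new ObjectFactory<String, Object>() {
          @Override
          public Object createObject(String key) {
            return new Object();
          }
        };
        ObjectPool<String, Object> weakPool = new WeakObjectPool<>(factory);
        ObjectPool<String, Object> softPool = new SoftObjectPool<>(factory);
        weakPool.get("k"); // returned values deliberately not kept strongly reachable
        softPool.get("k");
        System.gc();
        weakPool.purge();
        softPool.purge();
        // Typically prints "0 1": the weak entry is gone after a GC, while the soft
        // entry stays as long as heap memory is plentiful.
        System.out.println(weakPool.size() + " " + softPool.size());
      }
    }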
http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
index 478864b..8529f01 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/WeakObjectPool.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,15 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.util;
-import java.lang.ref.ReferenceQueue;
+import java.lang.ref.Reference;
import java.lang.ref.WeakReference;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.util.ObjectPool.ObjectFactory;
/**
* A {@code WeakReference} based shared object pool.
@@ -35,116 +33,30 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
* Thread safe.
*/
@InterfaceAudience.Private
-public class WeakObjectPool<K, V> {
- /**
- * An {@code ObjectFactory} object is used to create
- * new shared objects on demand.
- */
- public interface ObjectFactory<K, V> {
- /**
- * Creates a new shared object associated with the given {@code key},
- * identified by the {@code equals} method.
- * This method may be simultaneously called by multiple threads
- * with the same key, and the excessive objects are just discarded.
- */
- V createObject(K key);
- }
-
- private final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();
-
- private class ObjectReference extends WeakReference<V> {
- final K key;
-
- ObjectReference(K key, V obj) {
- super(obj, staleRefQueue);
- this.key = key;
- }
- }
-
- private final ObjectFactory<K, V> objectFactory;
-
- /** Does not permit null keys. */
- private final ConcurrentMap<K, ObjectReference> referenceCache;
-
- /**
- * The default initial capacity,
- * used when not otherwise specified in a constructor.
- */
- public static final int DEFAULT_INITIAL_CAPACITY = 16;
-
- /**
- * The default concurrency level,
- * used when not otherwise specified in a constructor.
- */
- public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
+public class WeakObjectPool<K,V> extends ObjectPool<K,V> {
- /**
- * Creates a new pool with the default initial capacity (16)
- * and the default concurrency level (16).
- *
- * @param objectFactory the factory to supply new objects on demand
- *
- * @throws NullPointerException if {@code objectFactory} is null
- */
public WeakObjectPool(ObjectFactory<K, V> objectFactory) {
- this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
+ super(objectFactory);
}
- /**
- * Creates a new pool with the given initial capacity
- * and the default concurrency level (16).
- *
- * @param objectFactory the factory to supply new objects on demand
- * @param initialCapacity the initial capacity to keep objects in the pool
- *
- * @throws NullPointerException if {@code objectFactory} is null
- * @throws IllegalArgumentException if {@code initialCapacity} is negative
- */
public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
- this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
+ super(objectFactory, initialCapacity);
}
- /**
- * Creates a new pool with the given initial capacity
- * and the given concurrency level.
- *
- * @param objectFactory the factory to supply new objects on demand
- * @param initialCapacity the initial capacity to keep objects in the pool
- * @param concurrencyLevel the estimated count of concurrently accessing threads
- *
- * @throws NullPointerException if {@code objectFactory} is null
- * @throws IllegalArgumentException if {@code initialCapacity} is negative or
- * {@code concurrencyLevel} is non-positive
- */
- public WeakObjectPool(
- ObjectFactory<K, V> objectFactory,
- int initialCapacity,
+ public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity,
int concurrencyLevel) {
-
- if (objectFactory == null) {
- throw new NullPointerException();
- }
- this.objectFactory = objectFactory;
-
- this.referenceCache = new ConcurrentHashMap<>(initialCapacity, 0.75f, concurrencyLevel);
- // 0.75f is the default load factor threshold of ConcurrentHashMap.
+ super(objectFactory, initialCapacity, concurrencyLevel);
}
- /**
- * Removes stale references of shared objects from the pool.
- * References newly becoming stale may still remain.
- * The implementation of this method is expected to be lightweight
- * when there is no stale reference.
- */
+ @Override
public void purge() {
// This method is lightweight while there is no stale reference
// with the Oracle (Sun) implementation of {@code ReferenceQueue},
// because {@code ReferenceQueue.poll} just checks a volatile instance
// variable in {@code ReferenceQueue}.
-
while (true) {
@SuppressWarnings("unchecked")
- ObjectReference ref = (ObjectReference)staleRefQueue.poll();
+ WeakObjectReference ref = (WeakObjectReference) staleRefQueue.poll();
if (ref == null) {
break;
}
@@ -152,43 +64,18 @@ public class WeakObjectPool<K, V> {
}
}
- /**
- * Returns a shared object associated with the given {@code key},
- * which is identified by the {@code equals} method.
- * @throws NullPointerException if {@code key} is null
- */
- public V get(K key) {
- ObjectReference ref = referenceCache.get(key);
- if (ref != null) {
- V obj = ref.get();
- if (obj != null) {
- return obj;
- }
- referenceCache.remove(key, ref);
- }
+ @Override
+ public Reference<V> createReference(K key, V obj) {
+ return new WeakObjectReference(key, obj);
+ }
- V newObj = objectFactory.createObject(key);
- ObjectReference newRef = new ObjectReference(key, newObj);
- while (true) {
- ObjectReference existingRef = referenceCache.putIfAbsent(key, newRef);
- if (existingRef == null) {
- return newObj;
- }
+ private class WeakObjectReference extends WeakReference<V> {
+ final K key;
- V existingObject = existingRef.get();
- if (existingObject != null) {
- return existingObject;
- }
- referenceCache.remove(key, existingRef);
+ WeakObjectReference(K key, V obj) {
+ super(obj, staleRefQueue);
+ this.key = key;
}
}
- /**
- * Returns an estimated count of objects kept in the pool.
- * This also counts stale references,
- * and you might want to call {@link #purge()} beforehand.
- */
- public int size() {
- return referenceCache.size();
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
index d9fefa2..9dbbbd0 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestWeakObjectPool.java
@@ -31,12 +31,12 @@ import org.junit.experimental.categories.Category;
@Category({MiscTests.class, SmallTests.class})
public class TestWeakObjectPool {
- WeakObjectPool<String, Object> pool;
+ ObjectPool<String, Object> pool;
@Before
public void setUp() {
pool = new WeakObjectPool<>(
- new WeakObjectPool.ObjectFactory<String, Object>() {
+ new ObjectPool.ObjectFactory<String, Object>() {
@Override
public Object createObject(String key) {
return new Object();
http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
index caf3265..deb2265 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
@@ -44,10 +44,11 @@ import com.google.common.annotations.VisibleForTesting;
public class IdReadWriteLock {
// The number of lock we want to easily support. It's not a maximum.
private static final int NB_CONCURRENT_LOCKS = 1000;
- // The pool to get entry from, entries are mapped by weak reference to make it able to be
- // garbage-collected asap
- private final WeakObjectPool<Long, ReentrantReadWriteLock> lockPool = new WeakObjectPool<>(
- new WeakObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+ // The pool to get entry from, entries are mapped by soft reference and will be
+ // automatically garbage-collected when JVM memory pressure is high
+ private final ObjectPool<Long, ReentrantReadWriteLock> lockPool =
+ new SoftObjectPool<>(
+ new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
@Override
public ReentrantReadWriteLock createObject(Long id) {
return new ReentrantReadWriteLock();
http://git-wip-us.apache.org/repos/asf/hbase/blob/44b25588/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
index 2ccfad8..295816f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
@@ -111,10 +111,11 @@ public class TestIdReadWriteLock {
Future<Boolean> result = ecs.take();
assertTrue(result.get());
}
- // make sure the entry pool will be cleared after GC and purge call
+ // make sure the entry pool won't be cleared when JVM memory is enough
+ // even after GC and purge call
int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
- assertEquals(0, entryPoolSize);
+ assertEquals(NUM_IDS, entryPoolSize);
} finally {
exec.shutdown();
exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
[08/14] hbase git commit: HBASE-17782 Extend IdReadWriteLock to
support using both weak and soft reference
Posted by sy...@apache.org.
HBASE-17782 Extend IdReadWriteLock to support using both weak and soft reference
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/aace02a2
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/aace02a2
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/aace02a2
Branch: refs/heads/hbase-12439
Commit: aace02a230a61cc7e91eb240598435c36c9af403
Parents: 14fb57c
Author: Yu Li <li...@apache.org>
Authored: Wed Mar 15 11:07:42 2017 +0800
Committer: Yu Li <li...@apache.org>
Committed: Wed Mar 15 11:07:42 2017 +0800
----------------------------------------------------------------------
.../hbase/io/hfile/bucket/BucketCache.java | 5 +-
.../hadoop/hbase/util/IdReadWriteLock.java | 58 ++++++++++++++++----
.../hbase/wal/RegionGroupingProvider.java | 13 +++--
.../hadoop/hbase/util/TestIdReadWriteLock.java | 31 +++++++++--
4 files changed, 86 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/aace02a2/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
index cb23ca9..3e9c376 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/bucket/BucketCache.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hbase.nio.ByteBuff;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.HasThread;
import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -185,9 +186,11 @@ public class BucketCache implements BlockCache, HeapSize {
/**
* A ReentrantReadWriteLock to lock on a particular block identified by offset.
* The purpose of this is to avoid freeing the block which is being read.
+ * <p>
+ * Key set of offsets in BucketCache is limited, so soft references are the best choice here.
*/
@VisibleForTesting
- final IdReadWriteLock offsetLock = new IdReadWriteLock();
+ final IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT);
private final NavigableSet<BlockCacheKey> blocksByHFile =
new ConcurrentSkipListSet<>(new Comparator<BlockCacheKey>() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/aace02a2/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
index deb2265..2a83029 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/IdReadWriteLock.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.util;
+import java.lang.ref.Reference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -44,16 +45,48 @@ import com.google.common.annotations.VisibleForTesting;
public class IdReadWriteLock {
// The number of lock we want to easily support. It's not a maximum.
private static final int NB_CONCURRENT_LOCKS = 1000;
- // The pool to get entry from, entries are mapped by soft reference and will be
- // automatically garbage-collected when JVM memory pressure is high
- private final ObjectPool<Long, ReentrantReadWriteLock> lockPool =
- new SoftObjectPool<>(
- new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
- @Override
- public ReentrantReadWriteLock createObject(Long id) {
- return new ReentrantReadWriteLock();
- }
- }, NB_CONCURRENT_LOCKS);
+ /**
+ * The pool to get entries from; entries are held via a {@link Reference} and will be
+ * garbage-collected by the JVM automatically
+ */
+ private final ObjectPool<Long, ReentrantReadWriteLock> lockPool;
+ private final ReferenceType refType;
+
+ public IdReadWriteLock() {
+ this(ReferenceType.WEAK);
+ }
+
+ /**
+ * Constructor of IdReadWriteLock
+ * @param referenceType type of the reference used in lock pool, {@link ReferenceType#WEAK} by
+ * default. Use {@link ReferenceType#SOFT} if the key set is limited and the locks will
+ * be reused with a high frequency
+ */
+ public IdReadWriteLock(ReferenceType referenceType) {
+ this.refType = referenceType;
+ switch (referenceType) {
+ case SOFT:
+ lockPool = new SoftObjectPool<>(new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+ @Override
+ public ReentrantReadWriteLock createObject(Long id) {
+ return new ReentrantReadWriteLock();
+ }
+ }, NB_CONCURRENT_LOCKS);
+ break;
+ case WEAK:
+ default:
+ lockPool = new WeakObjectPool<>(new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
+ @Override
+ public ReentrantReadWriteLock createObject(Long id) {
+ return new ReentrantReadWriteLock();
+ }
+ }, NB_CONCURRENT_LOCKS);
+ }
+ }
+
+ public static enum ReferenceType {
+ WEAK, SOFT
+ }
/**
* Get the ReentrantReadWriteLock corresponding to the given id
@@ -93,4 +126,9 @@ public class IdReadWriteLock {
Thread.sleep(50);
}
}
+
+ @VisibleForTesting
+ public ReferenceType getReferenceType() {
+ return this.refType;
+ }
}
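
Putting the new constructor together with the existing getLock() API, a minimal sketch of how a caller would pick the reference type (the lock id and usage are illustrative):

    import java.util.concurrent.locks.ReentrantReadWriteLock;

    import org.apache.hadoop.hbase.util.IdReadWriteLock;
    import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;

    public class IdReadWriteLockSketch {
      public static void main(String[] args) {
        // Bounded, frequently reused key space (e.g. BucketCache block offsets): SOFT keeps
        // the locks cached until the JVM is short on memory.
        IdReadWriteLock offsetLock = new IdReadWriteLock(ReferenceType.SOFT);
        // Default: WEAK references, so unused locks can be reclaimed as soon as possible.
        IdReadWriteLock defaultLock = new IdReadWriteLock();

        ReentrantReadWriteLock lock = offsetLock.getLock(12345L);
        lock.readLock().lock();
        try {
          // read the resource guarded by id 12345
        } finally {
          lock.readLock().unlock();
        }
      }
    }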
http://git-wip-us.apache.org/repos/asf/hbase/blob/aace02a2/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
index dee36e8..5a29731 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java
@@ -27,7 +27,6 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.locks.Lock;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -36,7 +35,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
// imports for classes still in regionserver.wal
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.IdReadWriteLock;
+import org.apache.hadoop.hbase.util.IdLock;
/**
* A WAL Provider that returns a WAL per group of regions.
@@ -132,7 +131,7 @@ public class RegionGroupingProvider implements WALProvider {
/** A group-provider mapping, make sure one-one rather than many-one mapping */
private final ConcurrentMap<String, WALProvider> cached = new ConcurrentHashMap<>();
- private final IdReadWriteLock createLock = new IdReadWriteLock();
+ private final IdLock createLock = new IdLock();
private RegionGroupingStrategy strategy = null;
private WALFactory factory = null;
@@ -181,16 +180,18 @@ public class RegionGroupingProvider implements WALProvider {
private WAL getWAL(final String group) throws IOException {
WALProvider provider = cached.get(group);
if (provider == null) {
- Lock lock = createLock.getLock(group.hashCode()).writeLock();
- lock.lock();
+ IdLock.Entry lockEntry = null;
try {
+ lockEntry = createLock.getLockEntry(group.hashCode());
provider = cached.get(group);
if (provider == null) {
provider = createProvider(group);
cached.put(group, provider);
}
} finally {
- lock.unlock();
+ if (lockEntry != null) {
+ createLock.releaseLockEntry(lockEntry);
+ }
}
}
return provider.getWAL(null, null);
http://git-wip-us.apache.org/repos/asf/hbase/blob/aace02a2/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
index 295816f..7dd2a63 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestIdReadWriteLock.java
@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
+import java.util.Arrays;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Callable;
@@ -38,9 +39,13 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
+import org.apache.hadoop.hbase.util.IdReadWriteLock.ReferenceType;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+@RunWith(Parameterized.class)
@Category({MiscTests.class, MediumTests.class})
// Medium as it creates 100 threads; seems better to run it isolated
public class TestIdReadWriteLock {
@@ -51,7 +56,14 @@ public class TestIdReadWriteLock {
private static final int NUM_THREADS = 128;
private static final int NUM_SECONDS = 15;
- private IdReadWriteLock idLock = new IdReadWriteLock();
+ @Parameterized.Parameter
+ public IdReadWriteLock idLock;
+
+ @Parameterized.Parameters
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][] { { new IdReadWriteLock(ReferenceType.WEAK) },
+ { new IdReadWriteLock(ReferenceType.SOFT) } });
+ }
private Map<Long, String> idOwner = new ConcurrentHashMap<>();
@@ -111,11 +123,22 @@ public class TestIdReadWriteLock {
Future<Boolean> result = ecs.take();
assertTrue(result.get());
}
- // make sure the entry pool won't be cleared when JVM memory is enough
- // even after GC and purge call
int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
- assertEquals(NUM_IDS, entryPoolSize);
+ ReferenceType refType = idLock.getReferenceType();
+ switch (refType) {
+ case WEAK:
+ // make sure the entry pool will be cleared after GC and purge call
+ assertEquals(0, entryPoolSize);
+ break;
+ case SOFT:
+ // make sure the entry pool won't be cleared when JVM memory is enough
+ // even after GC and purge call
+ assertEquals(NUM_IDS, entryPoolSize);
+ break;
+ default:
+ break;
+ }
} finally {
exec.shutdown();
exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
[13/14] hbase git commit: HBASE-17790 Mark ReplicationAdmin's
peerAdded and listReplicationPeers as Deprecated
Posted by sy...@apache.org.
HBASE-17790 Mark ReplicationAdmin's peerAdded and listReplicationPeers as Deprecated
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6a6fff10
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6a6fff10
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6a6fff10
Branch: refs/heads/hbase-12439
Commit: 6a6fff103e0fcadfd539fbbae5157a99643a033b
Parents: 53e9a1c
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Mar 16 16:55:18 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Thu Mar 16 16:55:18 2017 +0800
----------------------------------------------------------------------
.../client/replication/ReplicationAdmin.java | 5 ++++
.../replication/TestReplicationAdmin.java | 25 ++++++++++----------
.../TestReplicationAdminWithClusters.java | 3 +--
.../replication/TestNamespaceReplication.java | 1 -
.../hbase/replication/TestReplicationBase.java | 4 +++-
5 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index c7f040e..0eae10b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -521,11 +521,16 @@ public class ReplicationAdmin implements Closeable {
}
@VisibleForTesting
+ @Deprecated
public void peerAdded(String id) throws ReplicationException {
this.replicationPeers.peerConnected(id);
}
+ /**
+ * @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
+ */
@VisibleForTesting
+ @Deprecated
List<ReplicationPeer> listReplicationPeers() throws IOException {
Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
if (peers == null || peers.size() <= 0) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index f092a48..a23b76a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -33,11 +33,13 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -74,6 +76,7 @@ public class TestReplicationAdmin {
private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
private static ReplicationAdmin admin;
+ private static Admin hbaseAdmin;
@Rule
public TestName name = new TestName();
@@ -87,6 +90,7 @@ public class TestReplicationAdmin {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
admin = new ReplicationAdmin(conf);
+ hbaseAdmin = TEST_UTIL.getAdmin();
}
@AfterClass
@@ -149,16 +153,16 @@ public class TestReplicationAdmin {
config.setClusterKey(KEY_ONE);
config.getConfiguration().put("key1", "value1");
config.getConfiguration().put("key2", "value2");
- admin.addPeer(ID_ONE, config, null);
+ hbaseAdmin.addReplicationPeer(ID_ONE, config);
- List<ReplicationPeer> peers = admin.listReplicationPeers();
+ List<ReplicationPeerDescription> peers = hbaseAdmin.listReplicationPeers();
assertEquals(1, peers.size());
- ReplicationPeer peerOne = peers.get(0);
+ ReplicationPeerDescription peerOne = peers.get(0);
assertNotNull(peerOne);
- assertEquals("value1", peerOne.getConfiguration().get("key1"));
- assertEquals("value2", peerOne.getConfiguration().get("key2"));
+ assertEquals("value1", peerOne.getPeerConfig().getConfiguration().get("key1"));
+ assertEquals("value2", peerOne.getPeerConfig().getConfiguration().get("key2"));
- admin.removePeer(ID_ONE);
+ hbaseAdmin.removeReplicationPeer(ID_ONE);
}
@Test
@@ -403,8 +407,7 @@ public class TestReplicationAdmin {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
- admin.addPeer(ID_ONE, rpc);
- admin.peerAdded(ID_ONE);
+ hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE);
Set<String> namespaces = new HashSet<>();
@@ -438,8 +441,7 @@ public class TestReplicationAdmin {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
- admin.addPeer(ID_ONE, rpc);
- admin.peerAdded(ID_ONE);
+ hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE);
Set<String> namespaces = new HashSet<String>();
@@ -482,8 +484,7 @@ public class TestReplicationAdmin {
public void testPeerBandwidth() throws Exception {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(KEY_ONE);
- admin.addPeer(ID_ONE, rpc);
- admin.peerAdded(ID_ONE);
+ hbaseAdmin.addReplicationPeer(ID_ONE, rpc);
rpc = admin.getPeerConfig(ID_ONE);
assertEquals(0, rpc.getBandwidth());
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index 312a90a..56f4141 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -229,8 +229,7 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
rpc.setReplicationEndpointImpl(TestUpdatableReplicationEndpoint.class.getName());
rpc.getConfiguration().put("key1", "value1");
- admin.addPeer(peerId, rpc);
- admin.peerAdded(peerId);
+ admin1.addReplicationPeer(peerId, rpc);
rpc.getConfiguration().put("key1", "value2");
admin.updatePeerConfig(peerId, rpc);
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
index e296f87..433a345 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestNamespaceReplication.java
@@ -140,7 +140,6 @@ public class TestNamespaceReplication extends TestReplicationBase {
Table htab1B = connection1.getTable(tabBName);
Table htab2B = connection2.getTable(tabBName);
- admin.peerAdded("2");
// add ns1 to peer config which replicate to cluster2
ReplicationPeerConfig rpc = admin.getPeerConfig("2");
Set<String> namespaces = new HashSet<>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/6a6fff10/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
index caad544..81fe629 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
@@ -63,6 +63,7 @@ public class TestReplicationBase {
protected static ZooKeeperWatcher zkw2;
protected static ReplicationAdmin admin;
+ private static Admin hbaseAdmin;
protected static Table htable1;
protected static Table htable2;
@@ -133,7 +134,8 @@ public class TestReplicationBase {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(utility2.getClusterKey());
- admin.addPeer("2", rpc, null);
+ hbaseAdmin = ConnectionFactory.createConnection(conf1).getAdmin();
+ hbaseAdmin.addReplicationPeer("2", rpc);
HTableDescriptor table = new HTableDescriptor(tableName);
HColumnDescriptor fam = new HColumnDescriptor(famName);
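
The same flow through the non-deprecated Admin API, as a self-contained sketch (the configuration, peer id, and cluster key are illustrative assumptions):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;

    public class ReplicationPeerAdminSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
          ReplicationPeerConfig rpc = new ReplicationPeerConfig();
          rpc.setClusterKey("127.0.0.1:2181:/hbase2"); // illustrative peer cluster key
          admin.addReplicationPeer("2", rpc);  // replaces ReplicationAdmin.addPeer(...) + peerAdded(...)
          admin.listReplicationPeers();        // replaces the deprecated ReplicationAdmin.listReplicationPeers()
          admin.removeReplicationPeer("2");    // replaces ReplicationAdmin.removePeer(...)
        }
      }
    }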
[04/14] hbase git commit: guard against NPE while reading FileTrailer
and HFileBlock
Posted by sy...@apache.org.
guard against NPE while reading FileTrailer and HFileBlock
guard against NPE from FSInputStream#seek
Signed-off-by: Michael Stack <st...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/201c8382
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/201c8382
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/201c8382
Branch: refs/heads/hbase-12439
Commit: 201c8382508da1266d11e04d3c7cbef42e0a256a
Parents: 35d7a0c
Author: James Moore <jc...@hubspot.com>
Authored: Fri Feb 24 10:26:12 2017 -0500
Committer: Michael Stack <st...@apache.org>
Committed: Mon Mar 13 14:53:35 2017 -0700
----------------------------------------------------------------------
.../hadoop/hbase/io/hfile/FixedFileTrailer.java | 3 +-
.../hadoop/hbase/io/hfile/HFileBlock.java | 2 +-
.../apache/hadoop/hbase/io/hfile/HFileUtil.java | 43 ++++++++++++++++++++
3 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/201c8382/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
index 7eac9c6..1854236 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/FixedFileTrailer.java
@@ -388,7 +388,8 @@ public class FixedFileTrailer {
bufferSize = (int) fileSize;
}
- istream.seek(seekPoint);
+ HFileUtil.seekOnMultipleSources(istream, seekPoint);
+
ByteBuffer buf = ByteBuffer.allocate(bufferSize);
istream.readFully(buf.array(), buf.arrayOffset(),
buf.arrayOffset() + buf.limit());
http://git-wip-us.apache.org/repos/asf/hbase/blob/201c8382/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
index fba15ba..0b140b6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileBlock.java
@@ -1512,7 +1512,7 @@ public class HFileBlock implements Cacheable {
if (!pread && streamLock.tryLock()) {
// Seek + read. Better for scanning.
try {
- istream.seek(fileOffset);
+ HFileUtil.seekOnMultipleSources(istream, fileOffset);
long realOffset = istream.getPos();
if (realOffset != fileOffset) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/201c8382/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileUtil.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileUtil.java
new file mode 100644
index 0000000..835450c
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileUtil.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io.hfile;
+
+import java.io.IOException;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+public class HFileUtil {
+
+ /**
+ * Guards against a NullPointerException (HBASE-17501): tries to seek the given
+ * FSDataInputStream to the requested offset and, if the underlying DFSInputStream
+ * throws an NPE, retries the seek against an alternative source.
+ * @param istream the stream to seek on
+ * @param offset the offset to seek to
+ * @throws IOException if the seek fails
+ */
+ static public void seekOnMultipleSources(FSDataInputStream istream, long offset) throws IOException {
+ try {
+ // attempt to seek inside of current blockReader
+ istream.seek(offset);
+ } catch (NullPointerException e) {
+ // retry the seek on an alternate copy of the data
+ // this can occur if the blockReader on the DFSInputStream is null
+ istream.seekToNewSource(offset);
+ }
+ }
+}
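
A hedged sketch of the behavior the helper is meant to provide, written as a Mockito-style check (the mock setup and offset are illustrative, not part of the patch):

    import static org.mockito.Mockito.doThrow;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;

    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.hbase.io.hfile.HFileUtil;

    public class SeekFallbackSketch {
      public static void main(String[] args) throws Exception {
        // Simulate the HBASE-17501 condition: seek() throws an NPE because the
        // DFSInputStream's blockReader is null.
        FSDataInputStream istream = mock(FSDataInputStream.class);
        doThrow(new NullPointerException()).when(istream).seek(1024L);

        HFileUtil.seekOnMultipleSources(istream, 1024L);

        // The helper swallows the NPE and retries the seek against another replica.
        verify(istream).seekToNewSource(1024L);
      }
    }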
[10/14] hbase git commit: HBASE-17723 ClientAsyncPrefetchScanner may
end prematurely when the size of the cache is one
Posted by sy...@apache.org.
HBASE-17723 ClientAsyncPrefetchScanner may end prematurely when the size of the cache is one
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/0ecb6782
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/0ecb6782
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/0ecb6782
Branch: refs/heads/hbase-12439
Commit: 0ecb6782593039af75a45c25481f1dbf7cbd6928
Parents: a49bc58
Author: Chia-Ping Tsai <ch...@gmail.com>
Authored: Sun Mar 12 13:48:12 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Thu Mar 16 03:07:20 2017 +0800
----------------------------------------------------------------------
.../client/ClientAsyncPrefetchScanner.java | 61 +++++++++++-------
.../client/TestScannersFromClientSide.java | 66 ++++++++++++++------
2 files changed, 88 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ecb6782/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
index b1fc2da..007e638 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientAsyncPrefetchScanner.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import com.google.common.annotations.VisibleForTesting;
import static org.apache.hadoop.hbase.client.ConnectionUtils.calcEstimatedSize;
import java.io.IOException;
@@ -26,6 +27,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
@@ -62,6 +64,8 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
private AtomicBoolean prefetchRunning;
// an attribute for synchronizing close between scanner and prefetch threads
private AtomicLong closingThreadId;
+ // used for testing
+ private Consumer<Boolean> prefetchListener;
private static final int NO_THREAD = -1;
public ClientAsyncPrefetchScanner(Configuration configuration, Scan scan, TableName name,
@@ -72,6 +76,11 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
replicaCallTimeoutMicroSecondScan);
}
+ @VisibleForTesting
+ void setPrefetchListener(Consumer<Boolean> prefetchListener) {
+ this.prefetchListener = prefetchListener;
+ }
+
@Override
protected void initCache() {
// concurrent cache
@@ -88,34 +97,39 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
public Result next() throws IOException {
try {
- handleException();
+ boolean hasExecutedPrefetch = false;
+ do {
+ handleException();
- // If the scanner is closed and there's nothing left in the cache, next is a no-op.
- if (getCacheCount() == 0 && this.closed) {
- return null;
- }
- if (prefetchCondition()) {
- // run prefetch in the background only if no prefetch is already running
- if (!isPrefetchRunning()) {
- if (prefetchRunning.compareAndSet(false, true)) {
- getPool().execute(prefetchRunnable);
+ // If the scanner is closed and there's nothing left in the cache, next is a no-op.
+ if (getCacheCount() == 0 && this.closed) {
+ return null;
+ }
+
+ if (prefetchCondition()) {
+ // run prefetch in the background only if no prefetch is already running
+ if (!isPrefetchRunning()) {
+ if (prefetchRunning.compareAndSet(false, true)) {
+ getPool().execute(prefetchRunnable);
+ hasExecutedPrefetch = true;
+ }
+ }
+ }
+
+ while (isPrefetchRunning()) {
+ // prefetch running or still pending
+ if (getCacheCount() > 0) {
+ return pollCache();
+ } else {
+ // (busy) wait for a record - sleep
+ Threads.sleep(1);
}
}
- }
- while (isPrefetchRunning()) {
- // prefetch running or still pending
if (getCacheCount() > 0) {
return pollCache();
- } else {
- // (busy) wait for a record - sleep
- Threads.sleep(1);
}
- }
-
- if (getCacheCount() > 0) {
- return pollCache();
- }
+ } while (!hasExecutedPrefetch);
// if we exhausted this scanner before calling close, write out the scan metrics
writeScanMetrics();
@@ -219,11 +233,16 @@ public class ClientAsyncPrefetchScanner extends ClientSimpleScanner {
@Override
public void run() {
+ boolean succeed = false;
try {
loadCache();
+ succeed = true;
} catch (Exception e) {
exceptionsQueue.add(e);
} finally {
+ if (prefetchListener != null) {
+ prefetchListener.accept(succeed);
+ }
prefetchRunning.set(false);
if(closed) {
if (closingThreadId.compareAndSet(NO_THREAD, Thread.currentThread().getId())) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/0ecb6782/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
index 6f40093..e5c19ac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java
@@ -22,6 +22,7 @@ import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
import java.util.stream.IntStream;
import org.apache.commons.logging.Log;
@@ -656,7 +657,9 @@ public class TestScannersFromClientSide {
testAsyncScanner(TableName.valueOf(name.getMethodName()),
2,
3,
- 10);
+ 10,
+ -1,
+ null);
}
@Test
@@ -664,11 +667,28 @@ public class TestScannersFromClientSide {
testAsyncScanner(TableName.valueOf(name.getMethodName()),
30000,
1,
- 1);
+ 1,
+ -1,
+ null);
+ }
+
+ @Test
+ public void testAsyncScannerWithoutCaching() throws Exception {
+ testAsyncScanner(TableName.valueOf(name.getMethodName()),
+ 5,
+ 1,
+ 1,
+ 1,
+ (b) -> {
+ try {
+ TimeUnit.MILLISECONDS.sleep(500);
+ } catch (InterruptedException ex) {
+ }
+ });
}
private void testAsyncScanner(TableName table, int rowNumber, int familyNumber,
- int qualifierNumber) throws Exception {
+ int qualifierNumber, int caching, Consumer<Boolean> listener) throws Exception {
assert rowNumber > 0;
assert familyNumber > 0;
assert qualifierNumber > 0;
@@ -707,23 +727,33 @@ public class TestScannersFromClientSide {
Scan scan = new Scan();
scan.setAsyncPrefetch(true);
- ResultScanner scanner = ht.getScanner(scan);
- List<Cell> kvListScan = new ArrayList<>();
- Result result;
- boolean first = true;
- while ((result = scanner.next()) != null) {
- // waiting for cache. see HBASE-17376
- if (first) {
- TimeUnit.SECONDS.sleep(1);
- first = false;
- }
- for (Cell kv : result.listCells()) {
- kvListScan.add(kv);
+ if (caching > 0) {
+ scan.setCaching(caching);
+ }
+ try (ResultScanner scanner = ht.getScanner(scan)) {
+ assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
+ ((ClientAsyncPrefetchScanner) scanner).setPrefetchListener(listener);
+ List<Cell> kvListScan = new ArrayList<>();
+ Result result;
+ boolean first = true;
+ int actualRows = 0;
+ while ((result = scanner.next()) != null) {
+ ++actualRows;
+ // waiting for cache. see HBASE-17376
+ if (first) {
+ TimeUnit.SECONDS.sleep(1);
+ first = false;
+ }
+ for (Cell kv : result.listCells()) {
+ kvListScan.add(kv);
+ }
}
+ assertEquals(rowNumber, actualRows);
+ // These cells may have different rows but it is ok. The Result#getRow
+ // isn't used in the verifyResult()
+ result = Result.create(kvListScan);
+ verifyResult(result, kvListExp, toLog, "Testing async scan");
}
- result = Result.create(kvListScan);
- assertTrue("Not instance of async scanner",scanner instanceof ClientAsyncPrefetchScanner);
- verifyResult(result, kvListExp, toLog, "Testing async scan");
TEST_UTIL.deleteTable(table);
}
[06/14] hbase git commit: HBASE-17779 disable_table_replication
returns misleading message and does not turn off replication (Janos Gub)
Posted by sy...@apache.org.
HBASE-17779 disable_table_replication returns misleading message and does not turn off replication (Janos Gub)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/777fea55
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/777fea55
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/777fea55
Branch: refs/heads/hbase-12439
Commit: 777fea552eab3262e95053b2fc757fc49dfad96d
Parents: 44b2558
Author: tedyu <yu...@gmail.com>
Authored: Tue Mar 14 12:13:34 2017 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Tue Mar 14 12:13:34 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/HBaseAdmin.java | 35 +++++++++++++++-----
.../TestReplicationAdminWithClusters.java | 17 ++++++++++
2 files changed, 44 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/777fea55/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 1f143b5..6918184 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -4228,14 +4228,16 @@ public class HBaseAdmin implements Admin {
/**
* Set the table's replication switch if the table's replication switch is already not set.
* @param tableName name of the table
- * @param isRepEnabled is replication switch enable or disable
+ * @param enableRep is replication switch enable or disable
* @throws IOException if a remote or network exception occurs
*/
- private void setTableRep(final TableName tableName, boolean isRepEnabled) throws IOException {
+ private void setTableRep(final TableName tableName, boolean enableRep) throws IOException {
HTableDescriptor htd = getTableDescriptor(tableName);
- if (isTableRepEnabled(htd) ^ isRepEnabled) {
+ ReplicationState currentReplicationState = getTableReplicationState(htd);
+ if (enableRep && currentReplicationState != ReplicationState.ENABLED
+ || !enableRep && currentReplicationState != ReplicationState.DISABLED) {
for (HColumnDescriptor hcd : htd.getFamilies()) {
- hcd.setScope(isRepEnabled ? HConstants.REPLICATION_SCOPE_GLOBAL
+ hcd.setScope(enableRep ? HConstants.REPLICATION_SCOPE_GLOBAL
: HConstants.REPLICATION_SCOPE_LOCAL);
}
modifyTable(tableName, htd);
@@ -4243,17 +4245,34 @@ public class HBaseAdmin implements Admin {
}
/**
+ * This enum indicates the current state of the replication for a given table.
+ */
+ private enum ReplicationState {
+ ENABLED, // all column families enabled
+ MIXED, // some column families enabled, some disabled
+ DISABLED // all column families disabled
+ }
+
+ /**
* @param htd table descriptor details for the table to check
- * @return true if table's replication switch is enabled
+ * @return ReplicationState the current state of the table.
*/
- private boolean isTableRepEnabled(HTableDescriptor htd) {
+ private ReplicationState getTableReplicationState(HTableDescriptor htd) {
+ boolean hasEnabled = false;
+ boolean hasDisabled = false;
+
for (HColumnDescriptor hcd : htd.getFamilies()) {
if (hcd.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL
&& hcd.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
- return false;
+ hasDisabled = true;
+ } else {
+ hasEnabled = true;
}
}
- return true;
+
+ if (hasEnabled && hasDisabled) return ReplicationState.MIXED;
+ if (hasEnabled) return ReplicationState.ENABLED;
+ return ReplicationState.DISABLED;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/777fea55/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index b44ecbf..312a90a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -76,6 +76,23 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
}
@Test(timeout = 300000)
+ public void disableNotFullReplication() throws Exception {
+ HTableDescriptor table = admin2.getTableDescriptor(tableName);
+ HColumnDescriptor f = new HColumnDescriptor("notReplicatedFamily");
+ table.addFamily(f);
+ admin1.disableTable(tableName);
+ admin1.modifyTable(tableName, table);
+ admin1.enableTable(tableName);
+
+
+ admin1.disableTableReplication(tableName);
+ table = admin1.getTableDescriptor(tableName);
+ for (HColumnDescriptor fam : table.getColumnFamilies()) {
+ assertEquals(fam.getScope(), HConstants.REPLICATION_SCOPE_LOCAL);
+ }
+ }
+
+ @Test(timeout = 300000)
public void testEnableReplicationWhenSlaveClusterDoesntHaveTable() throws Exception {
admin1.disableTableReplication(tableName);
admin2.disableTable(tableName);
[07/14] hbase git commit: HBASE-17780 BoundedByteBufferPool "At
capacity" messages are not actionable
Posted by sy...@apache.org.
HBASE-17780 BoundedByteBufferPool "At capacity" messages are not actionable
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/14fb57ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/14fb57ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/14fb57ca
Branch: refs/heads/hbase-12439
Commit: 14fb57cab2fd8c0117d59669018b09e29bd6e387
Parents: 777fea5
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue Mar 14 13:23:11 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Mar 14 13:23:11 2017 -0700
----------------------------------------------------------------------
.../java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/14fb57ca/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
index 079a277..7bce0e5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/io/BoundedByteBufferPool.java
@@ -155,8 +155,8 @@ public class BoundedByteBufferPool {
long prevState = stateRef.get();
countOfBuffers = toCountOfBuffers(prevState);
if (countOfBuffers >= maxToCache) {
- if (LOG.isWarnEnabled()) {
- LOG.warn("At capacity: " + countOfBuffers);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("At capacity: " + countOfBuffers);
}
return;
}
[12/14] hbase git commit: HBASE-17707 New More Accurate Table Skew
cost function/generator - re-enable with test fix
Posted by sy...@apache.org.
HBASE-17707 New More Accurate Table Skew cost function/generator - re-enable with test fix
This reverts commit 9214ad69af486109cc4dd31f60a82ac7ad8d3427.
Signed-off-by: tedyu <yu...@gmail.com>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/53e9a1c4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/53e9a1c4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/53e9a1c4
Branch: refs/heads/hbase-12439
Commit: 53e9a1c43a3861c59d6fc5198982973a1678b65e
Parents: 1849e8a
Author: Kahlil Oppenheimer <ka...@gmail.com>
Authored: Wed Mar 15 11:43:18 2017 -0400
Committer: tedyu <yu...@gmail.com>
Committed: Wed Mar 15 20:42:40 2017 -0700
----------------------------------------------------------------------
.../hbase/master/balancer/BaseLoadBalancer.java | 74 ++++
.../master/balancer/StochasticLoadBalancer.java | 441 ++++++++++++++++++-
.../balancer/TestStochasticLoadBalancer.java | 35 +-
.../balancer/TestStochasticLoadBalancer2.java | 4 +
4 files changed, 549 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
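Before the diffs, a small worked example of the even-distribution arithmetic the new BaseLoadBalancer helpers below implement (the numbers are illustrative):

    public class EvenDistributionSketch {
      public static void main(String[] args) {
        // One table with 10 regions spread across 3 servers:
        int regions = 10, servers = 3;
        int min = regions / servers;                      // minRegionsIfEvenlyDistributed -> 3
        int max = regions % servers == 0 ? min : min + 1; // maxRegionsIfEvenlyDistributed -> 4
        int serversWithMax = regions % servers == 0
            ? servers : regions % servers;                // numServersWithMaxRegionsIfEvenlyDistributed -> 1
        // An even distribution therefore means one server holds 4 regions and the other two hold 3 each.
        System.out.println(min + " " + max + " " + serversWithMax);
      }
    }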
http://git-wip-us.apache.org/repos/asf/hbase/blob/53e9a1c4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
index 0f1b1a2..b0e088c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/BaseLoadBalancer.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action.Type;
import org.apache.hadoop.hbase.security.access.AccessControlLists;
+import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
@@ -140,6 +141,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
int[] initialRegionIndexToServerIndex; //regionIndex -> serverIndex (initial cluster state)
int[] regionIndexToTableIndex; //regionIndex -> tableIndex
int[][] numRegionsPerServerPerTable; //serverIndex -> tableIndex -> # regions
+ int[] numRegionsPerTable; // tableIndex -> number of regions that table has
int[] numMaxRegionsPerTable; //tableIndex -> max number of regions in a single RS
int[] regionIndexToPrimaryIndex; //regionIndex -> regionIndex of the primary
boolean hasRegionReplicas = false; //whether there is regions with replicas
@@ -330,6 +332,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
numTables = tables.size();
numRegionsPerServerPerTable = new int[numServers][numTables];
+ numRegionsPerTable = new int[numTables];
for (int i = 0; i < numServers; i++) {
for (int j = 0; j < numTables; j++) {
@@ -339,6 +342,7 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
for (int i=0; i < regionIndexToServerIndex.length; i++) {
if (regionIndexToServerIndex[i] >= 0) {
+ numRegionsPerTable[regionIndexToTableIndex[i]]++;
numRegionsPerServerPerTable[regionIndexToServerIndex[i]][regionIndexToTableIndex[i]]++;
}
}
@@ -470,6 +474,76 @@ public abstract class BaseLoadBalancer implements LoadBalancer {
}
}
+ /**
+ * Returns the minimum number of regions of a table T each server would store if T were
+ * perfectly distributed (i.e. round-robin-ed) across the cluster
+ */
+ public int minRegionsIfEvenlyDistributed(int table) {
+ return numRegionsPerTable[table] / numServers;
+ }
+
+ /**
+ * Returns the maximum number of regions of a table T each server would store if T were
+ * perfectly distributed (i.e. round-robin-ed) across the cluster
+ */
+ public int maxRegionsIfEvenlyDistributed(int table) {
+ int min = minRegionsIfEvenlyDistributed(table);
+ return numRegionsPerTable[table] % numServers == 0 ? min : min + 1;
+ }
+
+ /**
+ * Returns the number of servers that should hold maxRegionsIfEvenlyDistributed for a given
+ * table. A special case here is if maxRegionsIfEvenlyDistributed == minRegionsIfEvenlyDistributed,
+ * in which case all servers should hold the max
+ */
+ public int numServersWithMaxRegionsIfEvenlyDistributed(int table) {
+ int numWithMax = numRegionsPerTable[table] % numServers;
+ if (numWithMax == 0) {
+ return numServers;
+ } else {
+ return numWithMax;
+ }
+ }
+
+ /**
+ * Returns true iff at least one server in the cluster stores more than the max (or fewer than the min)
+ * number of regions expected per server when all regions are evenly distributed across the cluster
+ */
+ public boolean hasUnevenRegionDistribution() {
+ int minLoad = numRegions / numServers;
+ int maxLoad = numRegions % numServers == 0 ? minLoad : minLoad + 1;
+ for (int server = 0; server < numServers; server++) {
+ int numRegions = getNumRegions(server);
+ if (numRegions > maxLoad || numRegions < minLoad) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Returns a pair where the first server is that with the least number of regions across the
+ * cluster and the second server is that with the most number of regions across the cluster
+ */
+ public Pair<Integer, Integer> findLeastAndMostLoadedServers() {
+ int minServer = 0;
+ int maxServer = 0;
+ int minLoad = getNumRegions(minServer);
+ int maxLoad = minLoad;
+ for (int server = 1; server < numServers; server++) {
+ int numRegions = getNumRegions(server);
+ if (numRegions < minLoad) {
+ minServer = server;
+ minLoad = numRegions;
+ }
+ if (numRegions > maxLoad) {
+ maxServer = server;
+ maxLoad = numRegions;
+ }
+ }
+ return Pair.newPair(minServer, maxServer);
+ }
+
/** An action to move or swap a region */
public static class Action {
public static enum Type {
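For reference, the even-distribution helpers added above reduce to simple integer arithmetic: with R regions of a table spread over S servers, a round-robin layout leaves floor(R / S) regions on every server and one extra region on R mod S of them. A minimal standalone sketch of that arithmetic (illustrative only, not part of the patch; class and method names are hypothetical):

public class EvenDistributionSketch {
  // Fewest regions of the table any server holds under a round-robin layout.
  static int minIfEvenlyDistributed(int numRegions, int numServers) {
    return numRegions / numServers;
  }
  // Most regions of the table any server holds under a round-robin layout.
  static int maxIfEvenlyDistributed(int numRegions, int numServers) {
    int min = minIfEvenlyDistributed(numRegions, numServers);
    return numRegions % numServers == 0 ? min : min + 1;
  }
  // How many servers hold the max; if the division is exact, every server does.
  static int numServersWithMax(int numRegions, int numServers) {
    int remainder = numRegions % numServers;
    return remainder == 0 ? numServers : remainder;
  }
  public static void main(String[] args) {
    // 10 regions over 4 servers: min = 2, max = 3, and 2 servers hold the max (3 + 3 + 2 + 2 = 10).
    System.out.println(minIfEvenlyDistributed(10, 4));  // 2
    System.out.println(maxIfEvenlyDistributed(10, 4));  // 3
    System.out.println(numServersWithMax(10, 4));       // 2
  }
}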
http://git-wip-us.apache.org/repos/asf/hbase/blob/53e9a1c4/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
index 5c92973..8cbdd1e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/balancer/StochasticLoadBalancer.java
@@ -18,10 +18,14 @@
package org.apache.hadoop.hbase.master.balancer;
import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.Deque;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -30,7 +34,6 @@ import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
@@ -40,6 +43,7 @@ import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.Action;
@@ -49,6 +53,10 @@ import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.MoveRegi
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster.SwapRegionsAction;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
/**
* <p>This is a best effort load balancer. Given a Cost function F(C) => x It will
@@ -920,6 +928,225 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
/**
+ * Generates candidate actions to minimize the TableSkew cost function.
+ *
+ * For efficiency reasons, the cluster must be passed in when this generator is
+ * constructed. Every move generated is applied to the cost function
+ * (i.e. it is assumed that every action we generate is applied to the cluster).
+ * This means we can adjust our cost incrementally for the cluster, rather than
+ * recomputing at each iteration.
+ */
+ static class TableSkewCandidateGenerator extends CandidateGenerator {
+
+ // Mapping of table -> true iff too many servers in the cluster store at least
+ // cluster.maxRegionsIfEvenlyDistributed(table)
+ boolean[] tablesWithEnoughServersWithMaxRegions = null;
+
+ @Override
+ Action generate(Cluster cluster) {
+ if (tablesWithEnoughServersWithMaxRegions == null || tablesWithEnoughServersWithMaxRegions.length != cluster.numTables) {
+ tablesWithEnoughServersWithMaxRegions = new boolean[cluster.numTables];
+ }
+ if (cluster.hasUnevenRegionDistribution()) {
+ Pair<Integer, Integer> leastAndMostLoadedServers = cluster.findLeastAndMostLoadedServers();
+ return moveFromTableWithEnoughRegions(cluster, leastAndMostLoadedServers.getSecond(), leastAndMostLoadedServers.getFirst());
+ } else {
+ Optional<TableAndServer> tableServer = findSkewedTableServer(cluster);
+ if (!tableServer.isPresent()) {
+ return Cluster.NullAction;
+ }
+ return findBestActionForTableServer(cluster, tableServer.get());
+ }
+ }
+
+ /**
+ * Returns a move fromServer -> toServer such that, after the move, fromServer still holds at least
+ * the min # regions of the moved table expected under an even distribution
+ */
+ private Action moveFromTableWithEnoughRegions(Cluster cluster, int fromServer, int toServer) {
+ for (int table : getShuffledRangeOfInts(0, cluster.numTables)) {
+ int min = cluster.minRegionsIfEvenlyDistributed(table);
+ if (cluster.numRegionsPerServerPerTable[fromServer][table] > min) {
+ return getAction(fromServer, pickRandomRegionFromTableOnServer(cluster, fromServer, table), toServer, -1);
+ }
+ }
+ return Cluster.NullAction;
+ }
+
+ /**
+ * Iterates over the tables in random order; for each table T, scans the servers and returns the first
+ * server (if any) that holds too many regions of T. Returns Optional.absent() if no server
+ * is found that holds too many regions.
+ */
+ private Optional<TableAndServer> findSkewedTableServer(Cluster cluster) {
+ Optional<TableAndServer> tableServer = Optional.absent();
+ List<Integer> servers = getShuffledRangeOfInts(0, cluster.numServers);
+ Iterator<Integer> tableIter = getShuffledRangeOfInts(0, cluster.numTables).iterator();
+ while (tableIter.hasNext() && !tableServer.isPresent()) {
+ int table = tableIter.next();
+ int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
+ int numShouldHaveMaxRegions = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table);
+ int numWithMaxRegions = 0;
+ for (int server : servers) {
+ int numRegions = cluster.numRegionsPerServerPerTable[server][table];
+ // if more than max, server clearly has too many regions
+ if (numRegions > maxRegions) {
+ tableServer = Optional.of(new TableAndServer(table, server));
+ break;
+ }
+ // if equal to max, check to see if we are within acceptable limit
+ if (numRegions == maxRegions) {
+ numWithMaxRegions++;
+ }
+ }
+
+ tablesWithEnoughServersWithMaxRegions[table] = numWithMaxRegions >= numShouldHaveMaxRegions;
+ // If we have found a table with more than max, we are done
+ if (tableServer.isPresent()) {
+ break;
+ }
+
+ // Otherwise, check to see if there are too many servers with maxRegions
+ if (numWithMaxRegions > numShouldHaveMaxRegions) {
+ for (int server : servers) {
+ int numRegions = cluster.numRegionsPerServerPerTable[server][table];
+ if (numRegions == maxRegions) {
+ tableServer = Optional.of(new TableAndServer(table, server));
+ break;
+ }
+ }
+ }
+ }
+
+ return tableServer;
+ }
+
+ /**
+ * Returns a list of integers that stores [upper - lower] unique integers in random order
+ * s.t. for each integer i lower <= i < upper
+ */
+ private List<Integer> getShuffledRangeOfInts(int lower, int upper) {
+ Preconditions.checkArgument(lower < upper);
+ ArrayList<Integer> arr = new ArrayList<Integer>(upper - lower);
+ for (int i = lower; i < upper; i++) {
+ arr.add(i);
+ }
+ Collections.shuffle(arr);
+ return arr;
+ }
+
+ /**
+ * Pick a random region from the specified server and table. Returns -1 if no regions from
+ * the given table lie on the given server
+ */
+ protected int pickRandomRegionFromTableOnServer(Cluster cluster, int server, int table) {
+ if (server < 0 || table < 0) {
+ return -1;
+ }
+ List<Integer> regionsFromTable = new ArrayList<>();
+ for (int region : cluster.regionsPerServer[server]) {
+ if (cluster.regionIndexToTableIndex[region] == table) {
+ regionsFromTable.add(region);
+ }
+ }
+ if (regionsFromTable.isEmpty()) {
+ return -1; // matches the documented contract when the server has no regions of the table
+ }
+ return regionsFromTable.get(RANDOM.nextInt(regionsFromTable.size()));
+ }
+
+ /**
+ * Returns servers in the cluster that store fewer than k regions for the given table (sorted by
+ * servers with the fewest regions from givenTable first)
+ */
+ public List<Integer> getServersWithFewerThanKRegionsFromTable(final Cluster cluster, final int givenTable, int k) {
+ List<Integer> serversWithFewerThanK = new ArrayList<>();
+ for (int server = 0; server < cluster.numServers; server++) {
+ if (cluster.numRegionsPerServerPerTable[server][givenTable] < k) {
+ serversWithFewerThanK.add(server);
+ }
+ }
+ Collections.sort(serversWithFewerThanK, new Comparator<Integer>() {
+ @Override
+ public int compare(Integer o1, Integer o2) {
+ return cluster.numRegionsPerServerPerTable[o1.intValue()][givenTable] - cluster.numRegionsPerServerPerTable[o2.intValue()][givenTable];
+ }
+ });
+ return serversWithFewerThanK;
+ }
+
+ /**
+ * Given a table T for which server S stores too many regions, attempts to find a
+ * SWAP operation that will better balance the cluster
+ */
+ public Action findBestActionForTableServer(Cluster cluster, TableAndServer tableServer) {
+ int fromTable = tableServer.getTable();
+ int fromServer = tableServer.getServer();
+
+ int minNumRegions = cluster.minRegionsIfEvenlyDistributed(fromTable);
+ int maxNumRegions = cluster.maxRegionsIfEvenlyDistributed(fromTable);
+ List<Integer> servers;
+ if (tablesWithEnoughServersWithMaxRegions[fromTable]) {
+ servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, minNumRegions);
+ } else {
+ servers = getServersWithFewerThanKRegionsFromTable(cluster, fromTable, maxNumRegions);
+ }
+
+ if (servers.isEmpty()) {
+ return Cluster.NullAction;
+ }
+
+ Optional<Action> swap = trySwap(cluster, fromServer, fromTable, servers);
+ if (swap.isPresent()) {
+ return swap.get();
+ }
+
+ // If we cannot perform a swap, we should do nothing
+ return Cluster.NullAction;
+ }
+
+ /**
+ * Given server1, table1, we try to find server2 and table2 such that
+ * at least 3 of the following 4 criteria are met
+ *
+ * 1) server1 has too many regions of table1
+ * 2) server1 has too few regions of table2
+ * 3) server2 has too many regions of table2
+ * 4) server2 has too few regions of table1
+ *
+ * We consider N regions from table T
+ * too few if: N < cluster.minRegionsIfEvenlyDistributed(T)
+ * too many if: N > cluster.maxRegionsIfEvenlyDistributed(T)
+ *
+ * Because (1) and (4) are true a priori, we only need to check whether (2) or (3) holds.
+ *
+ * If 3 of the 4 criteria are met, we return a swap operation between
+ * randomly selected regions from table1 on server1 and from table2 on server2.
+ *
+ * Optional.absent() is returned if we could not find such a SWAP.
+ */
+ private Optional<Action> trySwap(Cluster cluster, int server1, int table1, List<Integer> candidateServers) {
+ // Because conditions (1) and (4) are true a priori, we only need to meet one of conditions (2) or (3)
+ List<Integer> tables = getShuffledRangeOfInts(0, cluster.numTables);
+ for (int table2 : tables) {
+ int minRegions = cluster.minRegionsIfEvenlyDistributed(table2);
+ int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table2);
+ for (int server2 : candidateServers) {
+ int numRegions1 = cluster.numRegionsPerServerPerTable[server1][table2];
+ int numRegions2 = cluster.numRegionsPerServerPerTable[server2][table2];
+ if (numRegions2 == 0) {
+ continue;
+ }
+ if ((numRegions1 < minRegions || numRegions2 > maxRegions) ||
+ (minRegions != maxRegions && numRegions1 == minRegions && numRegions2 == maxRegions)) {
+ int region1 = pickRandomRegionFromTableOnServer(cluster, server1, table1);
+ int region2 = pickRandomRegionFromTableOnServer(cluster, server2, table2);
+ return Optional.of(getAction(server1, region1, server2, region2));
+ }
+ }
+ }
+ return Optional.absent();
+ }
+ }
+
+ /**
* Base class of StochasticLoadBalancer's Cost Functions.
*/
abstract static class CostFunction {
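The swap search above only has to test conditions (2) and (3): server1 is already known to hold too many regions of table1, and every candidate server2 was chosen because it holds too few. A compressed restatement of the acceptance test, as a standalone sketch (illustrative only, not part of the patch; names are hypothetical):

public class SwapAcceptanceSketch {
  // onServer1 / onServer2: how many regions of table2 each server currently holds.
  // minIfEven / maxIfEven: cluster.minRegionsIfEvenlyDistributed(table2) and the max counterpart.
  static boolean swapHelps(int onServer1, int onServer2, int minIfEven, int maxIfEven) {
    if (onServer2 == 0) {
      return false;                               // nothing of table2 to swap back
    }
    // (2) server1 has too few of table2, or (3) server2 has too many of table2,
    // or the boundary case where server1 sits at the min and server2 at the max of table2.
    return onServer1 < minIfEven || onServer2 > maxIfEven
        || (minIfEven != maxIfEven && onServer1 == minIfEven && onServer2 == maxIfEven);
  }
  public static void main(String[] args) {
    // table2 has min = 2, max = 3: server1 holds 1 (too few), server2 holds 3 -> swap is accepted.
    System.out.println(swapHelps(1, 3, 2, 3));    // true
    System.out.println(swapHelps(2, 2, 2, 3));    // false, both within bounds
  }
}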
@@ -966,8 +1193,7 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
break;
case SWAP_REGIONS:
SwapRegionsAction a = (SwapRegionsAction) action;
- regionMoved(a.fromRegion, a.fromServer, a.toServer);
- regionMoved(a.toRegion, a.toServer, a.fromServer);
+ regionSwapped(a.fromRegion, a.fromServer, a.toRegion, a.toServer);
break;
default:
throw new RuntimeException("Uknown action:" + action.type);
@@ -977,6 +1203,11 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
protected void regionMoved(int region, int oldServer, int newServer) {
}
+ protected void regionSwapped(int region1, int server1, int region2, int server2) {
+ regionMoved(region1, server1, server2);
+ regionMoved(region2, server2, server1);
+ }
+
abstract double cost();
/**
@@ -1170,9 +1401,188 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
"hbase.master.balancer.stochastic.tableSkewCost";
private static final float DEFAULT_TABLE_SKEW_COST = 35;
+ /**
+ * Ranges from 0.0 to 1.0 and is the weight given to the most skewed table
+ * (as opposed to the average skew across all tables) when computing the TableSkew cost
+ */
+ private static final String MAX_TABLE_SKEW_WEIGHT_KEY =
+ "hbase.master.balancer.stochastic.maxTableSkewWeight";
+ private float DEFAULT_MAX_TABLE_SKEW_WEIGHT = 0.0f;
+
+ private final float maxTableSkewWeight;
+ private final float avgTableSkewWeight;
+
+ // Number of moves for each table required to bring the cluster to a perfectly balanced
+ // state (i.e. as if you had round-robin-ed regions across cluster)
+ private int[] numMovesPerTable;
+
TableSkewCostFunction(Configuration conf) {
super(conf);
this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
+ maxTableSkewWeight = conf.getFloat(MAX_TABLE_SKEW_WEIGHT_KEY, DEFAULT_MAX_TABLE_SKEW_WEIGHT);
+ Preconditions.checkArgument(0.0 <= maxTableSkewWeight && maxTableSkewWeight <= 1.0);
+ avgTableSkewWeight = 1 - maxTableSkewWeight;
+ }
+
+ /**
+ * Computes cost by:
+ *
+ * 1) Computing a skew score for each table (based on the number of regions
+ * from that table that would have to be moved to reach an evenly balanced state)
+ *
+ * 2) Taking a weighted average of the highest skew score with the average skew score
+ *
+ * 3) Square rooting that value to more evenly distribute the values between 0-1
+ * (since we have observed they are generally very small).
+ *
+ * @return the table skew cost for the cluster
+ */
+ @Override
+ double cost() {
+ double[] skewPerTable = computeSkewPerTable();
+ if (skewPerTable.length == 0) {
+ return 0;
+ }
+ double maxTableSkew = max(skewPerTable);
+ double avgTableSkew = average(skewPerTable);
+
+ return Math.sqrt(maxTableSkewWeight * maxTableSkew + avgTableSkewWeight * avgTableSkew);
+ }
+
+ @Override
+ void init(Cluster cluster) {
+ super.init(cluster);
+ numMovesPerTable = computeNumMovesPerTable();
+ }
+
+ /**
+ * Adjusts computed number of moves after two regions have been swapped
+ */
+ @Override
+ protected void regionSwapped(int region1, int server1, int region2, int server2) {
+ // If different tables, simply perform two moves
+ if (cluster.regionIndexToTableIndex[region1] != cluster.regionIndexToTableIndex[region2]) {
+ super.regionSwapped(region1, server1, region2, server2);
+ return;
+ }
+ // If same table, do nothing
+ }
+
+ /**
+ * Adjusts computed number of moves per table after a region has been moved
+ */
+ @Override
+ protected void regionMoved(int region, int oldServer, int newServer) {
+ int table = cluster.regionIndexToTableIndex[region];
+ numMovesPerTable[table] = computeNumMovesForTable(table);
+ }
+
+ /**
+ * Returns a mapping of table -> numMoves, where numMoves is the number of region moves required to bring
+ * that table to a fully balanced state (i.e. as if its regions had been round-robin-ed across the cluster).
+ */
+ private int[] computeNumMovesPerTable() {
+ // Determine # region moves required for each table to have regions perfectly distributed across cluster
+ int[] numMovesPerTable = new int[cluster.numTables];
+ for (int table = 0; table < cluster.numTables; table++) {
+ numMovesPerTable[table] = computeNumMovesForTable(table);
+ }
+ return numMovesPerTable;
+ }
+
+ /**
+ * Computes the number of moves required across all servers to bring the given table to a balanced state
+ * (i.e. as if its regions had been round-robin-ed across the cluster). We only consider moves as # of regions
+ * that need to be sent, not received, so that we do not double count region moves.
+ */
+ private int computeNumMovesForTable(int table) {
+ int numMinRegions = cluster.minRegionsIfEvenlyDistributed(table);
+ int numMaxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
+ int numMaxServersRemaining = cluster.numServersWithMaxRegionsIfEvenlyDistributed(table);
+ int numMoves = 0;
+
+ for (int server = 0; server < cluster.numServers; server++) {
+ int numRegions = cluster.numRegionsPerServerPerTable[server][table];
+ if (numRegions >= numMaxRegions && numMaxServersRemaining > 0) {
+ numMoves += numRegions - numMaxRegions;
+ numMaxServersRemaining--;
+ } else if (numRegions > numMinRegions) {
+ numMoves += numRegions - numMinRegions;
+ }
+ }
+ return numMoves;
+ }
+
+ /**
+ * Returns mapping of tableIndex -> tableSkewScore, where tableSkewScore is a double between 0 and 1 with
+ * 0 indicating no table skew (i.e. perfect distribution of regions among servers), and 1 representing
+ * pathological table skew (i.e. all of the table's regions stored on a single server).
+ */
+ private double[] computeSkewPerTable() {
+ if (numMovesPerTable == null) {
+ numMovesPerTable = computeNumMovesPerTable();
+ }
+ double[] scaledSkewPerTable = new double[numMovesPerTable.length];
+ for (int table = 0; table < numMovesPerTable.length; table++) {
+ int numTotalRegions = cluster.numRegionsPerTable[table];
+ int maxRegions = cluster.maxRegionsIfEvenlyDistributed(table);
+ int pathologicalNumMoves = numTotalRegions - maxRegions;
+ scaledSkewPerTable[table] = pathologicalNumMoves == 0 ? 0 : (double) numMovesPerTable[table] / pathologicalNumMoves;
+ }
+ return scaledSkewPerTable;
+ }
+
+ /**
+ * Returns the max of the values in the passed array
+ */
+ private double max(double[] arr) {
+ double max = arr[0];
+ for (double d : arr) {
+ if (d > max) {
+ max = d;
+ }
+ }
+ return max;
+ }
+
+ /**
+ * Returns the average of the values in the passed array
+ */
+ private double average(double[] arr) {
+ double sum = 0;
+ for (double d : arr) {
+ sum += d;
+ }
+ return sum / arr.length;
+ }
+ }
+
+ /**
+ * Compute the cost of a potential cluster configuration based upon how evenly
+ * distributed tables are.
+ *
+ * @deprecated replaced by TableSkewCostFunction
+ * This function only considers the maximum # of regions of each table stored
+ * on any one server. This, however, neglects a number of cases. Consider the case
+ * where N servers each store 1 more region than they would if the regions had been round-robin-ed
+ * across the cluster, while K servers store 0 regions of the table. The maximum
+ * # regions stored would not properly reflect the table-skew of the cluster.
+ *
+ * Furthermore, this relies upon the cluster.numMaxRegionsPerTable field, which is not
+ * properly updated. The values per table only increase as the cluster shifts (i.e.
+ * as new maxima are found), but they do not go down when the maximum skew decreases
+ * for a particular table.
+ */
+ @Deprecated
+ static class OldTableSkewCostFunction extends CostFunction {
+
+ private static final String TABLE_SKEW_COST_KEY =
+ "hbase.master.balancer.stochastic.tableSkewCost";
+ private static final float DEFAULT_TABLE_SKEW_COST = 35;
+
+ OldTableSkewCostFunction(Configuration conf) {
+ super(conf);
+ this.setMultiplier(conf.getFloat(TABLE_SKEW_COST_KEY, DEFAULT_TABLE_SKEW_COST));
}
@Override
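Putting the cost pieces above together: each table's skew is the number of moves still needed divided by the moves a pathological layout (all of the table's regions on one server) would need, and the final cost is the square root of a weighted blend of the worst and the average per-table skew. A standalone sketch of that arithmetic (illustrative only, not part of the patch):

public class TableSkewCostSketch {
  // movesNeeded: regions that must leave over-loaded servers to even out the table.
  // regionsOfTable / maxIfEven: total regions of the table and the per-server max under round-robin.
  static double skewForTable(int movesNeeded, int regionsOfTable, int maxIfEven) {
    int pathologicalMoves = regionsOfTable - maxIfEven;   // worst case: one server holds everything
    return pathologicalMoves == 0 ? 0 : (double) movesNeeded / pathologicalMoves;
  }
  static double blendedCost(double maxSkew, double avgSkew, double maxTableSkewWeight) {
    return Math.sqrt(maxTableSkewWeight * maxSkew + (1 - maxTableSkewWeight) * avgSkew);
  }
  public static void main(String[] args) {
    // One table, 10 regions, 4 servers (maxIfEven = 3). Suppose 4 moves are still needed;
    // the pathological layout would need 10 - 3 = 7, so the skew is 4 / 7 ~= 0.57.
    double skew = skewForTable(4, 10, 3);
    // With the default maxTableSkewWeight of 0.0 the cost is sqrt(avgSkew) ~= 0.76.
    System.out.println(blendedCost(skew, skew, 0.0f));
  }
}

Because a swap between two regions of the same table leaves every per-server count for that table unchanged, the incremental regionSwapped override above deliberately does nothing in that case.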
@@ -1450,7 +1860,8 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
for (int i = 0 ; i < costsPerGroup.length; i++) {
totalCost += costsPerGroup[i];
}
- return scale(0, maxCost, totalCost);
+ // Still return high cost for single region replicas being cohosted even as cluster scales
+ return Math.sqrt(scale(0, maxCost, totalCost));
}
/**
@@ -1589,9 +2000,31 @@ public class StochasticLoadBalancer extends BaseLoadBalancer {
}
/**
+ * Data structure that holds table and server indexes
+ */
+ static class TableAndServer {
+ private final int table;
+ private final int server;
+
+ public TableAndServer(int table, int server) {
+ this.table = table;
+ this.server = server;
+ }
+
+ public int getTable() {
+ return table;
+ }
+
+ public int getServer() {
+ return server;
+ }
+ }
+
+ /**
* A helper function to compose the attribute name from tablename and costfunction name
*/
public static String composeAttributeName(String tableName, String costFunctionName) {
return tableName + TABLE_FUNCTION_SEP + costFunctionName;
}
+
}
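The square root applied to the scaled replica-cohosting cost above serves the same purpose as in the table-skew cost: on a large cluster the scaled value becomes very small, and the square root keeps it from vanishing relative to the other cost functions. A one-line illustration with assumed values, showing only the shape of the curve:

public class SqrtScalingSketch {
  public static void main(String[] args) {
    double scaled = 0.04;                  // e.g. scale(0, maxCost, totalCost) on a big cluster
    System.out.println(Math.sqrt(scaled)); // 0.2 -- five times the raw scaled value
  }
}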
http://git-wip-us.apache.org/repos/asf/hbase/blob/53e9a1c4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
index 9d193d2..37ff35f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer.java
@@ -48,6 +48,8 @@ import org.apache.hadoop.hbase.master.MockNoopMasterServices;
import org.apache.hadoop.hbase.master.RackManager;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.Cluster;
+import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.CandidateGenerator;
+import org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer.TableSkewCandidateGenerator;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -118,7 +120,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
*/
@Test
public void testBalanceCluster() throws Exception {
-
+ float oldMinCostNeedBalance = conf.getFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.05f);
+ conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.02f);
+ loadBalancer.setConf(conf);
for (int[] mockCluster : clusterStateMocks) {
Map<ServerName, List<HRegionInfo>> servers = mockClusterServers(mockCluster);
List<ServerAndLoad> list = convertToList(servers);
@@ -134,6 +138,9 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
returnServer(entry.getKey());
}
}
+ // reset config
+ conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, oldMinCostNeedBalance);
+ loadBalancer.setConf(conf);
}
@Test
@@ -252,6 +259,32 @@ public class TestStochasticLoadBalancer extends BalancerTestBase {
double result = storeFileCostFunction.getRegionLoadCost(regionLoads);
// storefile size cost is simply an average of it's value over time
assertEquals(2.5, result, 0.01);
+ }
+
+ @Test (timeout=60000)
+ public void testTableSkewCandidateGeneratorConvergesToZero() {
+ int replication = 1;
+ StochasticLoadBalancer.CostFunction
+ costFunction = new StochasticLoadBalancer.TableSkewCostFunction(conf);
+ CandidateGenerator generator = new TableSkewCandidateGenerator();
+ for (int i = 0; i < 100; i++) {
+ int numNodes = 1 + rand.nextInt(5 * i + 1);
+ int numTables = 1 + rand.nextInt(5 * i + 1);
+ int numRegions = rand.nextInt(numTables * 99) + Math.max(numTables, numNodes); // num regions between max(numTables, numNodes) and numTables*100
+ int numRegionsPerServer = rand.nextInt(numRegions / numNodes) + 1; // num regions per server (except one) between 1 and numRegions / numNodes
+
+ Map<ServerName, List<HRegionInfo>> serverMap = createServerMap(numNodes, numRegions, numRegionsPerServer, replication, numTables);
+ BaseLoadBalancer.Cluster cluster = new Cluster(serverMap, null, null, null);
+ costFunction.init(cluster);
+ double cost = costFunction.cost();
+ while (cost > 0) {
+ Cluster.Action action = generator.generate(cluster);
+ cluster.doAction(action);
+ costFunction.postAction(action);
+ cost = costFunction.cost();
+ }
+ assertEquals(0, cost, .000000000001);
+ }
}
@Test
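testBalanceCluster above lowers MIN_COST_NEED_BALANCE_KEY so that small residual imbalances still trigger a balancing pass, then restores the old value by hand. One possible variant (a sketch only, not in the patch) is to restore the threshold in a finally block so a failing assertion cannot leak the lowered setting into later tests; this assumes the conf and loadBalancer fields provided by BalancerTestBase:

  // Sketch: run a balancer exercise with a temporarily lowered balance threshold.
  private void runWithLoweredBalanceThreshold(Runnable body) {
    float oldMinCost = conf.getFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.05f);
    conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, 0.02f);
    loadBalancer.setConf(conf);
    try {
      body.run();
    } finally {
      conf.setFloat(StochasticLoadBalancer.MIN_COST_NEED_BALANCE_KEY, oldMinCost);
      loadBalancer.setConf(conf);
    }
  }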
http://git-wip-us.apache.org/repos/asf/hbase/blob/53e9a1c4/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
index 2f315de..03d2ef2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/balancer/TestStochasticLoadBalancer2.java
@@ -35,6 +35,7 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
conf.setFloat("hbase.master.balancer.stochastic.maxMovePercent", 1.0f);
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 2000000L);
conf.setFloat("hbase.master.balancer.stochastic.localityCost", 0);
+
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 90 * 1000); // 90 sec
conf.setFloat("hbase.master.balancer.stochastic.minCostNeedBalance", 0.05f);
loadBalancer.setConf(conf);
@@ -70,6 +71,7 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
public void testRegionReplicasOnMidClusterHighReplication() {
conf.setLong(StochasticLoadBalancer.MAX_STEPS_KEY, 4000000L);
conf.setLong("hbase.master.balancer.stochastic.maxRunningTime", 120 * 1000); // 120 sec
+ conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 4);
loadBalancer.setConf(conf);
int numNodes = 80;
int numRegions = 6 * numNodes;
@@ -77,6 +79,8 @@ public class TestStochasticLoadBalancer2 extends BalancerTestBase {
int numRegionsPerServer = 5;
int numTables = 10;
testWithCluster(numNodes, numRegions, numRegionsPerServer, replication, numTables, false, true);
+ // reset config
+ conf.setFloat("hbase.master.balancer.stochastic.tableSkewCost", 35);
}
@Test (timeout = 800000)