You are viewing a plain-text version of this content. (The link to the canonical version is not preserved in this plain-text copy.)
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/09/29 23:00:23 UTC
[2/4] hbase git commit: HBASE-18436 Add client-side hedged read metrics (Yun Zhao)
HBASE-18436 Add client-side hedged read metrics (Yun Zhao)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ca87d05a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ca87d05a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ca87d05a
Branch: refs/heads/master
Commit: ca87d05a518338e64099f42c229d557b93ce51c8
Parents: 3bd824f
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Sep 29 14:02:49 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Sep 29 15:37:04 2017 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/MetricsConnection.java | 14 +++
.../RpcRetryingCallerWithReadReplicas.java | 11 +-
.../hadoop/hbase/client/TestReplicasClient.java | 126 ++++++++++++++++---
.../TestSplitTransactionOnCluster.java | 6 +-
4 files changed, 132 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca87d05a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index 31612f3..c54729b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -269,6 +269,8 @@ public class MetricsConnection implements StatisticTrackable {
@VisibleForTesting protected final RunnerStats runnerStats;
@VisibleForTesting protected final Counter metaCacheNumClearServer;
@VisibleForTesting protected final Counter metaCacheNumClearRegion;
+ @VisibleForTesting protected final Counter hedgedReadOps;
+ @VisibleForTesting protected final Counter hedgedReadWin;
// dynamic metrics
@@ -315,6 +317,8 @@ public class MetricsConnection implements StatisticTrackable {
"metaCacheNumClearServer", scope));
this.metaCacheNumClearRegion = registry.counter(name(this.getClass(),
"metaCacheNumClearRegion", scope));
+ this.hedgedReadOps = registry.counter(name(this.getClass(), "hedgedReadOps", scope));
+ this.hedgedReadWin = registry.counter(name(this.getClass(), "hedgedReadWin", scope));
this.getTracker = new CallTracker(this.registry, "Get", scope);
this.scanTracker = new CallTracker(this.registry, "Scan", scope);
this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
@@ -373,6 +377,16 @@ public class MetricsConnection implements StatisticTrackable {
metaCacheNumClearRegion.inc();
}
+ /** Increment the number of hedged read that have occurred. */
+ public void incrHedgedReadOps() {
+ hedgedReadOps.inc();
+ }
+
+ /** Increment the number of hedged read returned faster than the original read. */
+ public void incrHedgedReadWin() {
+ hedgedReadWin.inc();
+ }
+
/** Increment the number of normal runner counts. */
public void incrNormalRunners() {
this.runnerStats.incrNormalRunners();
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca87d05a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index e7a4ba6..c6ba228 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -216,6 +216,9 @@ public class RpcRetryingCallerWithReadReplicas {
if (f != null) {
return f.get(); //great we got a response
}
+ if (cConnection.getConnectionMetrics() != null) {
+ cConnection.getConnectionMetrics().incrHedgedReadOps();
+ }
} catch (ExecutionException e) {
// We ignore the ExecutionException and continue with the secondary replicas
if (LOG.isDebugEnabled()) {
@@ -238,13 +241,17 @@ public class RpcRetryingCallerWithReadReplicas {
addCallsForReplica(cs, rl, 1, rl.size() - 1);
}
try {
- Future<Result> f = cs.pollForFirstSuccessfullyCompletedTask(operationTimeout,
- TimeUnit.MILLISECONDS, startIndex, endIndex);
+ ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
+ cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS, startIndex, endIndex);
if (f == null) {
throw new RetriesExhaustedException("Timed out after " + operationTimeout +
"ms. Get is sent to replicas with startIndex: " + startIndex +
", endIndex: " + endIndex + ", Locations: " + rl);
}
+ if (cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified &&
+ !skipPrimary && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
+ cConnection.getConnectionMetrics().incrHedgedReadWin();
+ }
return f.get();
} catch (ExecutionException e) {
throwEnrichedException(e, retries);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca87d05a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 1a3cfbf..ced7ce8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import com.codahale.metrics.Counter;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@@ -104,7 +105,9 @@ public class TestReplicasClient {
static final AtomicLong sleepTime = new AtomicLong(0);
static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
static final AtomicInteger countOfNext = new AtomicInteger(0);
- private static final AtomicReference<CountDownLatch> cdl =
+ private static final AtomicReference<CountDownLatch> primaryCdl =
+ new AtomicReference<>(new CountDownLatch(0));
+ private static final AtomicReference<CountDownLatch> secondaryCdl =
new AtomicReference<>(new CountDownLatch(0));
Random r = new Random();
public SlowMeCopro() {
@@ -146,7 +149,8 @@ public class TestReplicasClient {
private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
- CountDownLatch latch = getCdl().get();
+ LOG.info("We're the primary replicas.");
+ CountDownLatch latch = getPrimaryCdl().get();
try {
if (sleepTime.get() > 0) {
LOG.info("Sleeping for " + sleepTime.get() + " ms");
@@ -163,11 +167,27 @@ public class TestReplicasClient {
}
} else {
LOG.info("We're not the primary replicas.");
+ CountDownLatch latch = getSecondaryCdl().get();
+ try {
+ if (latch.getCount() > 0) {
+ LOG.info("Waiting for the secondary counterCountDownLatch");
+ latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
+ if (latch.getCount() > 0) {
+ throw new RuntimeException("Can't wait more");
+ }
+ }
+ } catch (InterruptedException e1) {
+ LOG.error(e1);
+ }
}
}
- public static AtomicReference<CountDownLatch> getCdl() {
- return cdl;
+ public static AtomicReference<CountDownLatch> getPrimaryCdl() {
+ return primaryCdl;
+ }
+
+ public static AtomicReference<CountDownLatch> getSecondaryCdl() {
+ return secondaryCdl;
}
}
@@ -177,6 +197,7 @@ public class TestReplicasClient {
HTU.getConfiguration().setInt(
StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+ HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
HTU.startMiniCluster(NB_SERVERS);
@@ -296,7 +317,7 @@ public class TestReplicasClient {
public void testUseRegionWithoutReplica() throws Exception {
byte[] b1 = "testUseRegionWithoutReplica".getBytes();
openRegion(hriSecondary);
- SlowMeCopro.getCdl().set(new CountDownLatch(0));
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
try {
Get g = new Get(b1);
Result r = table.get(g);
@@ -352,14 +373,14 @@ public class TestReplicasClient {
byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
openRegion(hriSecondary);
- SlowMeCopro.getCdl().set(new CountDownLatch(1));
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
try {
Get g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
Result r = table.get(g);
Assert.assertTrue(r.isStale());
} finally {
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
closeRegion(hriSecondary);
}
}
@@ -469,13 +490,13 @@ public class TestReplicasClient {
LOG.info("sleep and is not stale done");
// But if we ask for stale we will get it
- SlowMeCopro.getCdl().set(new CountDownLatch(1));
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
LOG.info("stale done");
@@ -488,14 +509,14 @@ public class TestReplicasClient {
LOG.info("exists not stale done");
// exists works on stale but don't see the put
- SlowMeCopro.getCdl().set(new CountDownLatch(1));
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse("The secondary has stale data", r.getExists());
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
LOG.info("exists stale before flush done");
flushRegion(hriPrimary);
@@ -504,28 +525,93 @@ public class TestReplicasClient {
Thread.sleep(1000 + REFRESH_PERIOD * 2);
// get works and is not stale
- SlowMeCopro.getCdl().set(new CountDownLatch(1));
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertFalse(r.isEmpty());
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
LOG.info("stale done");
// exists works on stale and we see the put after the flush
- SlowMeCopro.getCdl().set(new CountDownLatch(1));
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setCheckExistenceOnly(true);
g.setConsistency(Consistency.TIMELINE);
r = table.get(g);
Assert.assertTrue(r.isStale());
Assert.assertTrue(r.getExists());
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
LOG.info("exists stale after flush done");
} finally {
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
+ SlowMeCopro.sleepTime.set(0);
+ Delete d = new Delete(b1);
+ table.delete(d);
+ closeRegion(hriSecondary);
+ }
+ }
+
+ @Test
+ public void testHedgedRead() throws Exception {
+ byte[] b1 = "testHedgedRead".getBytes();
+ openRegion(hriSecondary);
+
+ try {
+ // A simple put works, even if there here a second replica
+ Put p = new Put(b1);
+ p.addColumn(f, b1, b1);
+ table.put(p);
+ LOG.info("Put done");
+
+ // A get works and is not stale
+ Get g = new Get(b1);
+ Result r = table.get(g);
+ Assert.assertFalse(r.isStale());
+ Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
+ LOG.info("get works and is not stale done");
+
+ //reset
+ ClusterConnection connection = (ClusterConnection) HTU.getConnection();
+ Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
+ Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
+ hedgedReadOps.dec(hedgedReadOps.getCount());
+ hedgedReadWin.dec(hedgedReadWin.getCount());
+
+ // Wait a little on the main region, just enough to happen once hedged read
+ // and hedged read did not returned faster
+ int primaryCallTimeoutMicroSecond = connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
+ SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
+ SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
+ g = new Get(b1);
+ g.setConsistency(Consistency.TIMELINE);
+ r = table.get(g);
+ Assert.assertFalse(r.isStale());
+ Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
+ Assert.assertEquals(hedgedReadOps.getCount(), 1);
+ Assert.assertEquals(hedgedReadWin.getCount(), 0);
+ SlowMeCopro.sleepTime.set(0);
+ SlowMeCopro.getSecondaryCdl().get().countDown();
+ LOG.info("hedged read occurred but not faster");
+
+
+ // But if we ask for stale we will get it and hedged read returned faster
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
+ g = new Get(b1);
+ g.setConsistency(Consistency.TIMELINE);
+ r = table.get(g);
+ Assert.assertTrue(r.isStale());
+ Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
+ Assert.assertEquals(hedgedReadOps.getCount(), 2);
+ Assert.assertEquals(hedgedReadWin.getCount(), 1);
+ SlowMeCopro.getPrimaryCdl().get().countDown();
+ LOG.info("hedged read occurred and faster");
+
+ } finally {
+ SlowMeCopro.getPrimaryCdl().get().countDown();
+ SlowMeCopro.getSecondaryCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
Delete d = new Delete(b1);
table.delete(d);
@@ -557,7 +643,7 @@ public class TestReplicasClient {
AsyncProcess ap = ((ClusterConnection) HTU.getConnection()).getAsyncProcess();
// Make primary slowdown
- SlowMeCopro.getCdl().set(new CountDownLatch(1));
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
List<Get> gets = new ArrayList<>();
Get g = new Get(b1);
@@ -595,7 +681,7 @@ public class TestReplicasClient {
Assert.assertTrue(m.isCancelled());
}
} finally {
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
@@ -661,7 +747,7 @@ public class TestReplicasClient {
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
} finally {
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
@@ -742,7 +828,7 @@ public class TestReplicasClient {
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
} finally {
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
SlowMeCopro.sleepTime.set(0);
SlowMeCopro.slowDownNext.set(false);
SlowMeCopro.countOfNext.set(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/ca87d05a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 1a69be3..cee4caf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -617,15 +617,15 @@ public class TestSplitTransactionOnCluster {
Assert.assertFalse(r.isStale());
LOG.info("exists stale after flush done");
- SlowMeCopro.getCdl().set(new CountDownLatch(1));
+ SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
g = new Get(b1);
g.setConsistency(Consistency.TIMELINE);
// This will succeed because in the previous GET we get the location of the replica
r = t.get(g);
Assert.assertTrue(r.isStale());
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
} finally {
- SlowMeCopro.getCdl().get().countDown();
+ SlowMeCopro.getPrimaryCdl().get().countDown();
admin.setBalancerRunning(true, false);
cluster.getMaster().setCatalogJanitorEnabled(true);
t.close();