You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/09/29 23:00:25 UTC

[4/4] hbase git commit: HBASE-18436 Add client-side hedged read metrics (Yun Zhao)

HBASE-18436 Add client-side hedged read metrics (Yun Zhao)

Conflicts:
	hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/8148a75c
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/8148a75c
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/8148a75c

Branch: refs/heads/branch-1.4
Commit: 8148a75cb7129341fef5e169a6ec08a55aeb416d
Parents: bb8c716
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Sep 29 14:02:49 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Sep 29 15:43:13 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/client/MetricsConnection.java  |  14 ++
 .../RpcRetryingCallerWithReadReplicas.java      |  11 +-
 .../hadoop/hbase/client/TestReplicasClient.java | 129 ++++++++++++++++---
 .../TestSplitTransactionOnCluster.java          |   6 +-
 4 files changed, 134 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/8148a75c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index 9dd803a..7180ac2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -278,6 +278,8 @@ public class MetricsConnection implements StatisticTrackable {
   @VisibleForTesting protected final RunnerStats runnerStats;
   @VisibleForTesting protected final Counter metaCacheNumClearServer;
   @VisibleForTesting protected final Counter metaCacheNumClearRegion;
+  @VisibleForTesting protected final Counter hedgedReadOps;
+  @VisibleForTesting protected final Counter hedgedReadWin;
 
   // dynamic metrics
 
@@ -336,6 +338,8 @@ public class MetricsConnection implements StatisticTrackable {
       "metaCacheNumClearServer", scope);
     this.metaCacheNumClearRegion = registry.newCounter(this.getClass(),
       "metaCacheNumClearRegion", scope);
+    this.hedgedReadOps = registry.newCounter(this.getClass(), "hedgedReadOps", scope);
+    this.hedgedReadWin = registry.newCounter(this.getClass(), "hedgedReadWin", scope);
     this.getTracker = new CallTracker(this.registry, "Get", scope);
     this.scanTracker = new CallTracker(this.registry, "Scan", scope);
     this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
@@ -395,6 +399,16 @@ public class MetricsConnection implements StatisticTrackable {
     metaCacheNumClearRegion.inc();
   }
 
+  /** Increment the number of hedged reads that have occurred. */
+  public void incrHedgedReadOps() {
+    hedgedReadOps.inc();
+  }
+
+  /** Increment the number of hedged reads that returned faster than the original read. */
+  public void incrHedgedReadWin() {
+    hedgedReadWin.inc();
+  }
+
   /** Increment the number of normal runner counts. */
   public void incrNormalRunners() {
     this.runnerStats.incrNormalRunners();

http://git-wip-us.apache.org/repos/asf/hbase/blob/8148a75c/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index cbbe9f4..5a41233 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -244,6 +244,9 @@ public class RpcRetryingCallerWithReadReplicas {
           if (f != null) {
             return f.get(); //great we got a response
           }
+          if (cConnection.getConnectionMetrics() != null) {
+            cConnection.getConnectionMetrics().incrHedgedReadOps();
+          }
         } catch (ExecutionException e) {
           // We ignore the ExecutionException and continue with the secondary replicas
           if (LOG.isDebugEnabled()) {
@@ -267,13 +270,17 @@ public class RpcRetryingCallerWithReadReplicas {
     }
 
     try {
-      Future<Result> f = cs.pollForFirstSuccessfullyCompletedTask(operationTimeout,
-          TimeUnit.MILLISECONDS, startIndex, endIndex);
+      ResultBoundedCompletionService<Result>.QueueingFuture<Result> f =
+          cs.pollForFirstSuccessfullyCompletedTask(operationTimeout, TimeUnit.MILLISECONDS, startIndex, endIndex);
       if (f == null) {
         throw new RetriesExhaustedException("Timed out after " + operationTimeout +
             "ms. Get is sent to replicas with startIndex: " + startIndex +
             ", endIndex: " + endIndex + ", Locations: " + rl);
       }
+      if (cConnection.getConnectionMetrics() != null && !isTargetReplicaSpecified &&
+          !skipPrimary && f.getReplicaId() != RegionReplicaUtil.DEFAULT_REPLICA_ID) {
+        cConnection.getConnectionMetrics().incrHedgedReadWin();
+      }
       return f.get();
     } catch (ExecutionException e) {
       throwEnrichedException(e, retries);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8148a75c/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 11eb934..a9f32cb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -71,6 +71,8 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import com.yammer.metrics.core.Counter;
+
 /**
  * Tests for region replicas. Sad that we cannot isolate these without bringing up a whole
  * cluster. See {@link org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster}.
@@ -102,8 +104,10 @@ public class TestReplicasClient {
     static final AtomicLong sleepTime = new AtomicLong(0);
     static final AtomicBoolean slowDownNext = new AtomicBoolean(false);
     static final AtomicInteger countOfNext = new AtomicInteger(0);
-    private static final AtomicReference<CountDownLatch> cdl =
-        new AtomicReference<CountDownLatch>(new CountDownLatch(0));
+    private static final AtomicReference<CountDownLatch> primaryCdl =
+        new AtomicReference<>(new CountDownLatch(0));
+    private static final AtomicReference<CountDownLatch> secondaryCdl =
+        new AtomicReference<>(new CountDownLatch(0));
     Random r = new Random();
     public SlowMeCopro() {
     }
@@ -139,7 +143,8 @@ public class TestReplicasClient {
 
     private void slowdownCode(final ObserverContext<RegionCoprocessorEnvironment> e) {
       if (e.getEnvironment().getRegion().getRegionInfo().getReplicaId() == 0) {
-        CountDownLatch latch = getCdl().get();
+        LOG.info("We're the primary replicas.");
+        CountDownLatch latch = getPrimaryCdl().get();
         try {
           if (sleepTime.get() > 0) {
             LOG.info("Sleeping for " + sleepTime.get() + " ms");
@@ -156,11 +161,27 @@ public class TestReplicasClient {
         }
       } else {
         LOG.info("We're not the primary replicas.");
+        CountDownLatch latch = getSecondaryCdl().get();
+        try {
+          if (latch.getCount() > 0) {
+            LOG.info("Waiting for the secondary counterCountDownLatch");
+            latch.await(2, TimeUnit.MINUTES); // To help the tests to finish.
+            if (latch.getCount() > 0) {
+              throw new RuntimeException("Can't wait more");
+            }
+          }
+        } catch (InterruptedException e1) {
+          LOG.error(e1);
+        }
       }
     }
 
-    public static AtomicReference<CountDownLatch> getCdl() {
-      return cdl;
+    public static AtomicReference<CountDownLatch> getPrimaryCdl() {
+      return primaryCdl;
+    }
+
+    public static AtomicReference<CountDownLatch> getSecondaryCdl() {
+      return secondaryCdl;
     }
   }
 
@@ -170,6 +191,7 @@ public class TestReplicasClient {
     HTU.getConfiguration().setInt(
         StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, REFRESH_PERIOD);
     HTU.getConfiguration().setBoolean("hbase.client.log.scanner.activity", true);
+    HTU.getConfiguration().setBoolean(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, true);
     ConnectionUtils.setupMasterlessConnection(HTU.getConfiguration());
     HTU.startMiniCluster(NB_SERVERS);
 
@@ -297,7 +319,7 @@ public class TestReplicasClient {
   public void testUseRegionWithoutReplica() throws Exception {
     byte[] b1 = "testUseRegionWithoutReplica".getBytes();
     openRegion(hriSecondary);
-    SlowMeCopro.getCdl().set(new CountDownLatch(0));
+    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(0));
     try {
       Get g = new Get(b1);
       Result r = table.get(g);
@@ -353,14 +375,14 @@ public class TestReplicasClient {
     byte[] b1 = "testGetNoResultStaleRegionWithReplica".getBytes();
     openRegion(hriSecondary);
 
-    SlowMeCopro.getCdl().set(new CountDownLatch(1));
+    SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
     try {
       Get g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       Result r = table.get(g);
       Assert.assertTrue(r.isStale());
     } finally {
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
       closeRegion(hriSecondary);
     }
   }
@@ -470,13 +492,13 @@ public class TestReplicasClient {
       LOG.info("sleep and is not stale done");
 
       // But if we ask for stale we will get it
-      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
 
       LOG.info("stale done");
 
@@ -489,14 +511,14 @@ public class TestReplicasClient {
       LOG.info("exists not stale done");
 
       // exists works on stale but don't see the put
-      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setCheckExistenceOnly(true);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertFalse("The secondary has stale data", r.getExists());
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
       LOG.info("exists stale before flush done");
 
       flushRegion(hriPrimary);
@@ -505,28 +527,93 @@ public class TestReplicasClient {
       Thread.sleep(1000 + REFRESH_PERIOD * 2);
 
       // get works and is not stale
-      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertFalse(r.isEmpty());
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
       LOG.info("stale done");
 
       // exists works on stale and we see the put after the flush
-      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setCheckExistenceOnly(true);
       g.setConsistency(Consistency.TIMELINE);
       r = table.get(g);
       Assert.assertTrue(r.isStale());
       Assert.assertTrue(r.getExists());
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
       LOG.info("exists stale after flush done");
 
     } finally {
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
+      SlowMeCopro.sleepTime.set(0);
+      Delete d = new Delete(b1);
+      table.delete(d);
+      closeRegion(hriSecondary);
+    }
+  }
+
+  @Test
+  public void testHedgedRead() throws Exception {
+    byte[] b1 = "testHedgedRead".getBytes();
+    openRegion(hriSecondary);
+
+    try {
+      // A simple put works, even if there is a second replica
+      Put p = new Put(b1);
+      p.addColumn(f, b1, b1);
+      table.put(p);
+      LOG.info("Put done");
+
+      // A get works and is not stale
+      Get g = new Get(b1);
+      Result r = table.get(g);
+      Assert.assertFalse(r.isStale());
+      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
+      LOG.info("get works and is not stale done");
+
+      //reset
+      ClusterConnection connection = (ClusterConnection) HTU.getConnection();
+      Counter hedgedReadOps = connection.getConnectionMetrics().hedgedReadOps;
+      Counter hedgedReadWin = connection.getConnectionMetrics().hedgedReadWin;
+      hedgedReadOps.dec(hedgedReadOps.count());
+      hedgedReadWin.dec(hedgedReadWin.count());
+
+      // Wait a little on the primary region, just long enough to trigger one hedged read,
+      // while the hedged read itself does not return faster than the primary
+      int primaryCallTimeoutMicroSecond = connection.getConnectionConfiguration().getPrimaryCallTimeoutMicroSecond();
+      SlowMeCopro.sleepTime.set(TimeUnit.MICROSECONDS.toMillis(primaryCallTimeoutMicroSecond));
+      SlowMeCopro.getSecondaryCdl().set(new CountDownLatch(1));
+      g = new Get(b1);
+      g.setConsistency(Consistency.TIMELINE);
+      r = table.get(g);
+      Assert.assertFalse(r.isStale());
+      Assert.assertFalse(r.getColumnCells(f, b1).isEmpty());
+      Assert.assertEquals(hedgedReadOps.count(), 1);
+      Assert.assertEquals(hedgedReadWin.count(), 0);
+      SlowMeCopro.sleepTime.set(0);
+      SlowMeCopro.getSecondaryCdl().get().countDown();
+      LOG.info("hedged read occurred but not faster");
+
+
+      // But if we ask for stale we will get it, and the hedged read returns faster
+      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
+      g = new Get(b1);
+      g.setConsistency(Consistency.TIMELINE);
+      r = table.get(g);
+      Assert.assertTrue(r.isStale());
+      Assert.assertTrue(r.getColumnCells(f, b1).isEmpty());
+      Assert.assertEquals(hedgedReadOps.count(), 2);
+      Assert.assertEquals(hedgedReadWin.count(), 1);
+      SlowMeCopro.getPrimaryCdl().get().countDown();
+      LOG.info("hedged read occurred and faster");
+
+    } finally {
+      SlowMeCopro.getPrimaryCdl().get().countDown();
+      SlowMeCopro.getSecondaryCdl().get().countDown();
       SlowMeCopro.sleepTime.set(0);
       Delete d = new Delete(b1);
       table.delete(d);
@@ -559,7 +646,7 @@ public class TestReplicasClient {
           .getAsyncProcess();
 
       // Make primary slowdown
-      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
 
       List<Get> gets = new ArrayList<Get>();
       Get g = new Get(b1);
@@ -587,7 +674,7 @@ public class TestReplicasClient {
         Assert.assertTrue(m.isCancelled());
       }
     } finally {
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
       SlowMeCopro.sleepTime.set(0);
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);
@@ -653,7 +740,7 @@ public class TestReplicasClient {
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);
     } finally {
-      SlowMeCopro.cdl.get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
       SlowMeCopro.sleepTime.set(0);
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);
@@ -734,7 +821,7 @@ public class TestReplicasClient {
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);
     } finally {
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
       SlowMeCopro.sleepTime.set(0);
       SlowMeCopro.slowDownNext.set(false);
       SlowMeCopro.countOfNext.set(0);

http://git-wip-us.apache.org/repos/asf/hbase/blob/8148a75c/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 9c80f7b..38960f7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -1011,15 +1011,15 @@ public class TestSplitTransactionOnCluster {
       Assert.assertFalse(r.isStale());
       LOG.info("exists stale after flush done");
 
-      SlowMeCopro.getCdl().set(new CountDownLatch(1));
+      SlowMeCopro.getPrimaryCdl().set(new CountDownLatch(1));
       g = new Get(b1);
       g.setConsistency(Consistency.TIMELINE);
       // This will succeed because in the previous GET we get the location of the replica
       r = t.get(g);
       Assert.assertTrue(r.isStale());
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
     } finally {
-      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.getPrimaryCdl().get().countDown();
       admin.setBalancerRunning(true, false);
       cluster.getMaster().setCatalogJanitorEnabled(true);
       t.close();