Posted to commits@hbase.apache.org by re...@apache.org on 2020/05/28 02:11:22 UTC

[hbase] branch branch-1 updated: HBASE-24435 Add hedgedReads and hedgedReadWins count metrics (#1781)

This is an automated email from the ASF dual-hosted git repository.

reidchan pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 6aa2286  HBASE-24435 Add hedgedReads and hedgedReadWins count metrics (#1781)
6aa2286 is described below

commit 6aa2286733f8ca224c03c1a61c17077fa29410a3
Author: Javier Akira Luca de Tena <ak...@gmail.com>
AuthorDate: Thu May 28 11:11:10 2020 +0900

    HBASE-24435 Add hedgedReads and hedgedReadWins count metrics (#1781)
    
    Co-authored-by: stack <st...@apache.org>
    Co-authored-by: Javier <ja...@linecorp.com>
    Signed-off-by: Reid Chan <re...@apache.org>
---
 .../regionserver/MetricsRegionServerSource.java    |   6 +
 .../regionserver/MetricsRegionServerWrapper.java   |  10 ++
 .../MetricsRegionServerSourceImpl.java             |   5 +
 .../MetricsRegionServerWrapperImpl.java            |  23 ++++
 .../java/org/apache/hadoop/hbase/util/FSUtils.java |  45 ++++++++
 .../MetricsRegionServerWrapperStub.java            |  10 ++
 .../org/apache/hadoop/hbase/util/TestFSUtils.java  | 127 +++++++++++++++++++++
 7 files changed, 226 insertions(+)
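
The new counters only advance when HDFS hedged reads are turned on for the region server's DFS client. As a minimal sketch (not part of this commit; it simply mirrors the keys the new TestFSUtils test sets through DFSConfigKeys, with illustrative values), enabling hedged reads looks like this:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class EnableHedgedReads {
      public static void main(String[] args) {
        // A hedged-read thread pool plus a low trigger threshold; the values here are
        // illustrative, the keys are the same ones the new test sets programmatically.
        Configuration conf = new Configuration();
        conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
        conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 100);
        // A DFSClient built from this Configuration starts hedged reads and feeds the
        // DFSHedgedReadMetrics behind the new hedgedReads/hedgedReadWins counters.
      }
    }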

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index e006259..219bb69 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -504,6 +504,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSo
   String MAJOR_COMPACTED_OUTPUT_BYTES_DESC
     = "Total number of bytes that is output from compaction, major only";
 
+  String HEDGED_READS = "hedgedReads";
+  String HEDGED_READS_DESC = "The number of times we started a hedged read";
+  String HEDGED_READ_WINS = "hedgedReadWins";
+  String HEDGED_READ_WINS_DESC =
+    "The number of times we started a hedged read and a hedged read won";
+
   String RPC_GET_REQUEST_COUNT = "rpcGetRequestCount";
   String RPC_GET_REQUEST_COUNT_DESC = "Number of rpc get requests this region server has answered.";
   String RPC_SCAN_REQUEST_COUNT = "rpcScanRequestCount";
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index ec84acc..c9e1545 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -349,6 +349,16 @@ public interface MetricsRegionServerWrapper {
   long getMajorCompactedCellsSize();
 
   /**
+   * @return Count of hedged read operations
+   */
+  long getHedgedReadOps();
+
+  /**
+   * @return Count of times a hedged read beat out the primary read.
+   */
+  long getHedgedReadWins();
+
+  /**
    * @return Number of total bytes read from HDFS.
    */
   long getTotalBytesRead();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 31b5a97..24ab167 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -504,6 +504,11 @@ public class MetricsRegionServerSourceImpl
               rsWrap.getCompactedCellsSize())
           .addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
               rsWrap.getMajorCompactedCellsSize())
+
+          .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps())
+          .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
+              rsWrap.getHedgedReadWins())
+
           .addCounter(Interns.info(BLOCKED_REQUESTS_COUNT, BLOCKED_REQUESTS_COUNT_DESC),
             rsWrap.getBlockedRequestsCount())
           .tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 0d20c7d..6817e04 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
@@ -40,7 +41,9 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.CacheStats;
 import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
 import org.apache.hadoop.metrics2.MetricsExecutor;
 
 /**
@@ -98,6 +101,11 @@ class MetricsRegionServerWrapperImpl
   private Runnable runnable;
   private long period;
 
+  /**
+   * Can be null if not on hdfs.
+   */
+  private DFSHedgedReadMetrics dfsHedgedReadMetrics;
+
   public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
     this.regionServer = regionServer;
     initBlockCache();
@@ -111,6 +119,11 @@ class MetricsRegionServerWrapperImpl
     this.executor.scheduleWithFixedDelay(this.runnable, this.period, this.period,
       TimeUnit.MILLISECONDS);
 
+    try {
+      this.dfsHedgedReadMetrics = FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
+    } catch (IOException e) {
+      LOG.warn("Failed to get hedged metrics", e);
+    }
     if (LOG.isInfoEnabled()) {
       LOG.info("Computing regionserver metrics every " + this.period + " milliseconds");
     }
@@ -820,6 +833,16 @@ class MetricsRegionServerWrapperImpl
   }
 
   @Override
+  public long getHedgedReadOps() {
+    return this.dfsHedgedReadMetrics == null ? 0 : this.dfsHedgedReadMetrics.getHedgedReadOps();
+  }
+
+  @Override
+  public long getHedgedReadWins() {
+    return this.dfsHedgedReadMetrics == null ? 0 : this.dfsHedgedReadMetrics.getHedgedReadWins();
+  }
+
+  @Override
   public long getBlockedRequestsCount() {
     return blockedRequestsCount;
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 3a04b4c..6ea7bea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.SequenceFile;
@@ -1780,4 +1782,47 @@ public abstract class FSUtils extends CommonFSUtils {
     int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
     conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
   }
+
+  /**
+   * @return The DFSClient DFSHedgedReadMetrics instance, or null if it can't be found or we are not on hdfs.
+   */
+  public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
+      throws IOException {
+    if (!isHDFS(c)) {
+      return null;
+    }
+    // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
+    // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
+    // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
+    final String name = "getHedgedReadMetrics";
+    DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
+    Method m;
+    try {
+      m = dfsclient.getClass().getDeclaredMethod(name);
+    } catch (NoSuchMethodException e) {
+      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    } catch (SecurityException e) {
+      LOG.warn("Failed to find method " + name + " in dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    }
+    m.setAccessible(true);
+    try {
+      return (DFSHedgedReadMetrics)m.invoke(dfsclient);
+    } catch (IllegalAccessException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    } catch (IllegalArgumentException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    } catch (InvocationTargetException e) {
+      LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+          e.getMessage());
+      return null;
+    }
+  }
 }
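
For reference, a minimal, hedged sketch of how this helper is meant to be consumed (it mirrors the MetricsRegionServerWrapperImpl change above; the null check matters because the method returns null on non-HDFS filesystems or when the reflective lookup fails):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.util.FSUtils;
    import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;

    public class HedgedReadMetricsProbe {
      public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        // Null when the root filesystem is not HDFS or getHedgedReadMetrics cannot be reached.
        DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
        long ops = metrics == null ? 0 : metrics.getHedgedReadOps();
        long wins = metrics == null ? 0 : metrics.getHedgedReadWins();
        System.out.println("hedgedReads=" + ops + ", hedgedReadWins=" + wins);
      }
    }
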
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index dbf4758..90c240e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@ -361,6 +361,16 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrappe
   }
 
   @Override
+  public long getHedgedReadOps() {
+    return 100;
+  }
+
+  @Override
+  public long getHedgedReadWins() {
+    return 10;
+  }
+
+  @Override
   public long getTotalBytesRead() {
     return 0;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 52027a9..43f5187 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -27,11 +27,13 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Random;
 import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +45,9 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.fs.HFileSystem;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
+import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.Assert;
 import org.junit.Before;
@@ -435,4 +440,126 @@ public class TestFSUtils {
     }
   }
 
+  /**
+   * Ugly test that ensures we can get at the hedged read counters in dfsclient.
+   * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
+   */
+  @Test public void testDFSHedgedReadMetrics() throws Exception {
+    HBaseTestingUtility htu = new HBaseTestingUtility();
+    // Enable hedged reads and set it so the threshold is really low.
+    // Most of this test is taken from HDFS, from TestPread.
+    Configuration conf = htu.getConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+    conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
+    conf.setBoolean("dfs.datanode.transferTo.allowed", false);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    // Get the metrics.  Should be empty.
+    DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
+    assertEquals(0, metrics.getHedgedReadOps());
+    FileSystem fileSys = cluster.getFileSystem();
+    try {
+      Path p = new Path("preadtest.dat");
+      // We need > 1 blocks to test out the hedged reads.
+      DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
+        blockSize, (short) 3, seed);
+      pReadFile(fileSys, p);
+      cleanupFile(fileSys, p);
+      assertTrue(metrics.getHedgedReadOps() > 0);
+    } finally {
+      fileSys.close();
+      cluster.shutdown();
+    }
+  }
+
+  // Below is taken from TestPread over in HDFS.
+  static final int blockSize = 4096;
+  static final long seed = 0xDEADBEEFL;
+
+  private void pReadFile(FileSystem fileSys, Path name) throws IOException {
+    FSDataInputStream stm = fileSys.open(name);
+    byte[] expected = new byte[12 * blockSize];
+    Random rand = new Random(seed);
+    rand.nextBytes(expected);
+    // do a sanity check. Read first 4K bytes
+    byte[] actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+    // now do a pread for the first 8K bytes
+    actual = new byte[8192];
+    doPread(stm, 0L, actual, 0, 8192);
+    checkAndEraseData(actual, 0, expected, "Pread Test 1");
+    // Now check to see if the normal read returns 4K - 8K byte range
+    actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 4096, expected, "Pread Test 2");
+    // Now see if we can cross a single block boundary successfully
+    // read 4K bytes from blockSize - 2K offset
+    stm.readFully(blockSize - 2048, actual, 0, 4096);
+    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
+    // now see if we can cross two block boundaries successfully
+    // read blockSize + 4K bytes from blockSize - 2K offset
+    actual = new byte[blockSize + 4096];
+    stm.readFully(blockSize - 2048, actual);
+    checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
+    // now see if we can cross two block boundaries that are not cached
+    // read blockSize + 4K bytes from 10 * blockSize - 2K offset
+    actual = new byte[blockSize + 4096];
+    stm.readFully(10 * blockSize - 2048, actual);
+    checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
+    // now check that even after all these preads, we can still read
+    // bytes 8K - 12K
+    actual = new byte[4096];
+    stm.readFully(actual);
+    checkAndEraseData(actual, 8192, expected, "Pread Test 6");
+    // done
+    stm.close();
+    // check block location caching
+    stm = fileSys.open(name);
+    stm.readFully(1, actual, 0, 4096);
+    stm.readFully(4 * blockSize, actual, 0, 4096);
+    stm.readFully(7 * blockSize, actual, 0, 4096);
+    actual = new byte[3 * 4096];
+    stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
+    checkAndEraseData(actual, 0, expected, "Pread Test 7");
+    actual = new byte[8 * 4096];
+    stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
+    checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
+    // read the tail
+    stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
+    IOException res = null;
+    try { // read beyond the end of the file
+      stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
+    } catch (IOException e) {
+      // should throw an exception
+      res = e;
+    }
+    assertTrue("Error reading beyond file boundary.", res != null);
+
+    stm.close();
+  }
+
+  private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
+    for (int idx = 0; idx < actual.length; idx++) {
+      assertEquals(message + " byte " + (from + idx) + " differs. expected " +
+          expected[from + idx] + " actual " + actual[idx],
+        actual[idx], expected[from + idx]);
+      actual[idx] = 0;
+    }
+  }
+
+  private void doPread(FSDataInputStream stm, long position, byte[] buffer,
+    int offset, int length) throws IOException {
+    int nread = 0;
+
+    while (nread < length) {
+      int nbytes =
+        stm.read(position + nread, buffer, offset + nread, length - nread);
+      assertTrue("Error in pread", nbytes > 0);
+      nread += nbytes;
+    }
+  }
 }
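
A note on the test above: it only asserts that getHedgedReadOps() advanced, because whether a hedged read actually wins is timing dependent and a strict assertion on the win counter would be flaky. If some coverage of the win counter is wanted, a tolerant check could go right after the existing assertion, for example (a sketch, not part of this commit):

    // Wins are timing dependent; only sanity-check their relationship to the op count.
    assertTrue(metrics.getHedgedReadWins() >= 0);
    assertTrue(metrics.getHedgedReadWins() <= metrics.getHedgedReadOps());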