Posted to commits@hbase.apache.org by re...@apache.org on 2020/05/28 02:11:22 UTC
[hbase] branch branch-1 updated: HBASE-24435 Add hedgedReads and hedgedReadWins count metrics (#1781)
This is an automated email from the ASF dual-hosted git repository.
reidchan pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new 6aa2286 HBASE-24435 Add hedgedReads and hedgedReadWins count metrics (#1781)
6aa2286 is described below
commit 6aa2286733f8ca224c03c1a61c17077fa29410a3
Author: Javier Akira Luca de Tena <ak...@gmail.com>
AuthorDate: Thu May 28 11:11:10 2020 +0900
HBASE-24435 Add hedgedReads and hedgedReadWins count metrics (#1781)
Co-authored-by: stack <st...@apache.org>
Co-authored-by: Javier <ja...@linecorp.com>
Signed-off-by: Reid Chan <re...@apache.org>
---
.../regionserver/MetricsRegionServerSource.java | 6 +
.../regionserver/MetricsRegionServerWrapper.java | 10 ++
.../MetricsRegionServerSourceImpl.java | 5 +
.../MetricsRegionServerWrapperImpl.java | 23 ++++
.../java/org/apache/hadoop/hbase/util/FSUtils.java | 45 ++++++++
.../MetricsRegionServerWrapperStub.java | 10 ++
.../org/apache/hadoop/hbase/util/TestFSUtils.java | 127 +++++++++++++++++++++
7 files changed, 226 insertions(+)
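
For context, the new counters only move when hedged reads are enabled on the HDFS client. A minimal sketch of turning them on, using the same DFSConfigKeys the new test below sets; the helper class name and the values are illustrative, not part of the patch:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class HedgedReadConfig {
      /** Illustrative helper: enable hedged reads on the DFS client. */
      static Configuration enableHedgedReads(Configuration conf) {
        // Threads available for issuing the speculative second read.
        conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
        // How long (ms) to wait on the primary replica before hedging.
        conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 10);
        return conf;
      }
    }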
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
index e006259..219bb69 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSource.java
@@ -504,6 +504,12 @@ public interface MetricsRegionServerSource extends BaseSource, JvmPauseMonitorSource
String MAJOR_COMPACTED_OUTPUT_BYTES_DESC
= "Total number of bytes that is output from compaction, major only";
+ String HEDGED_READS = "hedgedReads";
+ String HEDGED_READS_DESC = "The number of times we started a hedged read";
+ String HEDGED_READ_WINS = "hedgedReadWins";
+ String HEDGED_READ_WINS_DESC =
+ "The number of times we started a hedged read and a hedged read won";
+
String RPC_GET_REQUEST_COUNT = "rpcGetRequestCount";
String RPC_GET_REQUEST_COUNT_DESC = "Number of rpc get requests this region server has answered.";
String RPC_SCAN_REQUEST_COUNT = "rpcScanRequestCount";
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
index ec84acc..c9e1545 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapper.java
@@ -349,6 +349,16 @@ public interface MetricsRegionServerWrapper {
long getMajorCompactedCellsSize();
/**
+ * @return Count of hedged read operations
+ */
+ long getHedgedReadOps();
+
+ /**
+ * @return Count of times a hedged read beat out the primary read.
+ */
+ long getHedgedReadWins();
+
+ /**
* @return Number of total bytes read from HDFS.
*/
long getTotalBytesRead();
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
index 31b5a97..24ab167 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceImpl.java
@@ -504,6 +504,11 @@ public class MetricsRegionServerSourceImpl
rsWrap.getCompactedCellsSize())
.addCounter(Interns.info(MAJOR_COMPACTED_CELLS_SIZE, MAJOR_COMPACTED_CELLS_SIZE_DESC),
rsWrap.getMajorCompactedCellsSize())
+
+ .addCounter(Interns.info(HEDGED_READS, HEDGED_READS_DESC), rsWrap.getHedgedReadOps())
+ .addCounter(Interns.info(HEDGED_READ_WINS, HEDGED_READ_WINS_DESC),
+ rsWrap.getHedgedReadWins())
+
.addCounter(Interns.info(BLOCKED_REQUESTS_COUNT, BLOCKED_REQUESTS_COUNT_DESC),
rsWrap.getBlockedRequestsCount())
.tag(Interns.info(ZOOKEEPER_QUORUM_NAME, ZOOKEEPER_QUORUM_DESC),
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 0d20c7d..6817e04 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -40,7 +41,9 @@ import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.CacheStats;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.metrics2.MetricsExecutor;
/**
@@ -98,6 +101,11 @@ class MetricsRegionServerWrapperImpl
private Runnable runnable;
private long period;
+ /**
+ * Can be null if not on hdfs.
+ */
+ private DFSHedgedReadMetrics dfsHedgedReadMetrics;
+
public MetricsRegionServerWrapperImpl(final HRegionServer regionServer) {
this.regionServer = regionServer;
initBlockCache();
@@ -111,6 +119,11 @@ class MetricsRegionServerWrapperImpl
this.executor.scheduleWithFixedDelay(this.runnable, this.period, this.period,
TimeUnit.MILLISECONDS);
+ try {
+ this.dfsHedgedReadMetrics = FSUtils.getDFSHedgedReadMetrics(regionServer.getConfiguration());
+ } catch (IOException e) {
+ LOG.warn("Failed to get hedged metrics", e);
+ }
if (LOG.isInfoEnabled()) {
LOG.info("Computing regionserver metrics every " + this.period + " milliseconds");
}
@@ -820,6 +833,16 @@ class MetricsRegionServerWrapperImpl
}
@Override
+ public long getHedgedReadOps() {
+ return this.dfsHedgedReadMetrics == null ? 0 : this.dfsHedgedReadMetrics.getHedgedReadOps();
+ }
+
+ @Override
+ public long getHedgedReadWins() {
+ return this.dfsHedgedReadMetrics == null ? 0 : this.dfsHedgedReadMetrics.getHedgedReadWins();
+ }
+
+ @Override
public long getBlockedRequestsCount() {
return blockedRequestsCount;
}
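
Once the wrapper exposes the counts, they surface alongside the rest of the region server "Server" metrics and can be read over JMX. A hedged in-process sketch; the MBean name below follows the usual Hadoop metrics2 naming and is an assumption for your deployment, as are the class and variable names (the attribute names match the HEDGED_READS / HEDGED_READ_WINS constants above):

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class HedgedReadJmxProbe {
      public static void main(String[] args) throws Exception {
        // Assumed MBean name for the region server 'Server' metrics record.
        ObjectName name =
          new ObjectName("Hadoop:service=HBase,name=RegionServer,sub=Server");
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        System.out.println("hedgedReads = " + mbs.getAttribute(name, "hedgedReads"));
        System.out.println("hedgedReadWins = " + mbs.getAttribute(name, "hedgedReadWins"));
      }
    }

This only finds the MBean inside the region server JVM (or through a remote JMX connector pointed at it); run standalone it will throw InstanceNotFoundException.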
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index 3a04b4c..6ea7bea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.security.AccessDeniedException;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.FSProtos;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
@@ -1780,4 +1782,47 @@ public abstract class FSUtils extends CommonFSUtils {
int hbaseSize = conf.getInt("hbase." + dfsKey, defaultSize);
conf.setIfUnset(dfsKey, Integer.toString(hbaseSize));
}
+
+ /**
+ * @return The DFSClient DFSHedgedReadMetrics instance or null if can't be found or not on hdfs.
+ */
+ public static DFSHedgedReadMetrics getDFSHedgedReadMetrics(final Configuration c)
+ throws IOException {
+ if (!isHDFS(c)) {
+ return null;
+ }
+ // getHedgedReadMetrics is package private. Get the DFSClient instance that is internal
+ // to the DFS FS instance and make the method getHedgedReadMetrics accessible, then invoke it
+ // to get the singleton instance of DFSHedgedReadMetrics shared by DFSClients.
+ final String name = "getHedgedReadMetrics";
+ DFSClient dfsclient = ((DistributedFileSystem)FileSystem.get(c)).getClient();
+ Method m;
+ try {
+ m = dfsclient.getClass().getDeclaredMethod(name);
+ } catch (NoSuchMethodException e) {
+ LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
+ e.getMessage());
+ return null;
+ } catch (SecurityException e) {
+ LOG.warn("Failed find method " + name + " in dfsclient; no hedged read metrics: " +
+ e.getMessage());
+ return null;
+ }
+ m.setAccessible(true);
+ try {
+ return (DFSHedgedReadMetrics)m.invoke(dfsclient);
+ } catch (IllegalAccessException e) {
+ LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+ e.getMessage());
+ return null;
+ } catch (IllegalArgumentException e) {
+ LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+ e.getMessage());
+ return null;
+ } catch (InvocationTargetException e) {
+ LOG.warn("Failed invoking method " + name + " on dfsclient; no hedged read metrics: " +
+ e.getMessage());
+ return null;
+ }
+ }
}
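
A usage sketch for the new helper: FSUtils.getDFSHedgedReadMetrics and the two getters come straight from this patch, while the surrounding main class is illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.util.FSUtils;
    import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;

    public class HedgedReadMetricsDump {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        // Null when the configured filesystem is not HDFS or the
        // reflective lookup of getHedgedReadMetrics fails.
        DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
        if (metrics == null) {
          System.out.println("No hedged read metrics available");
          return;
        }
        System.out.println("hedged read ops:  " + metrics.getHedgedReadOps());
        System.out.println("hedged read wins: " + metrics.getHedgedReadWins());
      }
    }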
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
index dbf4758..90c240e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperStub.java
@@ -361,6 +361,16 @@ public class MetricsRegionServerWrapperStub implements MetricsRegionServerWrapper
}
@Override
+ public long getHedgedReadOps() {
+ return 100;
+ }
+
+ @Override
+ public long getHedgedReadWins() {
+ return 10;
+ }
+
+ @Override
public long getTotalBytesRead() {
return 0;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
index 52027a9..43f5187 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestFSUtils.java
@@ -27,11 +27,13 @@ import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
+import java.util.Random;
import java.util.UUID;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -43,6 +45,9 @@ import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.fs.HFileSystem;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSHedgedReadMetrics;
+import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.junit.Assert;
import org.junit.Before;
@@ -435,4 +440,126 @@ public class TestFSUtils {
}
}
+ /**
+ * Ugly test that ensures we can get at the hedged read counters in dfsclient.
+ * Does a bit of preading with hedged reads enabled using code taken from hdfs TestPread.
+ */
+ @Test public void testDFSHedgedReadMetrics() throws Exception {
+ HBaseTestingUtility htu = new HBaseTestingUtility();
+ // Enable hedged reads and set it so the threshold is really low.
+ // Most of this test is taken from HDFS, from TestPread.
+ Configuration conf = htu.getConfiguration();
+ conf.setInt(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THREADPOOL_SIZE, 5);
+ conf.setLong(DFSConfigKeys.DFS_DFSCLIENT_HEDGED_READ_THRESHOLD_MILLIS, 0);
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
+ conf.setLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, 4096);
+ // Set short retry timeouts so this test runs faster
+ conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 0);
+ conf.setBoolean("dfs.datanode.transferTo.allowed", false);
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+ // Get the metrics. Should be empty.
+ DFSHedgedReadMetrics metrics = FSUtils.getDFSHedgedReadMetrics(conf);
+ assertEquals(0, metrics.getHedgedReadOps());
+ FileSystem fileSys = cluster.getFileSystem();
+ try {
+ Path p = new Path("preadtest.dat");
+ // We need > 1 blocks to test out the hedged reads.
+ DFSTestUtil.createFile(fileSys, p, 12 * blockSize, 12 * blockSize,
+ blockSize, (short) 3, seed);
+ pReadFile(fileSys, p);
+ cleanupFile(fileSys, p);
+ assertTrue(metrics.getHedgedReadOps() > 0);
+ } finally {
+ fileSys.close();
+ cluster.shutdown();
+ }
+ }
+
+ // Below is taken from TestPread over in HDFS.
+ static final int blockSize = 4096;
+ static final long seed = 0xDEADBEEFL;
+
+ private void pReadFile(FileSystem fileSys, Path name) throws IOException {
+ FSDataInputStream stm = fileSys.open(name);
+ byte[] expected = new byte[12 * blockSize];
+ Random rand = new Random(seed);
+ rand.nextBytes(expected);
+ // do a sanity check. Read first 4K bytes
+ byte[] actual = new byte[4096];
+ stm.readFully(actual);
+ checkAndEraseData(actual, 0, expected, "Read Sanity Test");
+ // now do a pread for the first 8K bytes
+ actual = new byte[8192];
+ doPread(stm, 0L, actual, 0, 8192);
+ checkAndEraseData(actual, 0, expected, "Pread Test 1");
+ // Now check to see if the normal read returns 4K - 8K byte range
+ actual = new byte[4096];
+ stm.readFully(actual);
+ checkAndEraseData(actual, 4096, expected, "Pread Test 2");
+ // Now see if we can cross a single block boundary successfully
+ // read 4K bytes from blockSize - 2K offset
+ stm.readFully(blockSize - 2048, actual, 0, 4096);
+ checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 3");
+ // now see if we can cross two block boundaries successfully
+ // read blockSize + 4K bytes from blockSize - 2K offset
+ actual = new byte[blockSize + 4096];
+ stm.readFully(blockSize - 2048, actual);
+ checkAndEraseData(actual, (blockSize - 2048), expected, "Pread Test 4");
+ // now see if we can cross two block boundaries that are not cached
+ // read blockSize + 4K bytes from 10 * blockSize - 2K offset
+ actual = new byte[blockSize + 4096];
+ stm.readFully(10 * blockSize - 2048, actual);
+ checkAndEraseData(actual, (10 * blockSize - 2048), expected, "Pread Test 5");
+ // now check that even after all these preads, we can still read
+ // bytes 8K - 12K
+ actual = new byte[4096];
+ stm.readFully(actual);
+ checkAndEraseData(actual, 8192, expected, "Pread Test 6");
+ // done
+ stm.close();
+ // check block location caching
+ stm = fileSys.open(name);
+ stm.readFully(1, actual, 0, 4096);
+ stm.readFully(4 * blockSize, actual, 0, 4096);
+ stm.readFully(7 * blockSize, actual, 0, 4096);
+ actual = new byte[3 * 4096];
+ stm.readFully(0 * blockSize, actual, 0, 3 * 4096);
+ checkAndEraseData(actual, 0, expected, "Pread Test 7");
+ actual = new byte[8 * 4096];
+ stm.readFully(3 * blockSize, actual, 0, 8 * 4096);
+ checkAndEraseData(actual, 3 * blockSize, expected, "Pread Test 8");
+ // read the tail
+ stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize / 2);
+ IOException res = null;
+ try { // read beyond the end of the file
+ stm.readFully(11 * blockSize + blockSize / 2, actual, 0, blockSize);
+ } catch (IOException e) {
+ // should throw an exception
+ res = e;
+ }
+ assertTrue("Error reading beyond file boundary.", res != null);
+
+ stm.close();
+ }
+
+ private void checkAndEraseData(byte[] actual, int from, byte[] expected, String message) {
+ for (int idx = 0; idx < actual.length; idx++) {
+ assertEquals(message + " byte " + (from + idx) + " differs. expected " +
+ expected[from + idx] + " actual " + actual[idx],
+ actual[idx], expected[from + idx]);
+ actual[idx] = 0;
+ }
+ }
+
+ private void doPread(FSDataInputStream stm, long position, byte[] buffer,
+ int offset, int length) throws IOException {
+ int nread = 0;
+
+ while (nread < length) {
+ int nbytes =
+ stm.read(position + nread, buffer, offset + nread, length - nread);
+ assertTrue("Error in pread", nbytes > 0);
+ nread += nbytes;
+ }
+ }
}