Posted to commits@hbase.apache.org by ap...@apache.org on 2018/05/05 01:38:54 UTC

[2/6] hbase git commit: HBASE-20513 Collect and emit ScanMetrics in PerformanceEvaluation

HBASE-20513 Collect and emit ScanMetrics in PerformanceEvaluation
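
For reference, the client-side pattern this patch applies in each scan test is, roughly: enable
metrics collection on the Scan, drain the scanner, then read the accumulated ScanMetrics before
the scanner is closed. The sketch below is a minimal standalone illustration, not code from the
patch; the table name and the metrics chosen for printing are placeholders, while the calls
themselves (Scan.setScanMetricsEnabled, ResultScanner.getScanMetrics, ScanMetrics.getMetricsMap)
are the same ones the diff exercises.

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;

public class ScanMetricsSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
         // "TestTable" is a placeholder; PerformanceEvaluation manages its own table.
         Table table = connection.getTable(TableName.valueOf("TestTable"))) {
      // Ask the client to accumulate ScanMetrics for this scan, as the patch does.
      Scan scan = new Scan().setScanMetricsEnabled(true);
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result r; (r = scanner.next()) != null;) {
          // PE records per-row value sizes here; this sketch just drains the scanner.
        }
        // Read the accumulated metrics before the scanner is closed.
        ScanMetrics metrics = scanner.getScanMetrics();
        if (metrics != null) {
          Map<String, Long> metricsMap = metrics.getMetricsMap();
          System.out.println("rpcCalls=" + metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME));
          System.out.println("bytesInResults=" +
              metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME));
        }
      }
    }
  }
}

In the patch itself the per-scan values are not printed directly; they are fed into one Yammer
histogram per ScanMetrics field and reported at test takedown alongside the existing latency and
value-size histograms, so the emitted output shows distributions rather than single samples.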


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/81f69e58
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/81f69e58
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/81f69e58

Branch: refs/heads/branch-2.0
Commit: 81f69e585159840b622b9030e4eb6ebf35a7e6ae
Parents: 3c4fada
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue May 1 10:58:09 2018 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri May 4 17:59:16 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/PerformanceEvaluation.java     | 119 ++++++++++++++++---
 1 file changed, 100 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/81f69e58/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
----------------------------------------------------------------------
diff --git a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
index a0d5572..11905aa 100644
--- a/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
+++ b/hbase-mapreduce/src/test/java/org/apache/hadoop/hbase/PerformanceEvaluation.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterAllFilter;
@@ -1046,6 +1047,12 @@ public class PerformanceEvaluation extends Configured implements Tool {
     private String testName;
     private Histogram latencyHistogram;
     private Histogram valueSizeHistogram;
+    private Histogram rpcCallsHistogram;
+    private Histogram remoteRpcCallsHistogram;
+    private Histogram millisBetweenNextHistogram;
+    private Histogram regionsScannedHistogram;
+    private Histogram bytesInResultsHistogram;
+    private Histogram bytesInRemoteResultsHistogram;
     private RandomDistribution.Zipf zipf;
 
     /**
@@ -1102,6 +1109,34 @@ public class PerformanceEvaluation extends Configured implements Tool {
       this.valueSizeHistogram.update(valueSize);
     }
 
+    void updateScanMetrics(final ScanMetrics metrics) {
+      Map<String,Long> metricsMap = metrics.getMetricsMap();
+      Long rpcCalls = metricsMap.get(ScanMetrics.RPC_CALLS_METRIC_NAME);
+      if (rpcCalls != null) {
+        this.rpcCallsHistogram.update(rpcCalls.longValue());
+      }
+      Long remoteRpcCalls = metricsMap.get(ScanMetrics.REMOTE_RPC_CALLS_METRIC_NAME);
+      if (remoteRpcCalls != null) {
+        this.remoteRpcCallsHistogram.update(remoteRpcCalls.longValue());
+      }
+      Long millisBetweenNext = metricsMap.get(ScanMetrics.MILLIS_BETWEEN_NEXTS_METRIC_NAME);
+      if (millisBetweenNext != null) {
+        this.millisBetweenNextHistogram.update(millisBetweenNext.longValue());
+      }
+      Long regionsScanned = metricsMap.get(ScanMetrics.REGIONS_SCANNED_METRIC_NAME);
+      if (regionsScanned != null) {
+        this.regionsScannedHistogram.update(regionsScanned.longValue());
+      }
+      Long bytesInResults = metricsMap.get(ScanMetrics.BYTES_IN_RESULTS_METRIC_NAME);
+      if (bytesInResults != null && bytesInResults.longValue() > 0) {
+        this.bytesInResultsHistogram.update(bytesInResults.longValue());
+      }
+      Long bytesInRemoteResults = metricsMap.get(ScanMetrics.BYTES_IN_REMOTE_RESULTS_METRIC_NAME);
+      if (bytesInRemoteResults != null && bytesInRemoteResults.longValue() > 0) {
+        this.bytesInRemoteResultsHistogram.update(bytesInRemoteResults.longValue());
+      }
+    }
+
     String generateStatus(final int sr, final int i, final int lr) {
       return sr + "/" + i + "/" + lr + ", latency " + getShortLatencyReport() +
         (!isRandomValueSize()? "": ", value size " + getShortValueSizeReport());
@@ -1123,10 +1158,19 @@ public class PerformanceEvaluation extends Configured implements Tool {
     }
 
     void testSetup() throws IOException {
-      createConnection();
-      onStartup();
+      // test metrics
       latencyHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
       valueSizeHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
+      // scan metrics
+      rpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
+      remoteRpcCallsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
+      millisBetweenNextHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
+      regionsScannedHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
+      bytesInResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
+      bytesInRemoteResultsHistogram = YammerHistogramUtils.newHistogram(new UniformReservoir(1024 * 500));
+
+      createConnection();
+      onStartup();
     }
 
     abstract void createConnection() throws IOException;
@@ -1148,6 +1192,30 @@ public class PerformanceEvaluation extends Configured implements Tool {
             + YammerHistogramUtils.getHistogramReport(valueSizeHistogram));
         status.setStatus("Num measures (ValueSize): " + valueSizeHistogram.getCount());
         status.setStatus(YammerHistogramUtils.getPrettyHistogramReport(valueSizeHistogram));
+        if (rpcCallsHistogram.getCount() > 0) {
+          status.setStatus("rpcCalls (count): " +
+              YammerHistogramUtils.getHistogramReport(rpcCallsHistogram));
+        }
+        if (remoteRpcCallsHistogram.getCount() > 0) {
+          status.setStatus("remoteRpcCalls (count): " +
+              YammerHistogramUtils.getHistogramReport(remoteRpcCallsHistogram));
+        }
+        if (millisBetweenNextHistogram.getCount() > 0) {
+          status.setStatus("millisBetweenNext (latency): " +
+              YammerHistogramUtils.getHistogramReport(millisBetweenNextHistogram));
+        }
+        if (regionsScannedHistogram.getCount() > 0) {
+          status.setStatus("regionsScanned (count): " +
+              YammerHistogramUtils.getHistogramReport(regionsScannedHistogram));
+        }
+        if (bytesInResultsHistogram.getCount() > 0) {
+          status.setStatus("bytesInResults (size): " +
+              YammerHistogramUtils.getHistogramReport(bytesInResultsHistogram));
+        }
+        if (bytesInRemoteResultsHistogram.getCount() > 0) {
+          status.setStatus("bytesInRemoteResults (size): " +
+              YammerHistogramUtils.getHistogramReport(bytesInRemoteResultsHistogram));
+        }
       }
       closeConnection();
       receiverHost.closeReceivers();
@@ -1455,6 +1523,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
     @Override
     void testTakedown() throws IOException {
       if (this.testScanner != null) {
+        updateScanMetrics(this.testScanner.getScanMetrics());
         this.testScanner.close();
       }
       super.testTakedown();
@@ -1466,7 +1535,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
         Scan scan =
             new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
                 .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
-                .setReadType(opts.scanReadType);
+                .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
         if (opts.addColumns) {
           for (int column = 0; column < opts.columns; column++) {
             byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
@@ -1577,7 +1646,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
     void testRow(final int i) throws IOException {
       Scan scan = new Scan().withStartRow(getRandomRow(this.rand, opts.totalRows))
           .setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
-          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType);
+          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
+          .setScanMetricsEnabled(true);
       FilterList list = new FilterList();
       if (opts.addColumns) {
         for (int column = 0; column < opts.columns; column++) {
@@ -1593,10 +1663,14 @@ public class PerformanceEvaluation extends Configured implements Tool {
       list.addFilter(new WhileMatchFilter(new PageFilter(120)));
       scan.setFilter(list);
       ResultScanner s = this.table.getScanner(scan);
-      for (Result rr; (rr = s.next()) != null;) {
-        updateValueSize(rr);
+      try {
+        for (Result rr; (rr = s.next()) != null;) {
+          updateValueSize(rr);
+        }
+      } finally {
+        updateScanMetrics(s.getScanMetrics());
+        s.close();
       }
-      s.close();
     }
 
     @Override
@@ -1618,7 +1692,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
       Scan scan = new Scan().withStartRow(startAndStopRow.getFirst())
           .withStopRow(startAndStopRow.getSecond()).setCaching(opts.caching)
           .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
-          .setReadType(opts.scanReadType);
+          .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
       if (opts.filterAll) {
         scan.setFilter(new FilterAllFilter());
       }
@@ -1633,17 +1707,20 @@ public class PerformanceEvaluation extends Configured implements Tool {
       Result r = null;
       int count = 0;
       ResultScanner s = this.table.getScanner(scan);
-      for (; (r = s.next()) != null;) {
-        updateValueSize(r);
-        count++;
-      }
-      if (i % 100 == 0) {
-        LOG.info(String.format("Scan for key range %s - %s returned %s rows",
+      try {
+        for (; (r = s.next()) != null;) {
+          updateValueSize(r);
+          count++;
+        }
+        if (i % 100 == 0) {
+          LOG.info(String.format("Scan for key range %s - %s returned %s rows",
             Bytes.toString(startAndStopRow.getFirst()),
             Bytes.toString(startAndStopRow.getSecond()), count));
+        }
+      } finally {
+        updateScanMetrics(s.getScanMetrics());
+        s.close();
       }
-
-      s.close();
     }
 
     protected abstract Pair<byte[],byte[]> getStartAndStopRow();
@@ -1824,7 +1901,7 @@ public class PerformanceEvaluation extends Configured implements Tool {
       if (this.testScanner == null) {
         Scan scan = new Scan().withStartRow(format(opts.startRow)).setCaching(opts.caching)
             .setCacheBlocks(opts.cacheBlocks).setAsyncPrefetch(opts.asyncPrefetch)
-            .setReadType(opts.scanReadType);
+            .setReadType(opts.scanReadType).setScanMetricsEnabled(true);
         if (opts.addColumns) {
           for (int column = 0; column < opts.columns; column++) {
             byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);
@@ -2031,7 +2108,10 @@ public class PerformanceEvaluation extends Configured implements Tool {
           updateValueSize(r);
         }
       } finally {
-        if (scanner != null) scanner.close();
+        if (scanner != null) {
+          updateScanMetrics(scanner.getScanMetrics());
+          scanner.close();
+        }
       }
     }
 
@@ -2046,7 +2126,8 @@ public class PerformanceEvaluation extends Configured implements Tool {
         list.addFilter(new FilterAllFilter());
       }
       Scan scan = new Scan().setCaching(opts.caching).setCacheBlocks(opts.cacheBlocks)
-          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType);
+          .setAsyncPrefetch(opts.asyncPrefetch).setReadType(opts.scanReadType)
+          .setScanMetricsEnabled(true);
       if (opts.addColumns) {
         for (int column = 0; column < opts.columns; column++) {
           byte [] qualifier = column == 0? COLUMN_ZERO: Bytes.toBytes("" + column);