You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/11/12 07:15:30 UTC

hbase git commit: HBASE-19035 Missing metrics when a coprocessor uses a region scanner to read data

Repository: hbase
Updated Branches:
  refs/heads/master c87189d41 -> 1ba7cc216


HBASE-19035 Missing metrics when a coprocessor uses a region scanner to read data


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1ba7cc21
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1ba7cc21
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1ba7cc21

Branch: refs/heads/master
Commit: 1ba7cc216470ac16cd439aa0b35d8e6f9d6c7d8d
Parents: c87189d
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sun Nov 12 10:32:58 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Sun Nov 12 15:11:43 2017 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  17 +--
 .../MetricsRegionServerWrapperImpl.java         |   2 +-
 .../hbase/regionserver/RSRpcServices.java       |  10 --
 .../TestRegionServerReadRequestMetrics.java     | 141 +++++++++++++++----
 4 files changed, 125 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/1ba7cc21/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a265a55..197aa3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -292,8 +292,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   final LongAdder checkAndMutateChecksFailed = new LongAdder();
 
   // Number of requests
+  // Count rows for scan
   final LongAdder readRequestsCount = new LongAdder();
   final LongAdder filteredReadRequestsCount = new LongAdder();
+  // Count rows for multi row mutations
   final LongAdder writeRequestsCount = new LongAdder();
 
   // Number of requests blocked by memstore size.
@@ -1229,14 +1231,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
     return readRequestsCount.sum();
   }
 
-  /**
-   * Update the read request count for this region
-   * @param i increment
-   */
-  public void updateReadRequestsCount(long i) {
-    readRequestsCount.add(i);
-  }
-
   @Override
   public long getFilteredReadRequestsCount() {
     return filteredReadRequestsCount.sum();
@@ -6212,7 +6206,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             "or a lengthy garbage collection");
       }
       startRegionOperation(Operation.SCAN);
-      readRequestsCount.increment();
       try {
         return nextRaw(outResults, scannerContext);
       } finally {
@@ -6244,6 +6237,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         outResults.addAll(tmpList);
       }
 
+      if (!outResults.isEmpty()) {
+        readRequestsCount.increment();
+      }
+
       // If the size limit was reached it means a partial Result is being returned. Returning a
       // partial Result means that we should not reset the filters; filters should only be reset in
       // between rows
@@ -7165,7 +7162,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
   @Override
   public void mutateRowsWithLocks(Collection<Mutation> mutations,
       Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
-    writeRequestsCount.add(mutations.size());
     MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
     processRowsWithLocks(proc, -1, nonceGroup, nonce);
   }
@@ -7259,6 +7255,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         // STEP 4. Let the processor scan the rows, generate mutations and add waledits
         doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
         if (!mutations.isEmpty()) {
+          writeRequestsCount.add(mutations.size());
           // STEP 5. Call the preBatchMutate hook
           processor.preBatchMutate(this, walEdit);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ba7cc21/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 515b1ea..7d7833b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -228,7 +228,7 @@ class MetricsRegionServerWrapperImpl
 
   @Override
   public long getTotalRowActionRequestCount() {
-    return regionServer.rpcServices.requestRowActionCount.sum();
+    return readRequestsCount + writeRequestsCount;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ba7cc21/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 4b3fa50..68e7049 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -264,10 +264,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
   // Count only once for requests with multiple actions like multi/caching-scan/replayBatch
   final LongAdder requestCount = new LongAdder();
 
-  // Request counter. (Excludes requests that are not serviced by regions.)
-  // Count rows for requests with multiple actions like multi/caching-scan/replayBatch
-  final LongAdder requestRowActionCount = new LongAdder();
-
   // Request counter for rpc get
   final LongAdder rpcGetRequestCount = new LongAdder();
 
@@ -1104,7 +1100,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
         }
       }
       requestCount.increment();
-      requestRowActionCount.add(mutations.size());
       if (!region.getRegionInfo().isMetaRegion()) {
         regionServer.cacheFlusher.reclaimMemStoreMemory();
       }
@@ -2380,7 +2375,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     try {
       checkOpen();
       requestCount.increment();
-      requestRowActionCount.increment();
       rpcGetRequestCount.increment();
       HRegion region = getRegion(request.getRegion());
 
@@ -2549,7 +2543,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
       .getRegionActionCount());
     ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
     for (RegionAction regionAction : request.getRegionActionList()) {
-      this.requestRowActionCount.add(regionAction.getActionCount());
       OperationQuota quota;
       HRegion region;
       regionActionResultBuilder.clear();
@@ -2684,7 +2677,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     try {
       checkOpen();
       requestCount.increment();
-      requestRowActionCount.increment();
       rpcMutateRequestCount.increment();
       HRegion region = getRegion(request.getRegion());
       MutateResponse.Builder builder = MutateResponse.newBuilder();
@@ -3130,8 +3122,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
           builder.setScanMetrics(metricBuilder.build());
         }
       }
-      region.updateReadRequestsCount(numOfResults);
-      requestRowActionCount.add(numOfResults);
       long end = EnvironmentEdgeManager.currentTime();
       long responseCellSize = context != null ? context.getResponseCellSize() : 0;
       region.getMetrics().updateScanTime(end - before);

http://git-wip-us.apache.org/repos/asf/hbase/blob/1ba7cc21/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java
index 7822bc7..7227183 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java
@@ -18,26 +18,33 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.ClusterStatus.Option;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.RegionLoad;
 import org.apache.hadoop.hbase.ServerLoad;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Increment;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
 import org.apache.hadoop.hbase.filter.BinaryComparator;
 import org.apache.hadoop.hbase.filter.CompareFilter;
 import org.apache.hadoop.hbase.filter.RowFilter;
@@ -54,13 +61,17 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 @Category(MediumTests.class)
 public class TestRegionServerReadRequestMetrics {
+  private static final Log LOG = LogFactory.getLog(TestRegionServerReadRequestMetrics.class);
   private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   private static final TableName TABLE_NAME = TableName.valueOf("test");
   private static final byte[] CF1 = "c1".getBytes();
@@ -83,7 +94,7 @@ public class TestRegionServerReadRequestMetrics {
   private static Admin admin;
   private static Collection<ServerName> serverNames;
   private static Table table;
-  private static List<HRegionInfo> tableRegions;
+  private static RegionInfo regionInfo;
 
   private static Map<Metric, Long> requestsMap = new HashMap<>();
   private static Map<Metric, Long> requestsMapPrev = new HashMap<>();
@@ -98,7 +109,9 @@ public class TestRegionServerReadRequestMetrics {
     serverNames = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getServers();
     table = createTable();
     putData();
-    tableRegions = admin.getTableRegions(TABLE_NAME);
+    List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
+    assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size());
+    regionInfo = regions.get(0);
 
     for (Metric metric : Metric.values()) {
       requestsMap.put(metric, 0L);
@@ -107,14 +120,11 @@ public class TestRegionServerReadRequestMetrics {
   }
 
   private static Table createTable() throws IOException {
-    HTableDescriptor td = new HTableDescriptor(TABLE_NAME);
-    HColumnDescriptor cd1 = new HColumnDescriptor(CF1);
-    td.addFamily(cd1);
-    HColumnDescriptor cd2 = new HColumnDescriptor(CF2);
-    cd2.setTimeToLive(TTL);
-    td.addFamily(cd2);
-
-    admin.createTable(td);
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL)
+        .build());
+    admin.createTable(builder.build());
     return TEST_UTIL.getConnection().getTable(TABLE_NAME);
   }
 
@@ -159,18 +169,16 @@ public class TestRegionServerReadRequestMetrics {
         serverLoad = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName);
 
         Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
-        for (HRegionInfo tableRegion : tableRegions) {
-          RegionLoad regionLoad = regionsLoad.get(tableRegion.getRegionName());
-          if (regionLoad != null) {
-            regionLoadOuter = regionLoad;
-            for (Metric metric : Metric.values()) {
-              if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) {
-                for (Metric metricInner : Metric.values()) {
-                  requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner));
-                }
-                metricsUpdated = true;
-                break;
+        RegionLoad regionLoad = regionsLoad.get(regionInfo.getRegionName());
+        if (regionLoad != null) {
+          regionLoadOuter = regionLoad;
+          for (Metric metric : Metric.values()) {
+            if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) {
+              for (Metric metricInner : Metric.values()) {
+                requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner));
               }
+              metricsUpdated = true;
+              break;
             }
           }
         }
@@ -397,5 +405,90 @@ public class TestRegionServerReadRequestMetrics {
     }
   }
 
+  @Test
+  public void testReadRequestsWithCoprocessor() throws Exception {
+    TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor");
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
+    builder.addCoprocessor(ScanRegionCoprocessor.class.getName());
+    admin.createTable(builder.build());
+
+    try {
+      TEST_UTIL.waitTableAvailable(tableName);
+      List<RegionInfo> regionInfos = admin.getRegions(tableName);
+      assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regionInfos.size());
+      boolean success = true;
+      int i = 0;
+      for (; i < MAX_TRY; i++) {
+        try {
+          testReadRequests(regionInfos.get(0).getRegionName(), 3);
+        } catch (Throwable t) {
+          LOG.warn("Got exception when try " + i + " times", t);
+          Thread.sleep(SLEEP_MS);
+          success = false;
+        }
+        if (success) {
+          break;
+        }
+      }
+      if (i == MAX_TRY) {
+        fail("Failed to get right read requests metric after try " + i + " times");
+      }
+    } finally {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+  }
+
+  private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception {
+    for (ServerName serverName : serverNames) {
+      ServerLoad serverLoad =
+          admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName);
+      Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
+      RegionLoad regionLoad = regionsLoad.get(regionName);
+      if (regionLoad != null) {
+        LOG.debug("server read request is " + serverLoad.getReadRequestsCount()
+            + ", region read request is " + regionLoad.getReadRequestsCount());
+        assertEquals(3, serverLoad.getReadRequestsCount());
+        assertEquals(3, regionLoad.getReadRequestsCount());
+      }
+    }
+  }
+
+  public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
+      RegionCoprocessorEnvironment env = c.getEnvironment();
+      Region region = env.getRegion();
+      try {
+        putData(region);
+        RegionScanner scanner = region.getScanner(new Scan());
+        List<Cell> result = new LinkedList<>();
+        while (scanner.next(result)) {
+          result.clear();
+        }
+      } catch (Exception e) {
+        LOG.warn("Got exception in coprocessor", e);
+      }
+    }
+
+    private void putData(Region region) throws Exception {
+      Put put = new Put(ROW1);
+      put.addColumn(CF1, COL1, VAL1);
+      region.put(put);
+      put = new Put(ROW2);
+      put.addColumn(CF1, COL1, VAL1);
+      region.put(put);
+      put = new Put(ROW3);
+      put.addColumn(CF1, COL1, VAL1);
+      region.put(put);
+    }
+  }
+
   private enum Metric {REGION_READ, SERVER_READ, FILTERED_REGION_READ, FILTERED_SERVER_READ}
 }