You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/11/12 07:15:30 UTC
hbase git commit: HBASE-19035 Miss metrics when coprocessor use
region scanner to read data
Repository: hbase
Updated Branches:
refs/heads/master c87189d41 -> 1ba7cc216
HBASE-19035 Miss metrics when coprocessor use region scanner to read data
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1ba7cc21
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1ba7cc21
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1ba7cc21
Branch: refs/heads/master
Commit: 1ba7cc216470ac16cd439aa0b35d8e6f9d6c7d8d
Parents: c87189d
Author: Guanghao Zhang <zg...@apache.org>
Authored: Sun Nov 12 10:32:58 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Sun Nov 12 15:11:43 2017 +0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 17 +--
.../MetricsRegionServerWrapperImpl.java | 2 +-
.../hbase/regionserver/RSRpcServices.java | 10 --
.../TestRegionServerReadRequestMetrics.java | 141 +++++++++++++++----
4 files changed, 125 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1ba7cc21/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a265a55..197aa3c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -292,8 +292,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
final LongAdder checkAndMutateChecksFailed = new LongAdder();
// Number of requests
+ // Count rows for scan
final LongAdder readRequestsCount = new LongAdder();
final LongAdder filteredReadRequestsCount = new LongAdder();
+ // Count rows for multi row mutations
final LongAdder writeRequestsCount = new LongAdder();
// Number of requests blocked by memstore size.
@@ -1229,14 +1231,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return readRequestsCount.sum();
}
- /**
- * Update the read request count for this region
- * @param i increment
- */
- public void updateReadRequestsCount(long i) {
- readRequestsCount.add(i);
- }
-
@Override
public long getFilteredReadRequestsCount() {
return filteredReadRequestsCount.sum();
@@ -6212,7 +6206,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
"or a lengthy garbage collection");
}
startRegionOperation(Operation.SCAN);
- readRequestsCount.increment();
try {
return nextRaw(outResults, scannerContext);
} finally {
@@ -6244,6 +6237,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
outResults.addAll(tmpList);
}
+ if (!outResults.isEmpty()) {
+ readRequestsCount.increment();
+ }
+
// If the size limit was reached it means a partial Result is being returned. Returning a
// partial Result means that we should not reset the filters; filters should only be reset in
// between rows
@@ -7165,7 +7162,6 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
@Override
public void mutateRowsWithLocks(Collection<Mutation> mutations,
Collection<byte[]> rowsToLock, long nonceGroup, long nonce) throws IOException {
- writeRequestsCount.add(mutations.size());
MultiRowMutationProcessor proc = new MultiRowMutationProcessor(mutations, rowsToLock);
processRowsWithLocks(proc, -1, nonceGroup, nonce);
}
@@ -7259,6 +7255,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// STEP 4. Let the processor scan the rows, generate mutations and add waledits
doProcessRowWithTimeout(processor, now, this, mutations, walEdit, timeout);
if (!mutations.isEmpty()) {
+ writeRequestsCount.add(mutations.size());
// STEP 5. Call the preBatchMutate hook
processor.preBatchMutate(this, walEdit);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1ba7cc21/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index 515b1ea..7d7833b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -228,7 +228,7 @@ class MetricsRegionServerWrapperImpl
@Override
public long getTotalRowActionRequestCount() {
- return regionServer.rpcServices.requestRowActionCount.sum();
+ return readRequestsCount + writeRequestsCount;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/1ba7cc21/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 4b3fa50..68e7049 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -264,10 +264,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// Count only once for requests with multiple actions like multi/caching-scan/replayBatch
final LongAdder requestCount = new LongAdder();
- // Request counter. (Excludes requests that are not serviced by regions.)
- // Count rows for requests with multiple actions like multi/caching-scan/replayBatch
- final LongAdder requestRowActionCount = new LongAdder();
-
// Request counter for rpc get
final LongAdder rpcGetRequestCount = new LongAdder();
@@ -1104,7 +1100,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
requestCount.increment();
- requestRowActionCount.add(mutations.size());
if (!region.getRegionInfo().isMetaRegion()) {
regionServer.cacheFlusher.reclaimMemStoreMemory();
}
@@ -2380,7 +2375,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
checkOpen();
requestCount.increment();
- requestRowActionCount.increment();
rpcGetRequestCount.increment();
HRegion region = getRegion(request.getRegion());
@@ -2549,7 +2543,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
.getRegionActionCount());
ActivePolicyEnforcement spaceQuotaEnforcement = getSpaceQuotaManager().getActiveEnforcements();
for (RegionAction regionAction : request.getRegionActionList()) {
- this.requestRowActionCount.add(regionAction.getActionCount());
OperationQuota quota;
HRegion region;
regionActionResultBuilder.clear();
@@ -2684,7 +2677,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
try {
checkOpen();
requestCount.increment();
- requestRowActionCount.increment();
rpcMutateRequestCount.increment();
HRegion region = getRegion(request.getRegion());
MutateResponse.Builder builder = MutateResponse.newBuilder();
@@ -3130,8 +3122,6 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
builder.setScanMetrics(metricBuilder.build());
}
}
- region.updateReadRequestsCount(numOfResults);
- requestRowActionCount.add(numOfResults);
long end = EnvironmentEdgeManager.currentTime();
long responseCellSize = context != null ? context.getResponseCellSize() : 0;
region.getMetrics().updateScanTime(end - before);
http://git-wip-us.apache.org/repos/asf/hbase/blob/1ba7cc21/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java
index 7822bc7..7227183 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerReadRequestMetrics.java
@@ -18,26 +18,33 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ClusterStatus.Option;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLoad;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Append;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.filter.BinaryComparator;
import org.apache.hadoop.hbase.filter.CompareFilter;
import org.apache.hadoop.hbase.filter.RowFilter;
@@ -54,13 +61,17 @@ import java.io.IOException;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
@Category(MediumTests.class)
public class TestRegionServerReadRequestMetrics {
+ private static final Log LOG = LogFactory.getLog(TestRegionServerReadRequestMetrics.class);
private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static final TableName TABLE_NAME = TableName.valueOf("test");
private static final byte[] CF1 = "c1".getBytes();
@@ -83,7 +94,7 @@ public class TestRegionServerReadRequestMetrics {
private static Admin admin;
private static Collection<ServerName> serverNames;
private static Table table;
- private static List<HRegionInfo> tableRegions;
+ private static RegionInfo regionInfo;
private static Map<Metric, Long> requestsMap = new HashMap<>();
private static Map<Metric, Long> requestsMapPrev = new HashMap<>();
@@ -98,7 +109,9 @@ public class TestRegionServerReadRequestMetrics {
serverNames = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getServers();
table = createTable();
putData();
- tableRegions = admin.getTableRegions(TABLE_NAME);
+ List<RegionInfo> regions = admin.getRegions(TABLE_NAME);
+ assertEquals("Table " + TABLE_NAME + " should have 1 region", 1, regions.size());
+ regionInfo = regions.get(0);
for (Metric metric : Metric.values()) {
requestsMap.put(metric, 0L);
@@ -107,14 +120,11 @@ public class TestRegionServerReadRequestMetrics {
}
private static Table createTable() throws IOException {
- HTableDescriptor td = new HTableDescriptor(TABLE_NAME);
- HColumnDescriptor cd1 = new HColumnDescriptor(CF1);
- td.addFamily(cd1);
- HColumnDescriptor cd2 = new HColumnDescriptor(CF2);
- cd2.setTimeToLive(TTL);
- td.addFamily(cd2);
-
- admin.createTable(td);
+ TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(TABLE_NAME);
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
+ builder.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF2).setTimeToLive(TTL)
+ .build());
+ admin.createTable(builder.build());
return TEST_UTIL.getConnection().getTable(TABLE_NAME);
}
@@ -159,18 +169,16 @@ public class TestRegionServerReadRequestMetrics {
serverLoad = admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName);
Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
- for (HRegionInfo tableRegion : tableRegions) {
- RegionLoad regionLoad = regionsLoad.get(tableRegion.getRegionName());
- if (regionLoad != null) {
- regionLoadOuter = regionLoad;
- for (Metric metric : Metric.values()) {
- if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) {
- for (Metric metricInner : Metric.values()) {
- requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner));
- }
- metricsUpdated = true;
- break;
+ RegionLoad regionLoad = regionsLoad.get(regionInfo.getRegionName());
+ if (regionLoad != null) {
+ regionLoadOuter = regionLoad;
+ for (Metric metric : Metric.values()) {
+ if (getReadRequest(serverLoad, regionLoad, metric) > requestsMapPrev.get(metric)) {
+ for (Metric metricInner : Metric.values()) {
+ requestsMap.put(metricInner, getReadRequest(serverLoad, regionLoad, metricInner));
}
+ metricsUpdated = true;
+ break;
}
}
}
@@ -397,5 +405,90 @@ public class TestRegionServerReadRequestMetrics {
}
}
+  /**
+   * Checks the read request metrics of a table whose region has {@link ScanRegionCoprocessor}
+   * attached. The metric check is attempted up to MAX_TRY times, sleeping SLEEP_MS after each
+   * failed attempt, and the test fails only if every attempt failed. The table is always disabled
+   * and deleted afterwards.
+   */
+  @Test
+  public void testReadRequestsWithCoprocessor() throws Exception {
+    TableName tableName = TableName.valueOf("testReadRequestsWithCoprocessor");
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName);
+    builder.addColumnFamily(ColumnFamilyDescriptorBuilder.of(CF1));
+    builder.addCoprocessor(ScanRegionCoprocessor.class.getName());
+    admin.createTable(builder.build());
+
+    try {
+      TEST_UTIL.waitTableAvailable(tableName);
+      List<RegionInfo> regionInfos = admin.getRegions(tableName);
+      // Report the table created by this test, not the shared TABLE_NAME fixture.
+      assertEquals("Table " + tableName + " should have 1 region", 1, regionInfos.size());
+      int i = 0;
+      for (; i < MAX_TRY; i++) {
+        // Reset on every attempt; otherwise one failed attempt would keep the flag false
+        // forever and a later successful attempt could never leave the loop.
+        boolean success = true;
+        try {
+          testReadRequests(regionInfos.get(0).getRegionName(), 3);
+        } catch (Throwable t) {
+          // Throwable (not Exception) so that assertion failures are retried as well.
+          LOG.warn("Got exception when try " + i + " times", t);
+          Thread.sleep(SLEEP_MS);
+          success = false;
+        }
+        if (success) {
+          break;
+        }
+      }
+      if (i == MAX_TRY) {
+        fail("Failed to get right read requests metric after try " + i + " times");
+      }
+    } finally {
+      admin.disableTable(tableName);
+      admin.deleteTable(tableName);
+    }
+  }
+
+  /**
+   * Asserts the read request count reported for a region, and for the server reporting it, on
+   * every server in {@code serverNames} whose load contains that region. Servers that do not
+   * report the region are skipped, so nothing is asserted if no server reports it.
+   * @param regionName name of the region to look up in each server's region loads
+   * @param expectedReadRequests count expected for both the server and the region metric
+   * @throws Exception if fetching the cluster status fails
+   */
+  private void testReadRequests(byte[] regionName, int expectedReadRequests) throws Exception {
+    for (ServerName serverName : serverNames) {
+      ServerLoad serverLoad =
+          admin.getClusterStatus(EnumSet.of(Option.LIVE_SERVERS)).getLoad(serverName);
+      Map<byte[], RegionLoad> regionsLoad = serverLoad.getRegionsLoad();
+      RegionLoad regionLoad = regionsLoad.get(regionName);
+      if (regionLoad != null) {
+        LOG.debug("server read request is " + serverLoad.getReadRequestsCount()
+            + ", region read request is " + regionLoad.getReadRequestsCount());
+        // Compare against the caller-supplied expectation rather than a hard-coded 3.
+        assertEquals(expectedReadRequests, serverLoad.getReadRequestsCount());
+        assertEquals(expectedReadRequests, regionLoad.getReadRequestsCount());
+      }
+    }
+  }
+
+  /**
+   * Region coprocessor used by the test: once the region is open it writes three rows into CF1
+   * and then reads them back through a region scanner obtained directly from the region.
+   */
+  public static class ScanRegionCoprocessor implements RegionCoprocessor, RegionObserver {
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    /**
+     * Populates the region and scans it to the end. Any failure is logged and swallowed so
+     * that it does not propagate out of the hook.
+     */
+    @Override
+    public void postOpen(ObserverContext<RegionCoprocessorEnvironment> c) {
+      RegionCoprocessorEnvironment env = c.getEnvironment();
+      Region region = env.getRegion();
+      try {
+        putData(region);
+        // try-with-resources so the scanner is closed even if next() throws.
+        try (RegionScanner scanner = region.getScanner(new Scan())) {
+          List<Cell> result = new LinkedList<>();
+          while (scanner.next(result)) {
+            // Only the act of scanning matters here; discard the cells of each batch.
+            result.clear();
+          }
+        }
+      } catch (Exception e) {
+        LOG.warn("Got exception in coprocessor", e);
+      }
+    }
+
+    /**
+     * Writes one cell (CF1:COL1 = VAL1) for each of ROW1, ROW2 and ROW3.
+     * @param region region to write into
+     * @throws Exception if any put fails
+     */
+    private void putData(Region region) throws Exception {
+      Put put = new Put(ROW1);
+      put.addColumn(CF1, COL1, VAL1);
+      region.put(put);
+      put = new Put(ROW2);
+      put.addColumn(CF1, COL1, VAL1);
+      region.put(put);
+      put = new Put(ROW3);
+      put.addColumn(CF1, COL1, VAL1);
+      region.put(put);
+    }
+  }
+
private enum Metric {REGION_READ, SERVER_READ, FILTERED_REGION_READ, FILTERED_SERVER_READ}
}