You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ps...@apache.org on 2019/05/25 22:12:29 UTC
[hbase] 02/02: HBASE-21991: Fix MetaMetrics issues - [Race
condition, Faulty remove logic], few improvements
This is an automated email from the ASF dual-hosted git repository.
psomogyi pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit b9285b3a3fe51d3fea1c222ede4cd1e189d12feb
Author: Sakthi <sa...@gmail.com>
AuthorDate: Sat Mar 9 16:09:34 2019 -0800
HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements
Signed-off-by: Xu Cang <xu...@apache.org>
---
.../hadoop/hbase/coprocessor/MetaTableMetrics.java | 118 ++++++++++++---------
.../apache/hadoop/hbase/util/LossyCounting.java | 11 +-
.../hbase/coprocessor/TestMetaTableMetrics.java | 99 +++++++++++++++++
.../hadoop/hbase/util/TestLossyCounting.java | 10 +-
4 files changed, 177 insertions(+), 61 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
index d08bae6..d542d2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -50,8 +50,8 @@ public class MetaTableMetrics implements RegionCoprocessor {
private ExampleRegionObserverMeta observer;
private Map<String, Optional<Metric>> requestsMap;
- private RegionCoprocessorEnvironment regionCoprocessorEnv;
- private LossyCounting clientMetricsLossyCounting;
+ private MetricRegistry registry;
+ private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
private boolean active = false;
enum MetaTableOps {
@@ -94,11 +94,11 @@ public class MetaTableMetrics implements RegionCoprocessor {
if (!active || !isMetaTableOp(e)) {
return;
}
- tableMetricRegisterAndMark(e, row);
- clientMetricRegisterAndMark(e);
- regionMetricRegisterAndMark(e, row);
- opMetricRegisterAndMark(e, row);
- opWithClientMetricRegisterAndMark(e, row);
+ tableMetricRegisterAndMark(row);
+ clientMetricRegisterAndMark();
+ regionMetricRegisterAndMark(row);
+ opMetricRegisterAndMark(row);
+ opWithClientMetricRegisterAndMark(row);
}
private void markMeterIfPresent(String requestMeter) {
@@ -106,19 +106,18 @@ public class MetaTableMetrics implements RegionCoprocessor {
return;
}
- if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) {
- Meter metric = (Meter) requestsMap.get(requestMeter).get();
+ Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
+ if (optionalMetric != null && optionalMetric.isPresent()) {
+ Meter metric = (Meter) optionalMetric.get();
metric.mark();
}
}
- private void registerMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
- String requestMeter) {
+ private void registerMeterIfNotPresent(String requestMeter) {
if (requestMeter.isEmpty()) {
return;
}
if (!requestsMap.containsKey(requestMeter)) {
- MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
registry.meter(requestMeter);
requestsMap.put(requestMeter, registry.get(requestMeter));
}
@@ -129,32 +128,36 @@ public class MetaTableMetrics implements RegionCoprocessor {
* By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate)
* e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
* also, all kept elements have frequency higher than e * N. (N is total count)
- * @param e Region coprocessor environment
* @param requestMeter meter to be registered
* @param lossyCounting lossyCounting object for one type of meters.
*/
- private void registerLossyCountingMeterIfNotPresent(
- ObserverContext<RegionCoprocessorEnvironment> e,
- String requestMeter, LossyCounting lossyCounting) {
+ private void registerLossyCountingMeterIfNotPresent(String requestMeter,
+ LossyCounting lossyCounting) {
+
if (requestMeter.isEmpty()) {
return;
}
- Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
- if(!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)){
- for(String meter: metersToBeRemoved) {
- //cleanup requestsMap according swept data from lossy count;
+ synchronized (lossyCounting) {
+ Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
+
+ boolean isNewMeter = !requestsMap.containsKey(requestMeter);
+ boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
+ if (isNewMeter) {
+ if (requestMeterRemoved) {
+ // if the new metric is swept off by lossyCounting then don't add in the map
+ metersToBeRemoved.remove(requestMeter);
+ } else {
+ // else register the new metric and add in the map
+ registry.meter(requestMeter);
+ requestsMap.put(requestMeter, registry.get(requestMeter));
+ }
+ }
+
+ for (String meter : metersToBeRemoved) {
+ //cleanup requestsMap according to the swept data from lossy count;
requestsMap.remove(meter);
- MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
registry.remove(meter);
}
- // newly added meter is swept by lossy counting cleanup. No need to put it into requestsMap.
- return;
- }
-
- if (!requestsMap.containsKey(requestMeter)) {
- MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
- registry.meter(requestMeter);
- requestsMap.put(requestMeter, registry.get(requestMeter));
}
}
@@ -191,49 +194,59 @@ public class MetaTableMetrics implements RegionCoprocessor {
.equals(e.getEnvironment().getRegionInfo().getTable());
}
- private void clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) {
+ private void clientMetricRegisterAndMark() {
// Mark client metric
String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
-
+ if (clientIP == null || clientIP.isEmpty()) {
+ return;
+ }
String clientRequestMeter = clientRequestMeterName(clientIP);
- registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting);
+ registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
markMeterIfPresent(clientRequestMeter);
}
- private void tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
- Row op) {
+ private void tableMetricRegisterAndMark(Row op) {
// Mark table metric
String tableName = getTableNameFromOp(op);
+ if (tableName == null || tableName.isEmpty()) {
+ return;
+ }
String tableRequestMeter = tableMeterName(tableName);
- registerAndMarkMeterIfNotPresent(e, tableRequestMeter);
+ registerAndMarkMeterIfNotPresent(tableRequestMeter);
}
- private void regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
- Row op) {
+ private void regionMetricRegisterAndMark(Row op) {
// Mark region metric
String regionId = getRegionIdFromOp(op);
+ if (regionId == null || regionId.isEmpty()) {
+ return;
+ }
String regionRequestMeter = regionMeterName(regionId);
- registerAndMarkMeterIfNotPresent(e, regionRequestMeter);
+ registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
+ markMeterIfPresent(regionRequestMeter);
}
- private void opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
- Row op) {
+ private void opMetricRegisterAndMark(Row op) {
// Mark access type ["get", "put", "delete"] metric
String opMeterName = opMeterName(op);
- registerAndMarkMeterIfNotPresent(e, opMeterName);
+ if (opMeterName == null || opMeterName.isEmpty()) {
+ return;
+ }
+ registerAndMarkMeterIfNotPresent(opMeterName);
}
- private void opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
- Object op) {
+ private void opWithClientMetricRegisterAndMark(Object op) {
// // Mark client + access type metric
String opWithClientMeterName = opWithClientMeterName(op);
- registerAndMarkMeterIfNotPresent(e, opWithClientMeterName);
+ if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
+ return;
+ }
+ registerAndMarkMeterIfNotPresent(opWithClientMeterName);
}
// Helper function to register and mark meter if not present
- private void registerAndMarkMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
- String name) {
- registerMeterIfNotPresent(e, name);
+ private void registerAndMarkMeterIfNotPresent(String name) {
+ registerMeterIfNotPresent(name);
markMeterIfPresent(name);
}
@@ -291,12 +304,12 @@ public class MetaTableMetrics implements RegionCoprocessor {
if (clientIP.isEmpty()) {
return "";
}
- return String.format("MetaTable_client_%s_request", clientIP);
+ return String.format("MetaTable_client_%s_lossy_request", clientIP);
}
private String regionMeterName(String regionId) {
// Extract meter name containing the region ID
- return String.format("MetaTable_region_%s_request", regionId);
+ return String.format("MetaTable_region_%s_lossy_request", regionId);
}
}
@@ -312,9 +325,11 @@ public class MetaTableMetrics implements RegionCoprocessor {
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
&& ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable()
.equals(TableName.META_TABLE_NAME)) {
- regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
+ RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
+ registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
requestsMap = new ConcurrentHashMap<>();
- clientMetricsLossyCounting = new LossyCounting();
+ clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
+ regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
// only be active mode when this region holds meta table.
active = true;
}
@@ -324,7 +339,6 @@ public class MetaTableMetrics implements RegionCoprocessor {
public void stop(CoprocessorEnvironment env) throws IOException {
// since meta region can move around, clear stale metrics when stop.
if (requestsMap != null) {
- MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
for (String meterName : requestsMap.keySet()) {
registry.remove(meterName);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
index cb737cb..be8b592 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -51,9 +51,11 @@ public class LossyCounting {
private double errorRate;
private Map<String, Integer> data;
private long totalDataCount;
+ private String name;
- public LossyCounting(double errorRate) {
+ public LossyCounting(double errorRate, String name) {
this.errorRate = errorRate;
+ this.name = name;
if (errorRate < 0.0 || errorRate > 1.0) {
throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
}
@@ -64,8 +66,9 @@ public class LossyCounting {
calculateCurrentTerm();
}
- public LossyCounting() {
- this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02));
+ public LossyCounting(String name) {
+ this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
+ name);
}
public Set<String> addByOne(String key) {
@@ -93,7 +96,7 @@ public class LossyCounting {
for(String key : dataToBeSwept) {
data.remove(key);
}
- LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size()));
+ LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
return dataToBeSwept;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
index bbbeb9e..82ce709 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
@@ -13,6 +13,8 @@ package org.apache.hadoop.hbase.coprocessor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.JMXListener;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -73,6 +76,11 @@ public class TestMetaTableMetrics {
private static Configuration conf = null;
private static int connectorPort = 61120;
+ final byte[] cf = Bytes.toBytes("info");
+ final byte[] col = Bytes.toBytes("any");
+ byte[] tablename;
+ final int nthreads = 20;
+
@BeforeClass
public static void setupBeforeClass() throws Exception {
@@ -224,4 +232,95 @@ public class TestMetaTableMetrics {
assertEquals(5L, putWithClientMetricsCount);
}
+ @Test(timeout = 30000)
+ public void testConcurrentAccess() {
+ try {
+ tablename = Bytes.toBytes("hbase:meta");
+ int numRows = 3000;
+ int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
+ putData(numRows);
+ Thread.sleep(2000);
+ int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
+ assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
+ getData(numRows);
+ } catch (InterruptedException e) {
+ LOG.info("Caught InterruptedException while testConcurrentAccess: " + e.getMessage());
+ fail();
+ } catch (IOException e) {
+ LOG.info("Caught IOException while testConcurrentAccess: " + e.getMessage());
+ fail();
+ }
+ }
+
+ public void putData(int nrows) throws InterruptedException {
+ LOG.info(String.format("Putting %d rows in hbase:meta", nrows));
+ Thread[] threads = new Thread[nthreads];
+ for (int i = 1; i <= nthreads; i++) {
+ threads[i - 1] = new PutThread(1, nrows);
+ }
+ startThreadsAndWaitToJoin(threads);
+ }
+
+ public void getData(int nrows) throws InterruptedException {
+ LOG.info(String.format("Getting %d rows from hbase:meta", nrows));
+ Thread[] threads = new Thread[nthreads];
+ for (int i = 1; i <= nthreads; i++) {
+ threads[i - 1] = new GetThread(1, nrows);
+ }
+ startThreadsAndWaitToJoin(threads);
+ }
+
+ private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException {
+ for (int i = 1; i <= nthreads; i++) {
+ threads[i - 1].start();
+ }
+ for (int i = 1; i <= nthreads; i++) {
+ threads[i - 1].join();
+ }
+ }
+
+ class PutThread extends Thread {
+ int start;
+ int end;
+
+ public PutThread(int start, int end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ public void run() {
+ try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
+ for (int i = start; i <= end; i++) {
+ Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
+ p.addColumn(cf, col, Bytes.toBytes("Value" + i));
+ table.put(p);
+ }
+ } catch (IOException e) {
+ LOG.info("Caught IOException while PutThread operation: " + e.getMessage());
+ }
+ }
+ }
+
+ class GetThread extends Thread {
+ int start;
+ int end;
+
+ public GetThread(int start, int end) {
+ this.start = start;
+ this.end = end;
+ }
+
+ @Override
+ public void run() {
+ try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
+ for (int i = start; i <= end; i++) {
+ Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
+ table.get(get);
+ }
+ } catch (IOException e) {
+ LOG.info("Caught IOException while GetThread operation: " + e.getMessage());
+ }
+ }
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
index 11758be..e4f1939 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -38,15 +38,15 @@ public class TestLossyCounting {
@Test
public void testBucketSize() {
- LossyCounting lossyCounting = new LossyCounting(0.01);
+ LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
assertEquals(100L, lossyCounting.getBucketSize());
- LossyCounting lossyCounting2 = new LossyCounting();
+ LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
assertEquals(50L, lossyCounting2.getBucketSize());
}
@Test
public void testAddByOne() {
- LossyCounting lossyCounting = new LossyCounting(0.01);
+ LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
for(int i = 0; i < 100; i++){
String key = "" + i;
lossyCounting.addByOne(key);
@@ -60,7 +60,7 @@ public class TestLossyCounting {
@Test
public void testSweep1() {
- LossyCounting lossyCounting = new LossyCounting(0.01);
+ LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
for(int i = 0; i < 400; i++){
String key = "" + i;
lossyCounting.addByOne(key);
@@ -71,7 +71,7 @@ public class TestLossyCounting {
@Test
public void testSweep2() {
- LossyCounting lossyCounting = new LossyCounting(0.1);
+ LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
for(int i = 0; i < 10; i++){
String key = "" + i;
lossyCounting.addByOne(key);