You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ps...@apache.org on 2019/05/25 22:12:29 UTC

[hbase] 02/02: HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements

This is an automated email from the ASF dual-hosted git repository.

psomogyi pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit b9285b3a3fe51d3fea1c222ede4cd1e189d12feb
Author: Sakthi <sa...@gmail.com>
AuthorDate: Sat Mar 9 16:09:34 2019 -0800

    HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements
    
    Signed-off-by: Xu Cang <xu...@apache.org>
---
 .../hadoop/hbase/coprocessor/MetaTableMetrics.java | 118 ++++++++++++---------
 .../apache/hadoop/hbase/util/LossyCounting.java    |  11 +-
 .../hbase/coprocessor/TestMetaTableMetrics.java    |  99 +++++++++++++++++
 .../hadoop/hbase/util/TestLossyCounting.java       |  10 +-
 4 files changed, 177 insertions(+), 61 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
index d08bae6..d542d2f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -50,8 +50,8 @@ public class MetaTableMetrics implements RegionCoprocessor {
 
   private ExampleRegionObserverMeta observer;
   private Map<String, Optional<Metric>> requestsMap;
-  private RegionCoprocessorEnvironment regionCoprocessorEnv;
-  private LossyCounting clientMetricsLossyCounting;
+  private MetricRegistry registry;
+  private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
   private boolean active = false;
 
   enum MetaTableOps {
@@ -94,11 +94,11 @@ public class MetaTableMetrics implements RegionCoprocessor {
       if (!active || !isMetaTableOp(e)) {
         return;
       }
-      tableMetricRegisterAndMark(e, row);
-      clientMetricRegisterAndMark(e);
-      regionMetricRegisterAndMark(e, row);
-      opMetricRegisterAndMark(e, row);
-      opWithClientMetricRegisterAndMark(e, row);
+      tableMetricRegisterAndMark(row);
+      clientMetricRegisterAndMark();
+      regionMetricRegisterAndMark(row);
+      opMetricRegisterAndMark(row);
+      opWithClientMetricRegisterAndMark(row);
     }
 
     private void markMeterIfPresent(String requestMeter) {
@@ -106,19 +106,18 @@ public class MetaTableMetrics implements RegionCoprocessor {
         return;
       }
 
-      if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) {
-        Meter metric = (Meter) requestsMap.get(requestMeter).get();
+      Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
+      if (optionalMetric != null && optionalMetric.isPresent()) {
+        Meter metric = (Meter) optionalMetric.get();
         metric.mark();
       }
     }
 
-    private void registerMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
-        String requestMeter) {
+    private void registerMeterIfNotPresent(String requestMeter) {
       if (requestMeter.isEmpty()) {
         return;
       }
       if (!requestsMap.containsKey(requestMeter)) {
-        MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
         registry.meter(requestMeter);
         requestsMap.put(requestMeter, registry.get(requestMeter));
       }
@@ -129,32 +128,36 @@ public class MetaTableMetrics implements RegionCoprocessor {
      * By using lossy count to maintain meters, at most 7 / e meters will be kept  (e is error rate)
      * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
      *      also, all kept elements have frequency higher than e * N. (N is total count)
-     * @param e Region coprocessor environment
      * @param requestMeter meter to be registered
      * @param lossyCounting lossyCounting object for one type of meters.
      */
-    private void registerLossyCountingMeterIfNotPresent(
-        ObserverContext<RegionCoprocessorEnvironment> e,
-        String requestMeter, LossyCounting lossyCounting) {
+    private void registerLossyCountingMeterIfNotPresent(String requestMeter,
+        LossyCounting lossyCounting) {
+
       if (requestMeter.isEmpty()) {
         return;
       }
-      Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
-      if(!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)){
-        for(String meter: metersToBeRemoved) {
-          //cleanup requestsMap according swept data from lossy count;
+      synchronized (lossyCounting) {
+        Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
+
+        boolean isNewMeter = !requestsMap.containsKey(requestMeter);
+        boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
+        if (isNewMeter) {
+          if (requestMeterRemoved) {
+            // the new meter was itself swept by lossyCounting, so don't add it to the map
+            metersToBeRemoved.remove(requestMeter);
+          } else {
+            // otherwise register the new meter and add it to the map
+            registry.meter(requestMeter);
+            requestsMap.put(requestMeter, registry.get(requestMeter));
+          }
+        }
+
+        for (String meter : metersToBeRemoved) {
+          // clean up requestsMap and the registry for every meter swept by lossy counting
           requestsMap.remove(meter);
-          MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
           registry.remove(meter);
         }
-        // newly added meter is swept by lossy counting cleanup. No need to put it into requestsMap.
-        return;
-      }
-
-      if (!requestsMap.containsKey(requestMeter)) {
-        MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
-        registry.meter(requestMeter);
-        requestsMap.put(requestMeter, registry.get(requestMeter));
       }
     }
 
@@ -191,49 +194,59 @@ public class MetaTableMetrics implements RegionCoprocessor {
           .equals(e.getEnvironment().getRegionInfo().getTable());
     }
 
-    private void clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) {
+    private void clientMetricRegisterAndMark() {
       // Mark client metric
       String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
-
+      if (clientIP == null || clientIP.isEmpty()) {
+        return;
+      }
       String clientRequestMeter = clientRequestMeterName(clientIP);
-      registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting);
+      registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
       markMeterIfPresent(clientRequestMeter);
     }
 
-    private void tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
-        Row op) {
+    private void tableMetricRegisterAndMark(Row op) {
       // Mark table metric
       String tableName = getTableNameFromOp(op);
+      if (tableName == null || tableName.isEmpty()) {
+        return;
+      }
       String tableRequestMeter = tableMeterName(tableName);
-      registerAndMarkMeterIfNotPresent(e, tableRequestMeter);
+      registerAndMarkMeterIfNotPresent(tableRequestMeter);
     }
 
-    private void regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
-        Row op) {
+    private void regionMetricRegisterAndMark(Row op) {
       // Mark region metric
       String regionId = getRegionIdFromOp(op);
+      if (regionId == null || regionId.isEmpty()) {
+        return;
+      }
       String regionRequestMeter = regionMeterName(regionId);
-      registerAndMarkMeterIfNotPresent(e, regionRequestMeter);
+      registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
+      markMeterIfPresent(regionRequestMeter);
     }
 
-    private void opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
-        Row op) {
+    private void opMetricRegisterAndMark(Row op) {
       // Mark access type ["get", "put", "delete"] metric
       String opMeterName = opMeterName(op);
-      registerAndMarkMeterIfNotPresent(e, opMeterName);
+      if (opMeterName == null || opMeterName.isEmpty()) {
+        return;
+      }
+      registerAndMarkMeterIfNotPresent(opMeterName);
     }
 
-    private void opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
-        Object op) {
+    private void opWithClientMetricRegisterAndMark(Object op) {
       // // Mark client + access type metric
       String opWithClientMeterName = opWithClientMeterName(op);
-      registerAndMarkMeterIfNotPresent(e, opWithClientMeterName);
+      if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
+        return;
+      }
+      registerAndMarkMeterIfNotPresent(opWithClientMeterName);
     }
 
     // Helper function to register and mark meter if not present
-    private void registerAndMarkMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
-        String name) {
-      registerMeterIfNotPresent(e, name);
+    private void registerAndMarkMeterIfNotPresent(String name) {
+      registerMeterIfNotPresent(name);
       markMeterIfPresent(name);
     }
 
@@ -291,12 +304,12 @@ public class MetaTableMetrics implements RegionCoprocessor {
       if (clientIP.isEmpty()) {
         return "";
       }
-      return String.format("MetaTable_client_%s_request", clientIP);
+      return String.format("MetaTable_client_%s_lossy_request", clientIP);
     }
 
     private String regionMeterName(String regionId) {
       // Extract meter name containing the region ID
-      return String.format("MetaTable_region_%s_request", regionId);
+      return String.format("MetaTable_region_%s_lossy_request", regionId);
     }
   }
 
@@ -312,9 +325,11 @@ public class MetaTableMetrics implements RegionCoprocessor {
         && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
         && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable()
           .equals(TableName.META_TABLE_NAME)) {
-      regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
+      RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
+      registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
       requestsMap = new ConcurrentHashMap<>();
-      clientMetricsLossyCounting = new LossyCounting();
+      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
+      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
       // only be active mode when this region holds meta table.
       active = true;
     }
@@ -324,7 +339,6 @@ public class MetaTableMetrics implements RegionCoprocessor {
   public void stop(CoprocessorEnvironment env) throws IOException {
     // since meta region can move around, clear stale metrics when stop.
     if (requestsMap != null) {
-      MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
       for (String meterName : requestsMap.keySet()) {
         registry.remove(meterName);
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
index cb737cb..be8b592 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -51,9 +51,11 @@ public class LossyCounting {
   private double errorRate;
   private Map<String, Integer> data;
   private long totalDataCount;
+  private String name;
 
-  public LossyCounting(double errorRate) {
+  public LossyCounting(double errorRate, String name) {
     this.errorRate = errorRate;
+    this.name = name;
     if (errorRate < 0.0 || errorRate > 1.0) {
       throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
     }
@@ -64,8 +66,9 @@ public class LossyCounting {
     calculateCurrentTerm();
   }
 
-  public LossyCounting() {
-    this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02));
+  public LossyCounting(String name) {
+    this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
+        name);
   }
 
   public Set<String> addByOne(String key) {
@@ -93,7 +96,7 @@ public class LossyCounting {
     for(String key : dataToBeSwept) {
       data.remove(key);
     }
-    LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size()));
+    LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
     return dataToBeSwept;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
index bbbeb9e..82ce709 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
@@ -13,6 +13,8 @@ package org.apache.hadoop.hbase.coprocessor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.JMXListener;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
@@ -73,6 +76,11 @@ public class TestMetaTableMetrics {
   private static Configuration conf = null;
   private static int connectorPort = 61120;
 
+  final byte[] cf = Bytes.toBytes("info");
+  final byte[] col = Bytes.toBytes("any");
+  byte[] tablename;
+  final int nthreads = 20;
+
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
 
@@ -224,4 +232,95 @@ public class TestMetaTableMetrics {
     assertEquals(5L, putWithClientMetricsCount);
   }
 
+  @Test(timeout = 30000)
+  public void testConcurrentAccess() {
+    try {
+      tablename = Bytes.toBytes("hbase:meta");
+      int numRows = 3000;
+      int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
+      putData(numRows);
+      Thread.sleep(2000);
+      int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
+      assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
+      getData(numRows);
+    } catch (InterruptedException e) {
+      LOG.info("Caught InterruptedException while testConcurrentAccess: " + e.getMessage());
+      fail();
+    } catch (IOException e) {
+      LOG.info("Caught IOException while testConcurrentAccess: " + e.getMessage());
+      fail();
+    }
+  }
+
+  public void putData(int nrows) throws InterruptedException {
+    LOG.info(String.format("Putting %d rows in hbase:meta", nrows));
+    Thread[] threads = new Thread[nthreads];
+    for (int i = 1; i <= nthreads; i++) {
+      threads[i - 1] = new PutThread(1, nrows);
+    }
+    startThreadsAndWaitToJoin(threads);
+  }
+
+  public void getData(int nrows) throws InterruptedException {
+    LOG.info(String.format("Getting %d rows from hbase:meta", nrows));
+    Thread[] threads = new Thread[nthreads];
+    for (int i = 1; i <= nthreads; i++) {
+      threads[i - 1] = new GetThread(1, nrows);
+    }
+    startThreadsAndWaitToJoin(threads);
+  }
+
+  private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException {
+    for (int i = 1; i <= nthreads; i++) {
+      threads[i - 1].start();
+    }
+    for (int i = 1; i <= nthreads; i++) {
+      threads[i - 1].join();
+    }
+  }
+
+  class PutThread extends Thread {
+    int start;
+    int end;
+
+    public PutThread(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public void run() {
+      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
+        for (int i = start; i <= end; i++) {
+          Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
+          p.addColumn(cf, col, Bytes.toBytes("Value" + i));
+          table.put(p);
+        }
+      } catch (IOException e) {
+        LOG.info("Caught IOException while PutThread operation: " + e.getMessage());
+      }
+    }
+  }
+
+  class GetThread extends Thread {
+    int start;
+    int end;
+
+    public GetThread(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public void run() {
+      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
+        for (int i = start; i <= end; i++) {
+          Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
+          table.get(get);
+        }
+      } catch (IOException e) {
+        LOG.info("Caught IOException while GetThread operation: " + e.getMessage());
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
index 11758be..e4f1939 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -38,15 +38,15 @@ public class TestLossyCounting {
 
   @Test
   public void testBucketSize() {
-    LossyCounting lossyCounting = new LossyCounting(0.01);
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
     assertEquals(100L, lossyCounting.getBucketSize());
-    LossyCounting lossyCounting2 = new LossyCounting();
+    LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
     assertEquals(50L, lossyCounting2.getBucketSize());
   }
 
   @Test
   public void testAddByOne() {
-    LossyCounting lossyCounting = new LossyCounting(0.01);
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
     for(int i = 0; i < 100; i++){
       String key = "" + i;
       lossyCounting.addByOne(key);
@@ -60,7 +60,7 @@ public class TestLossyCounting {
 
   @Test
   public void testSweep1() {
-    LossyCounting lossyCounting = new LossyCounting(0.01);
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
     for(int i = 0; i < 400; i++){
       String key = "" + i;
       lossyCounting.addByOne(key);
@@ -71,7 +71,7 @@ public class TestLossyCounting {
 
   @Test
   public void testSweep2() {
-    LossyCounting lossyCounting = new LossyCounting(0.1);
+    LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
     for(int i = 0; i < 10; i++){
       String key = "" + i;
       lossyCounting.addByOne(key);