You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by xu...@apache.org on 2019/03/22 07:20:45 UTC

[hbase] branch branch-1 updated: HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements

This is an automated email from the ASF dual-hosted git repository.

xucang pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 1366f5c  HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements
1366f5c is described below

commit 1366f5cb6ac3a50ee3da8cda7e782197e551848a
Author: Sakthi <sa...@gmail.com>
AuthorDate: Tue Mar 19 17:16:23 2019 -0700

    HBASE-21991: Fix MetaMetrics issues - [Race condition, Faulty remove logic], few improvements
    
    Signed-off-by: Xu Cang <xu...@apache.org>
---
 .../hadoop/hbase/coprocessor/MetaTableMetrics.java | 136 ++++++++++++---------
 .../apache/hadoop/hbase/util/LossyCounting.java    |  11 +-
 .../hbase/coprocessor/TestMetaTableMetrics.java    | 100 +++++++++++++++
 .../hadoop/hbase/util/TestLossyCounting.java       |  10 +-
 4 files changed, 189 insertions(+), 68 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
index abef5aa..203857d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -35,12 +35,14 @@ import org.apache.hadoop.hbase.util.LossyCounting;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.ImmutableMap;
+
 /**
  * A coprocessor that collects metrics from meta table.
  * <p>
  * These metrics will be available through the regular Hadoop metrics2 sinks (ganglia, opentsdb,
  * etc) as well as JMX output.
  * </p>
+ *
  * @see MetaTableMetrics
  */
 
@@ -48,8 +50,8 @@ import com.google.common.collect.ImmutableMap;
 public class MetaTableMetrics extends BaseRegionObserver {
 
   private Map<String, Optional<Metric>> requestsMap;
-  private RegionCoprocessorEnvironment regionCoprocessorEnv;
-  private LossyCounting clientMetricsLossyCounting;
+  private MetricRegistry registry;
+  private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
   private boolean active = false;
 
   enum MetaTableOps {
@@ -57,11 +59,8 @@ public class MetaTableMetrics extends BaseRegionObserver {
   }
 
   private ImmutableMap<Class, MetaTableOps> opsNameMap =
-      ImmutableMap.<Class, MetaTableOps>builder()
-              .put(Put.class, MetaTableOps.PUT)
-              .put(Get.class, MetaTableOps.GET)
-              .put(Delete.class, MetaTableOps.DELETE)
-              .build();
+      ImmutableMap.<Class, MetaTableOps>builder().put(Put.class, MetaTableOps.PUT)
+          .put(Get.class, MetaTableOps.GET).put(Delete.class, MetaTableOps.DELETE).build();
 
   @Override
   public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results)
@@ -85,11 +84,11 @@ public class MetaTableMetrics extends BaseRegionObserver {
     if (!active || !isMetaTableOp(e)) {
       return;
     }
-    tableMetricRegisterAndMark(e, row);
-    clientMetricRegisterAndMark(e);
-    regionMetricRegisterAndMark(e, row);
-    opMetricRegisterAndMark(e, row);
-    opWithClientMetricRegisterAndMark(e, row);
+    tableMetricRegisterAndMark(row);
+    clientMetricRegisterAndMark();
+    regionMetricRegisterAndMark(row);
+    opMetricRegisterAndMark(row);
+    opWithClientMetricRegisterAndMark(row);
   }
 
   private void markMeterIfPresent(String requestMeter) {
@@ -97,19 +96,18 @@ public class MetaTableMetrics extends BaseRegionObserver {
       return;
     }
 
-    if (requestsMap.containsKey(requestMeter) && requestsMap.get(requestMeter).isPresent()) {
-      Meter metric = (Meter) requestsMap.get(requestMeter).get();
+    Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
+    if (optionalMetric != null && optionalMetric.isPresent()) {
+      Meter metric = (Meter) optionalMetric.get();
       metric.mark();
     }
   }
 
-  private void registerMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
-      String requestMeter) {
+  private void registerMeterIfNotPresent(String requestMeter) {
     if (requestMeter.isEmpty()) {
       return;
     }
     if (!requestsMap.containsKey(requestMeter)) {
-      MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
       registry.meter(requestMeter);
       requestsMap.put(requestMeter, registry.get(requestMeter));
     }
@@ -119,38 +117,43 @@ public class MetaTableMetrics extends BaseRegionObserver {
    * Registers and counts lossyCount for Meters that kept by lossy counting.
    * By using lossy count to maintain meters, at most 7 / e meters will be kept  (e is error rate)
    * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
-   *      also, all kept elements have frequency higher than e * N. (N is total count)
-   * @param e Region coprocessor environment
-   * @param requestMeter meter to be registered
+   * also, all kept elements have frequency higher than e * N. (N is total count)
+   *
+   * @param requestMeter  meter to be registered
    * @param lossyCounting lossyCounting object for one type of meters.
    */
-  private void registerLossyCountingMeterIfNotPresent(
-      ObserverContext<RegionCoprocessorEnvironment> e,
-      String requestMeter, LossyCounting lossyCounting) {
+  private void registerLossyCountingMeterIfNotPresent(String requestMeter,
+      LossyCounting lossyCounting) {
     if (requestMeter.isEmpty()) {
       return;
     }
-    Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
-    if(!requestsMap.containsKey(requestMeter) && metersToBeRemoved.contains(requestMeter)){
-      for(String meter: metersToBeRemoved) {
-        //cleanup requestsMap according swept data from lossy count;
+    synchronized (lossyCounting) {
+      Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
+
+      boolean isNewMeter = !requestsMap.containsKey(requestMeter);
+      boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
+      if (isNewMeter) {
+        if (requestMeterRemoved) {
+          // if the new metric is swept off by lossyCounting then don't add in the map
+          metersToBeRemoved.remove(requestMeter);
+        } else {
+          // else register the new metric and add in the map
+          registry.meter(requestMeter);
+          requestsMap.put(requestMeter, registry.get(requestMeter));
+        }
+      }
+
+      for (String meter : metersToBeRemoved) {
+        //cleanup requestsMap according to the swept data from lossy count;
         requestsMap.remove(meter);
-        MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
         registry.remove(meter);
       }
-      // newly added meter is swept by lossy counting cleanup. No need to put it into requestsMap.
-      return;
-    }
-
-    if (!requestsMap.containsKey(requestMeter)) {
-      MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
-      registry.meter(requestMeter);
-      requestsMap.put(requestMeter, registry.get(requestMeter));
     }
   }
 
   /**
    * Get table name from Ops such as: get, put, delete.
+   *
    * @param op such as get, put or delete.
    */
   private String getTableNameFromOp(Row op) {
@@ -165,7 +168,8 @@ public class MetaTableMetrics extends BaseRegionObserver {
 
   /**
    * Get regionId from Ops such as: get, put, delete.
-   * @param op  such as get, put or delete.
+   *
+   * @param op such as get, put or delete.
    */
   private String getRegionIdFromOp(Row op) {
     String regionId = null;
@@ -181,47 +185,60 @@ public class MetaTableMetrics extends BaseRegionObserver {
     return TableName.META_TABLE_NAME.equals(e.getEnvironment().getRegionInfo().getTable());
   }
 
-  private void clientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e) {
+  private void clientMetricRegisterAndMark() {
     // Mark client metric
     String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
+    if (clientIP == null || clientIP.isEmpty()) {
+      return;
+    }
 
     String clientRequestMeter = clientRequestMeterName(clientIP);
-    registerLossyCountingMeterIfNotPresent(e, clientRequestMeter, clientMetricsLossyCounting);
+    registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
     markMeterIfPresent(clientRequestMeter);
   }
 
-  private void tableMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e, Row op) {
+  private void tableMetricRegisterAndMark(Row op) {
     // Mark table metric
     String tableName = getTableNameFromOp(op);
+    if (tableName == null || tableName.isEmpty()) {
+      return;
+    }
     String tableRequestMeter = tableMeterName(tableName);
-    registerAndMarkMeterIfNotPresent(e, tableRequestMeter);
+    registerAndMarkMeterIfNotPresent(tableRequestMeter);
   }
 
-  private void regionMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
-      Row op) {
+  private void regionMetricRegisterAndMark(Row op) {
     // Mark region metric
     String regionId = getRegionIdFromOp(op);
+    if (regionId == null || regionId.isEmpty()) {
+      return;
+    }
     String regionRequestMeter = regionMeterName(regionId);
-    registerAndMarkMeterIfNotPresent(e, regionRequestMeter);
+    registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
+    markMeterIfPresent(regionRequestMeter);
   }
 
-  private void opMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e, Row op) {
+  private void opMetricRegisterAndMark(Row op) {
     // Mark access type ["get", "put", "delete"] metric
     String opMeterName = opMeterName(op);
-    registerAndMarkMeterIfNotPresent(e, opMeterName);
+    if (opMeterName == null || opMeterName.isEmpty()) {
+      return;
+    }
+    registerAndMarkMeterIfNotPresent(opMeterName);
   }
 
-  private void opWithClientMetricRegisterAndMark(ObserverContext<RegionCoprocessorEnvironment> e,
-      Object op) {
-    // // Mark client + access type metric
+  private void opWithClientMetricRegisterAndMark(Object op) {
+    // Mark client + access type metric
     String opWithClientMeterName = opWithClientMeterName(op);
-    registerAndMarkMeterIfNotPresent(e, opWithClientMeterName);
+    if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
+      return;
+    }
+    registerAndMarkMeterIfNotPresent(opWithClientMeterName);
   }
 
   // Helper function to register and mark meter if not present
-  private void registerAndMarkMeterIfNotPresent(ObserverContext<RegionCoprocessorEnvironment> e,
-      String name) {
-    registerMeterIfNotPresent(e, name);
+  private void registerAndMarkMeterIfNotPresent(String name) {
+    registerMeterIfNotPresent(name);
     markMeterIfPresent(name);
   }
 
@@ -278,12 +295,12 @@ public class MetaTableMetrics extends BaseRegionObserver {
     if (clientIP.isEmpty()) {
       return "";
     }
-    return String.format("MetaTable_client_%s_request", clientIP);
+    return String.format("MetaTable_client_%s_lossy_request", clientIP);
   }
 
   private String regionMeterName(String regionId) {
     // Extract meter name containing the region ID
-    return String.format("MetaTable_region_%s_request", regionId);
+    return String.format("MetaTable_region_%s_lossy_request", regionId);
   }
 
   @Override
@@ -291,10 +308,12 @@ public class MetaTableMetrics extends BaseRegionObserver {
     if (env instanceof RegionCoprocessorEnvironment
         && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable() != null
         && ((RegionCoprocessorEnvironment) env).getRegionInfo().getTable()
-          .equals(TableName.META_TABLE_NAME)) {
-      regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
+        .equals(TableName.META_TABLE_NAME)) {
+      RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
+      registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
       requestsMap = new ConcurrentHashMap<>();
-      clientMetricsLossyCounting = new LossyCounting();
+      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
+      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
       // only be active mode when this region holds meta table.
       active = true;
     }
@@ -304,7 +323,6 @@ public class MetaTableMetrics extends BaseRegionObserver {
   public void stop(CoprocessorEnvironment env) throws IOException {
     // since meta region can move around, clear stale metrics when stop.
     if (requestsMap != null) {
-      MetricRegistry registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
       for (String meterName : requestsMap.keySet()) {
         registry.remove(meterName);
       }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
index d526d88..712c22c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -50,9 +50,11 @@ public class LossyCounting {
   private double errorRate;
   private Map<String, Integer> data;
   private long totalDataCount;
+  private String name;
 
-  public LossyCounting(double errorRate) {
+  public LossyCounting(double errorRate, String name) {
     this.errorRate = errorRate;
+    this.name = name;
     if (errorRate < 0.0 || errorRate > 1.0) {
       throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
     }
@@ -63,8 +65,9 @@ public class LossyCounting {
     calculateCurrentTerm();
   }
 
-  public LossyCounting() {
-    this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02));
+  public LossyCounting(String name) {
+    this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
+        name);
   }
 
   public Set<String> addByOne(String key) {
@@ -95,7 +98,7 @@ public class LossyCounting {
     for(String key : dataToBeSwept) {
       data.remove(key);
     }
-    LOG.debug(String.format("Swept %d elements.", dataToBeSwept.size()));
+    LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
     return dataToBeSwept;
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
index 23dd517..97568fd 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
@@ -13,6 +13,8 @@ package org.apache.hadoop.hbase.coprocessor;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -35,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.JMXListener;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@@ -63,6 +66,11 @@ public class TestMetaTableMetrics {
   private static Configuration conf = null;
   private static int connectorPort = 61120;
 
+  final byte[] cf = Bytes.toBytes("info");
+  final byte[] col = Bytes.toBytes("any");
+  byte[] tablename;
+  final int nthreads = 20;
+
   @BeforeClass
   public static void setupBeforeClass() throws Exception {
 
@@ -220,4 +228,96 @@ public class TestMetaTableMetrics {
     assertEquals(5L, putWithClientMetricsCount);
   }
 
+  @Test(timeout = 30000)
+  public void testConcurrentAccess() {
+    try {
+      tablename = Bytes.toBytes("hbase:meta");
+      int numRows = 3000;
+      int numRowsInTableBefore = UTIL.countRows(TableName.valueOf(tablename));
+      putData(numRows);
+      Thread.sleep(2000);
+      int numRowsInTableAfter = UTIL.countRows(TableName.valueOf(tablename));
+      assertTrue(numRowsInTableAfter >= numRowsInTableBefore + numRows);
+      getData(numRows);
+    } catch (InterruptedException e) {
+      LOG.info("Caught InterruptedException while testConcurrentAccess: " + e.getMessage());
+      fail();
+    } catch (IOException e) {
+      LOG.info("Caught IOException while testConcurrentAccess: " + e.getMessage());
+      fail();
+    }
+  }
+
+  public void putData(int nrows) throws InterruptedException {
+    LOG.info(String.format("Putting %d rows in hbase:meta", nrows));
+    Thread[] threads = new Thread[nthreads];
+    for (int i = 1; i <= nthreads; i++) {
+      threads[i - 1] = new PutThread(1, nrows);
+    }
+    startThreadsAndWaitToJoin(threads);
+  }
+
+  public void getData(int nrows) throws InterruptedException {
+    LOG.info(String.format("Getting %d rows from hbase:meta", nrows));
+    Thread[] threads = new Thread[nthreads];
+    for (int i = 1; i <= nthreads; i++) {
+      threads[i - 1] = new GetThread(1, nrows);
+    }
+    startThreadsAndWaitToJoin(threads);
+  }
+
+  private void startThreadsAndWaitToJoin(Thread[] threads) throws InterruptedException {
+    for (int i = 1; i <= nthreads; i++) {
+      threads[i - 1].start();
+    }
+    for (int i = 1; i <= nthreads; i++) {
+      threads[i - 1].join();
+    }
+  }
+
+  class PutThread extends Thread {
+    int start;
+    int end;
+
+    public PutThread(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public void run() {
+      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
+        for (int i = start; i <= end; i++) {
+          Put p = new Put(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
+          p.addColumn(cf, col, Bytes.toBytes("Value" + i));
+          table.put(p);
+        }
+      } catch (IOException e) {
+        LOG.info("Caught IOException while PutThread operation: " + e.getMessage());
+      }
+    }
+  }
+
+  class GetThread extends Thread {
+    int start;
+    int end;
+
+    public GetThread(int start, int end) {
+      this.start = start;
+      this.end = end;
+    }
+
+    @Override
+    public void run() {
+      try (Table table = UTIL.getConnection().getTable(TableName.valueOf(tablename))) {
+        for (int i = start; i <= end; i++) {
+          Get get = new Get(Bytes.toBytes(String.format("tableName,rowKey%d,region%d", i, i)));
+          table.get(get);
+        }
+      } catch (IOException e) {
+        LOG.info("Caught IOException while GetThread operation: " + e.getMessage());
+      }
+    }
+  }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
index edef0bb..0d41717 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -33,15 +33,15 @@ public class TestLossyCounting {
 
   @Test
   public void testBucketSize() {
-    LossyCounting lossyCounting = new LossyCounting(0.01);
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
     assertEquals(100L, lossyCounting.getBucketSize());
-    LossyCounting lossyCounting2 = new LossyCounting();
+    LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
     assertEquals(50L, lossyCounting2.getBucketSize());
   }
 
   @Test
   public void testAddByOne() {
-    LossyCounting lossyCounting = new LossyCounting(0.01);
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
     for(int i = 0; i < 100; i++){
       String key = "" + i;
       lossyCounting.addByOne(key);
@@ -55,7 +55,7 @@ public class TestLossyCounting {
 
   @Test
   public void testSweep1() {
-    LossyCounting lossyCounting = new LossyCounting(0.01);
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
     for(int i = 0; i < 400; i++){
       String key = "" + i;
       lossyCounting.addByOne(key);
@@ -66,7 +66,7 @@ public class TestLossyCounting {
 
   @Test
   public void testSweep2() {
-    LossyCounting lossyCounting = new LossyCounting(0.1);
+    LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
     for(int i = 0; i < 10; i++){
       String key = "" + i;
       lossyCounting.addByOne(key);