You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2019/10/07 18:10:58 UTC

[hbase] branch branch-1 updated: HBASE-23110 Backport HBASE-23054 "Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm" to branch-1 (#683)

This is an automated email from the ASF dual-hosted git repository.

apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1 by this push:
     new 6421112  HBASE-23110 Backport HBASE-23054 "Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm" to branch-1 (#683)
6421112 is described below

commit 6421112318db3927239a55ae19074dede08271f0
Author: Andrew Purtell <ap...@apache.org>
AuthorDate: Mon Oct 7 11:10:52 2019 -0700

    HBASE-23110 Backport HBASE-23054 "Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm" to branch-1 (#683)
    
    Additional fixes for reported findbugs and checkstyle warnings.
---
 .../hadoop/hbase/coprocessor/MetaTableMetrics.java | 123 ++++++---------------
 .../apache/hadoop/hbase/util/LossyCounting.java    |  66 ++++++-----
 .../hbase/coprocessor/TestMetaTableMetrics.java    |  24 ++--
 .../hadoop/hbase/util/TestLossyCounting.java       |  22 ++--
 4 files changed, 96 insertions(+), 139 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
index 203857d..a5a880b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -13,10 +13,12 @@ package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+
+import com.google.common.collect.ImmutableMap;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.TableName;
@@ -27,15 +29,10 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.metrics.Meter;
-import org.apache.hadoop.hbase.metrics.Metric;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.LossyCounting;
 
-import com.google.common.base.Optional;
-import com.google.common.collect.ImmutableMap;
-
 /**
  * A coprocessor that collects metrics from meta table.
  * <p>
@@ -45,21 +42,20 @@ import com.google.common.collect.ImmutableMap;
  *
  * @see MetaTableMetrics
  */
-
 @InterfaceAudience.Private
 public class MetaTableMetrics extends BaseRegionObserver {
 
-  private Map<String, Optional<Metric>> requestsMap;
   private MetricRegistry registry;
   private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
   private boolean active = false;
+  private Set<String> metrics = new HashSet<String>();
 
   enum MetaTableOps {
     GET, PUT, DELETE;
   }
 
-  private ImmutableMap<Class, MetaTableOps> opsNameMap =
-      ImmutableMap.<Class, MetaTableOps>builder().put(Put.class, MetaTableOps.PUT)
+  private ImmutableMap<Class<?>, MetaTableOps> opsNameMap =
+      ImmutableMap.<Class<?>, MetaTableOps>builder().put(Put.class, MetaTableOps.PUT)
           .put(Get.class, MetaTableOps.GET).put(Delete.class, MetaTableOps.DELETE).build();
 
   @Override
@@ -91,66 +87,6 @@ public class MetaTableMetrics extends BaseRegionObserver {
     opWithClientMetricRegisterAndMark(row);
   }
 
-  private void markMeterIfPresent(String requestMeter) {
-    if (requestMeter.isEmpty()) {
-      return;
-    }
-
-    Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
-    if (optionalMetric != null && optionalMetric.isPresent()) {
-      Meter metric = (Meter) optionalMetric.get();
-      metric.mark();
-    }
-  }
-
-  private void registerMeterIfNotPresent(String requestMeter) {
-    if (requestMeter.isEmpty()) {
-      return;
-    }
-    if (!requestsMap.containsKey(requestMeter)) {
-      registry.meter(requestMeter);
-      requestsMap.put(requestMeter, registry.get(requestMeter));
-    }
-  }
-
-  /**
-   * Registers and counts lossyCount for Meters that kept by lossy counting.
-   * By using lossy count to maintain meters, at most 7 / e meters will be kept  (e is error rate)
-   * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
-   * also, all kept elements have frequency higher than e * N. (N is total count)
-   *
-   * @param requestMeter  meter to be registered
-   * @param lossyCounting lossyCounting object for one type of meters.
-   */
-  private void registerLossyCountingMeterIfNotPresent(String requestMeter,
-      LossyCounting lossyCounting) {
-    if (requestMeter.isEmpty()) {
-      return;
-    }
-    synchronized (lossyCounting) {
-      Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
-
-      boolean isNewMeter = !requestsMap.containsKey(requestMeter);
-      boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
-      if (isNewMeter) {
-        if (requestMeterRemoved) {
-          // if the new metric is swept off by lossyCounting then don't add in the map
-          metersToBeRemoved.remove(requestMeter);
-        } else {
-          // else register the new metric and add in the map
-          registry.meter(requestMeter);
-          requestsMap.put(requestMeter, registry.get(requestMeter));
-        }
-      }
-
-      for (String meter : metersToBeRemoved) {
-        //cleanup requestsMap according to the swept data from lossy count;
-        requestsMap.remove(meter);
-        registry.remove(meter);
-      }
-    }
-  }
-
   /**
    * Get table name from Ops such as: get, put, delete.
    *
@@ -187,14 +123,14 @@ public class MetaTableMetrics extends BaseRegionObserver {
 
   private void clientMetricRegisterAndMark() {
     // Mark client metric
-    String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
+    String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null;
     if (clientIP == null || clientIP.isEmpty()) {
       return;
     }
 
     String clientRequestMeter = clientRequestMeterName(clientIP);
-    registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
-    markMeterIfPresent(clientRequestMeter);
+    clientMetricsLossyCounting.add(clientRequestMeter);
+    registerAndMarkMeter(clientRequestMeter);
   }
 
   private void tableMetricRegisterAndMark(Row op) {
@@ -204,7 +140,7 @@ public class MetaTableMetrics extends BaseRegionObserver {
       return;
     }
     String tableRequestMeter = tableMeterName(tableName);
-    registerAndMarkMeterIfNotPresent(tableRequestMeter);
+    registerAndMarkMeter(tableRequestMeter);
   }
 
   private void regionMetricRegisterAndMark(Row op) {
@@ -214,8 +150,8 @@ public class MetaTableMetrics extends BaseRegionObserver {
       return;
     }
     String regionRequestMeter = regionMeterName(regionId);
-    registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
-    markMeterIfPresent(regionRequestMeter);
+    regionMetricsLossyCounting.add(regionRequestMeter);
+    registerAndMarkMeter(regionRequestMeter);
   }
 
   private void opMetricRegisterAndMark(Row op) {
@@ -224,7 +160,7 @@ public class MetaTableMetrics extends BaseRegionObserver {
     if (opMeterName == null || opMeterName.isEmpty()) {
       return;
     }
-    registerAndMarkMeterIfNotPresent(opMeterName);
+    registerAndMarkMeter(opMeterName);
   }
 
   private void opWithClientMetricRegisterAndMark(Object op) {
@@ -233,13 +169,17 @@ public class MetaTableMetrics extends BaseRegionObserver {
     if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
       return;
     }
-    registerAndMarkMeterIfNotPresent(opWithClientMeterName);
+    registerAndMarkMeter(opWithClientMeterName);
   }
 
-  // Helper function to register and mark meter if not present
-  private void registerAndMarkMeterIfNotPresent(String name) {
-    registerMeterIfNotPresent(name);
-    markMeterIfPresent(name);
+  private void registerAndMarkMeter(String requestMeter) {
+    if (requestMeter.isEmpty()) {
+      return;
+    }
+    if (!registry.get(requestMeter).isPresent()){
+      metrics.add(requestMeter);
+    }
+    registry.meter(requestMeter).mark();
   }
 
   private String opWithClientMeterName(Object op) {
@@ -311,9 +251,14 @@ public class MetaTableMetrics extends BaseRegionObserver {
         .equals(TableName.META_TABLE_NAME)) {
       RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
       registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
-      requestsMap = new ConcurrentHashMap<>();
-      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
-      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
+      LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){
+        @Override public void sweep(String key) {
+          registry.remove(key);
+          metrics.remove(key);
+        }
+      };
+      clientMetricsLossyCounting = new LossyCounting(listener);
+      regionMetricsLossyCounting = new LossyCounting(listener);
       // only be active mode when this region holds meta table.
       active = true;
     }
@@ -322,10 +267,8 @@ public class MetaTableMetrics extends BaseRegionObserver {
   @Override
   public void stop(CoprocessorEnvironment env) throws IOException {
     // since meta region can move around, clear stale metrics when stop.
-    if (requestsMap != null) {
-      for (String meterName : requestsMap.keySet()) {
-        registry.remove(meterName);
-      }
+    for (String metric:metrics){
+      registry.remove(metric);
     }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
index d3a66be..255f720 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -19,7 +19,6 @@
 
 package org.apache.hadoop.hbase.util;
 
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,9 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 
 /**
  * LossyCounting utility, bounded data structure that maintains approximate high frequency
@@ -44,17 +40,17 @@ import org.slf4j.LoggerFactory;
 
 @InterfaceAudience.Private
 public class LossyCounting {
-  private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
   private long bucketSize;
-  private long currentTerm;
-  private double errorRate;
+  private int currentTerm;
   private Map<String, Integer> data;
   private long totalDataCount;
-  private String name;
+  private LossyCountingListener listener;
+
+  public interface LossyCountingListener {
+    void sweep(String key);
+  }
 
-  public LossyCounting(double errorRate, String name) {
-    this.errorRate = errorRate;
-    this.name = name;
+  public LossyCounting(double errorRate, LossyCountingListener listener) {
     if (errorRate < 0.0 || errorRate > 1.0) {
       throw new IllegalArgumentException(" Lossy Counting error rate should be within range [0,1]");
     }
@@ -62,51 +58,57 @@ public class LossyCounting {
     this.currentTerm = 1;
     this.totalDataCount = 0;
     this.data = new ConcurrentHashMap<>();
+    this.listener = listener;
     calculateCurrentTerm();
   }
 
-  public LossyCounting(String name) {
+  public LossyCounting(LossyCountingListener listener) {
     this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
-        name);
+        listener);
   }
 
-  public Set<String> addByOne(String key) {
-    if (!data.containsKey(key)) {
-      data.put(key, 0);
+  private void addByOne(String key) {
+    //If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
+    //we create a new entry starting with currentTerm so that it will not be pruned immediately
+    Integer i = data.get(key);
+    if (i == null) {
+      i = currentTerm != 0 ? currentTerm - 1 : 0;
     }
-    data.put(key, data.get(key) + 1);
+    data.put(key, i + 1);
+    //update totalDataCount and term
     totalDataCount++;
     calculateCurrentTerm();
-    Set<String> dataToBeSwept = new HashSet<>();
+  }
+
+  public void add(String key) {
+    addByOne(key);
     if(totalDataCount % bucketSize == 0) {
-      dataToBeSwept = sweep();
+      //sweep the entries at bucket boundaries
+      sweep();
     }
-    return dataToBeSwept;
   }
 
   /**
    * sweep low frequency data
    * @return Names of elements got swept
    */
-  private Set<String> sweep() {
-    Set<String> dataToBeSwept = new HashSet<>();
+  private void sweep() {
     for(Map.Entry<String, Integer> entry : data.entrySet()) {
-      if(entry.getValue() + errorRate < currentTerm) {
-        dataToBeSwept.add(entry.getKey());
+      if(entry.getValue() < currentTerm) {
+        String metric = entry.getKey();
+        data.remove(metric);
+        if (listener != null) {
+          listener.sweep(metric);
+        }
       }
     }
-    for(String key : dataToBeSwept) {
-      data.remove(key);
-    }
-    LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
-    return dataToBeSwept;
   }
 
   /**
    * Calculate and set current term
    */
   private void calculateCurrentTerm() {
-    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
+    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
   }
 
   public long getBucketSize(){
@@ -121,6 +123,10 @@ public class LossyCounting {
     return data.containsKey(key);
   }
 
+  public Set<String> getElements(){
+    return data.keySet();
+  }
+
   public long getCurrentTerm() {
     return currentTerm;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
index 0f35d60..15987bc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
@@ -1,12 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.hadoop.hbase.coprocessor;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
index 0d41717..548d31a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 
-import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -33,18 +32,18 @@ public class TestLossyCounting {
 
   @Test
   public void testBucketSize() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
+    LossyCounting lossyCounting = new LossyCounting(0.01, null);
     assertEquals(100L, lossyCounting.getBucketSize());
-    LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
+    LossyCounting lossyCounting2 = new LossyCounting(null);
     assertEquals(50L, lossyCounting2.getBucketSize());
   }
 
   @Test
   public void testAddByOne() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
+    LossyCounting lossyCounting = new LossyCounting(0.01, null);
     for(int i = 0; i < 100; i++){
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(100L, lossyCounting.getDataSize());
     for(int i = 0; i < 100; i++){
@@ -55,26 +54,27 @@ public class TestLossyCounting {
 
   @Test
   public void testSweep1() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
+    LossyCounting lossyCounting = new LossyCounting(0.01, null);
     for(int i = 0; i < 400; i++){
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(4L, lossyCounting.getCurrentTerm());
-    assertEquals(0L, lossyCounting.getDataSize());
+    //if total rows added are proportional to bucket size
+    assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
   }
 
   @Test
   public void testSweep2() {
-    LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
+    LossyCounting lossyCounting = new LossyCounting(0.1, null);
     for(int i = 0; i < 10; i++){
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(10L, lossyCounting.getDataSize());
     for(int i = 0; i < 10; i++){
       String key = "1";
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(1L, lossyCounting.getDataSize());
   }