You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2019/09/28 16:32:02 UTC

[hbase] branch branch-2.1 updated: HBASE-23054 Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm

This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.1 by this push:
     new 76b7db4  HBASE-23054 Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm
76b7db4 is described below

commit 76b7db4c35f563a3b0a4bd8cec469f13b1dd7c17
Author: Ankit Singhal <an...@apache.org>
AuthorDate: Sat Sep 28 09:29:17 2019 -0700

    HBASE-23054 Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm
---
 .../hadoop/hbase/coprocessor/MetaTableMetrics.java | 137 +++++++--------------
 .../apache/hadoop/hbase/util/LossyCounting.java    |  55 ++++++---
 .../hbase/coprocessor/TestMetaTableMetrics.java    |  24 ++--
 .../hadoop/hbase/util/TestLossyCounting.java       |  21 ++--
 4 files changed, 108 insertions(+), 129 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
index d542d2f..70e8df1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -1,23 +1,31 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.hadoop.hbase.coprocessor;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CoprocessorEnvironment;
 import org.apache.hadoop.hbase.TableName;
@@ -27,12 +35,11 @@ import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.metrics.Meter;
-import org.apache.hadoop.hbase.metrics.Metric;
 import org.apache.hadoop.hbase.metrics.MetricRegistry;
 import org.apache.hadoop.hbase.util.LossyCounting;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.yetus.audience.InterfaceAudience;
+
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 
 
@@ -49,10 +56,10 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
 public class MetaTableMetrics implements RegionCoprocessor {
 
   private ExampleRegionObserverMeta observer;
-  private Map<String, Optional<Metric>> requestsMap;
   private MetricRegistry registry;
   private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
   private boolean active = false;
+  private Set<String> metrics = new HashSet<String>();
 
   enum MetaTableOps {
     GET, PUT, DELETE;
@@ -101,66 +108,6 @@ public class MetaTableMetrics implements RegionCoprocessor {
       opWithClientMetricRegisterAndMark(row);
     }
 
-    private void markMeterIfPresent(String requestMeter) {
-      if (requestMeter.isEmpty()) {
-        return;
-      }
-
-      Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
-      if (optionalMetric != null && optionalMetric.isPresent()) {
-        Meter metric = (Meter) optionalMetric.get();
-        metric.mark();
-      }
-    }
-
-    private void registerMeterIfNotPresent(String requestMeter) {
-      if (requestMeter.isEmpty()) {
-        return;
-      }
-      if (!requestsMap.containsKey(requestMeter)) {
-        registry.meter(requestMeter);
-        requestsMap.put(requestMeter, registry.get(requestMeter));
-      }
-    }
-
-    /**
-     * Registers and counts lossyCount for Meters that kept by lossy counting.
-     * By using lossy count to maintain meters, at most 7 / e meters will be kept  (e is error rate)
-     * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
-     *      also, all kept elements have frequency higher than e * N. (N is total count)
-     * @param requestMeter meter to be registered
-     * @param lossyCounting lossyCounting object for one type of meters.
-     */
-    private void registerLossyCountingMeterIfNotPresent(String requestMeter,
-        LossyCounting lossyCounting) {
-
-      if (requestMeter.isEmpty()) {
-        return;
-      }
-      synchronized (lossyCounting) {
-        Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
-
-        boolean isNewMeter = !requestsMap.containsKey(requestMeter);
-        boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
-        if (isNewMeter) {
-          if (requestMeterRemoved) {
-            // if the new metric is swept off by lossyCounting then don't add in the map
-            metersToBeRemoved.remove(requestMeter);
-          } else {
-            // else register the new metric and add in the map
-            registry.meter(requestMeter);
-            requestsMap.put(requestMeter, registry.get(requestMeter));
-          }
-        }
-
-        for (String meter : metersToBeRemoved) {
-          //cleanup requestsMap according to the swept data from lossy count;
-          requestsMap.remove(meter);
-          registry.remove(meter);
-        }
-      }
-    }
-
     /**
      * Get table name from Ops such as: get, put, delete.
      * @param op such as get, put or delete.
@@ -196,13 +143,13 @@ public class MetaTableMetrics implements RegionCoprocessor {
 
     private void clientMetricRegisterAndMark() {
       // Mark client metric
-      String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
+      String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null;
       if (clientIP == null || clientIP.isEmpty()) {
         return;
       }
       String clientRequestMeter = clientRequestMeterName(clientIP);
-      registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
-      markMeterIfPresent(clientRequestMeter);
+      clientMetricsLossyCounting.add(clientRequestMeter);
+      registerAndMarkMeter(clientRequestMeter);
     }
 
     private void tableMetricRegisterAndMark(Row op) {
@@ -212,7 +159,7 @@ public class MetaTableMetrics implements RegionCoprocessor {
         return;
       }
       String tableRequestMeter = tableMeterName(tableName);
-      registerAndMarkMeterIfNotPresent(tableRequestMeter);
+      registerAndMarkMeter(tableRequestMeter);
     }
 
     private void regionMetricRegisterAndMark(Row op) {
@@ -222,8 +169,8 @@ public class MetaTableMetrics implements RegionCoprocessor {
         return;
       }
       String regionRequestMeter = regionMeterName(regionId);
-      registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
-      markMeterIfPresent(regionRequestMeter);
+      regionMetricsLossyCounting.add(regionRequestMeter);
+      registerAndMarkMeter(regionRequestMeter);
     }
 
     private void opMetricRegisterAndMark(Row op) {
@@ -232,7 +179,7 @@ public class MetaTableMetrics implements RegionCoprocessor {
       if (opMeterName == null || opMeterName.isEmpty()) {
         return;
       }
-      registerAndMarkMeterIfNotPresent(opMeterName);
+      registerAndMarkMeter(opMeterName);
     }
 
     private void opWithClientMetricRegisterAndMark(Object op) {
@@ -241,13 +188,18 @@ public class MetaTableMetrics implements RegionCoprocessor {
       if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
         return;
       }
-      registerAndMarkMeterIfNotPresent(opWithClientMeterName);
+      registerAndMarkMeter(opWithClientMeterName);
     }
 
     // Helper function to register and mark meter if not present
-    private void registerAndMarkMeterIfNotPresent(String name) {
-      registerMeterIfNotPresent(name);
-      markMeterIfPresent(name);
+    private void registerAndMarkMeter(String requestMeter) {
+      if (requestMeter.isEmpty()) {
+        return;
+      }
+      if(!registry.get(requestMeter).isPresent()){
+        metrics.add(requestMeter);
+      }
+      registry.meter(requestMeter).mark();
     }
 
     private String opWithClientMeterName(Object op) {
@@ -327,9 +279,14 @@ public class MetaTableMetrics implements RegionCoprocessor {
           .equals(TableName.META_TABLE_NAME)) {
       RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
       registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
-      requestsMap = new ConcurrentHashMap<>();
-      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
-      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
+      LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){
+        @Override public void sweep(String key) {
+          registry.remove(key);
+          metrics.remove(key);
+        }
+      };
+      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics",listener);
+      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics",listener);
       // only be active mode when this region holds meta table.
       active = true;
     }
@@ -338,10 +295,8 @@ public class MetaTableMetrics implements RegionCoprocessor {
   @Override
   public void stop(CoprocessorEnvironment env) throws IOException {
     // since meta region can move around, clear stale metrics when stop.
-    if (requestsMap != null) {
-      for (String meterName : requestsMap.keySet()) {
-        registry.remove(meterName);
-      }
+    for(String metric:metrics){
+      registry.remove(metric);
     }
   }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
index be8b592..ca1a014 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -19,7 +19,6 @@
 
 package org.apache.hadoop.hbase.util;
 
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -47,13 +46,18 @@ import org.slf4j.LoggerFactory;
 public class LossyCounting {
   private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
   private long bucketSize;
-  private long currentTerm;
+  private int currentTerm;
   private double errorRate;
   private Map<String, Integer> data;
   private long totalDataCount;
   private String name;
+  private LossyCountingListener listener;
 
-  public LossyCounting(double errorRate, String name) {
+  public interface LossyCountingListener {
+    void sweep(String key);
+  }
+
+  public LossyCounting(double errorRate, String name, LossyCountingListener listener) {
     this.errorRate = errorRate;
     this.name = name;
     if (errorRate < 0.0 || errorRate > 1.0) {
@@ -63,48 +67,55 @@ public class LossyCounting {
     this.currentTerm = 1;
     this.totalDataCount = 0;
     this.data = new ConcurrentHashMap<>();
+    this.listener = listener;
     calculateCurrentTerm();
   }
 
-  public LossyCounting(String name) {
+  public LossyCounting(String name, LossyCountingListener listener) {
     this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
-        name);
+        name, listener);
   }
 
-  public Set<String> addByOne(String key) {
-    data.put(key, data.getOrDefault(key, 0) + 1);
+  private void addByOne(String key) {
+    //If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
+    //we create a new entry starting with currentTerm so that it will not be pruned immediately
+    data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);
+
+    //update totalDataCount and term
     totalDataCount++;
     calculateCurrentTerm();
-    Set<String> dataToBeSwept = new HashSet<>();
+  }
+
+  public void add(String key) {
+    addByOne(key);
     if(totalDataCount % bucketSize == 0) {
-      dataToBeSwept = sweep();
+      //sweep the entries at bucket boundaries
+      sweep();
     }
-    return dataToBeSwept;
   }
 
+
   /**
    * sweep low frequency data
    * @return Names of elements got swept
    */
-  private Set<String> sweep() {
-    Set<String> dataToBeSwept = new HashSet<>();
+  private void sweep() {
     for(Map.Entry<String, Integer> entry : data.entrySet()) {
-      if(entry.getValue() + errorRate < currentTerm) {
-        dataToBeSwept.add(entry.getKey());
+      if(entry.getValue() < currentTerm) {
+        String metric = entry.getKey();
+        data.remove(metric);
+        if (listener != null) {
+          listener.sweep(metric);
+        }
       }
     }
-    for(String key : dataToBeSwept) {
-      data.remove(key);
-    }
-    LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
-    return dataToBeSwept;
   }
 
   /**
    * Calculate and set current term
    */
   private void calculateCurrentTerm() {
-    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
+    this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
   }
 
   public long getBucketSize(){
@@ -119,6 +130,10 @@ public class LossyCounting {
     return data.containsKey(key);
   }
 
+  public Set<String> getElements(){
+    return data.keySet();
+  }
+
   public long getCurrentTerm() {
     return currentTerm;
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
index 97f24ee..9fe8c12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
@@ -1,12 +1,20 @@
 /**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 
 package org.apache.hadoop.hbase.coprocessor;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
index e4f1939..5240c40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -38,18 +38,18 @@ public class TestLossyCounting {
 
   @Test
   public void testBucketSize() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize", null);
     assertEquals(100L, lossyCounting.getBucketSize());
-    LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
+    LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2", null);
     assertEquals(50L, lossyCounting2.getBucketSize());
   }
 
   @Test
   public void testAddByOne() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne", null);
     for(int i = 0; i < 100; i++){
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(100L, lossyCounting.getDataSize());
     for(int i = 0; i < 100; i++){
@@ -60,26 +60,27 @@ public class TestLossyCounting {
 
   @Test
   public void testSweep1() {
-    LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
+    LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1", null);
     for(int i = 0; i < 400; i++){
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(4L, lossyCounting.getCurrentTerm());
-    assertEquals(0L, lossyCounting.getDataSize());
+    //if the total number of rows added is a multiple of the bucket size
+    assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
   }
 
   @Test
   public void testSweep2() {
-    LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
+    LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null);
     for(int i = 0; i < 10; i++){
       String key = "" + i;
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(10L, lossyCounting.getDataSize());
     for(int i = 0; i < 10; i++){
       String key = "1";
-      lossyCounting.addByOne(key);
+      lossyCounting.add(key);
     }
     assertEquals(1L, lossyCounting.getDataSize());
   }