You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2019/09/28 16:31:11 UTC
[hbase] branch branch-2.2 updated: HBASE-23054 Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new b7aeed7 HBASE-23054 Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm
b7aeed7 is described below
commit b7aeed707e3699d77b70c4c02d889e100fb3a971
Author: Ankit Singhal <an...@apache.org>
AuthorDate: Sat Sep 28 09:29:17 2019 -0700
HBASE-23054 Remove synchronization block from MetaTableMetrics and fix LossyCounting algorithm
---
.../hadoop/hbase/coprocessor/MetaTableMetrics.java | 137 +++++++--------------
.../apache/hadoop/hbase/util/LossyCounting.java | 55 ++++++---
.../hbase/coprocessor/TestMetaTableMetrics.java | 24 ++--
.../hadoop/hbase/util/TestLossyCounting.java | 21 ++--
4 files changed, 108 insertions(+), 129 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
index d542d2f..70e8df1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -1,23 +1,31 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
+import java.util.HashSet;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
+
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.TableName;
@@ -27,12 +35,11 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.ipc.RpcServer;
-import org.apache.hadoop.hbase.metrics.Meter;
-import org.apache.hadoop.hbase.metrics.Metric;
import org.apache.hadoop.hbase.metrics.MetricRegistry;
import org.apache.hadoop.hbase.util.LossyCounting;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.yetus.audience.InterfaceAudience;
+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
@@ -49,10 +56,10 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
public class MetaTableMetrics implements RegionCoprocessor {
private ExampleRegionObserverMeta observer;
- private Map<String, Optional<Metric>> requestsMap;
private MetricRegistry registry;
private LossyCounting clientMetricsLossyCounting, regionMetricsLossyCounting;
private boolean active = false;
+ private Set<String> metrics = new HashSet<String>();
enum MetaTableOps {
GET, PUT, DELETE;
@@ -101,66 +108,6 @@ public class MetaTableMetrics implements RegionCoprocessor {
opWithClientMetricRegisterAndMark(row);
}
- private void markMeterIfPresent(String requestMeter) {
- if (requestMeter.isEmpty()) {
- return;
- }
-
- Optional<Metric> optionalMetric = requestsMap.get(requestMeter);
- if (optionalMetric != null && optionalMetric.isPresent()) {
- Meter metric = (Meter) optionalMetric.get();
- metric.mark();
- }
- }
-
- private void registerMeterIfNotPresent(String requestMeter) {
- if (requestMeter.isEmpty()) {
- return;
- }
- if (!requestsMap.containsKey(requestMeter)) {
- registry.meter(requestMeter);
- requestsMap.put(requestMeter, registry.get(requestMeter));
- }
- }
-
- /**
- * Registers and counts lossyCount for Meters that kept by lossy counting.
- * By using lossy count to maintain meters, at most 7 / e meters will be kept (e is error rate)
- * e.g. when e is 0.02 by default, at most 350 Clients request metrics will be kept
- * also, all kept elements have frequency higher than e * N. (N is total count)
- * @param requestMeter meter to be registered
- * @param lossyCounting lossyCounting object for one type of meters.
- */
- private void registerLossyCountingMeterIfNotPresent(String requestMeter,
- LossyCounting lossyCounting) {
-
- if (requestMeter.isEmpty()) {
- return;
- }
- synchronized (lossyCounting) {
- Set<String> metersToBeRemoved = lossyCounting.addByOne(requestMeter);
-
- boolean isNewMeter = !requestsMap.containsKey(requestMeter);
- boolean requestMeterRemoved = metersToBeRemoved.contains(requestMeter);
- if (isNewMeter) {
- if (requestMeterRemoved) {
- // if the new metric is swept off by lossyCounting then don't add in the map
- metersToBeRemoved.remove(requestMeter);
- } else {
- // else register the new metric and add in the map
- registry.meter(requestMeter);
- requestsMap.put(requestMeter, registry.get(requestMeter));
- }
- }
-
- for (String meter : metersToBeRemoved) {
- //cleanup requestsMap according to the swept data from lossy count;
- requestsMap.remove(meter);
- registry.remove(meter);
- }
- }
- }
-
/**
* Get table name from Ops such as: get, put, delete.
* @param op such as get, put or delete.
@@ -196,13 +143,13 @@ public class MetaTableMetrics implements RegionCoprocessor {
private void clientMetricRegisterAndMark() {
// Mark client metric
- String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : "";
+ String clientIP = RpcServer.getRemoteIp() != null ? RpcServer.getRemoteIp().toString() : null;
if (clientIP == null || clientIP.isEmpty()) {
return;
}
String clientRequestMeter = clientRequestMeterName(clientIP);
- registerLossyCountingMeterIfNotPresent(clientRequestMeter, clientMetricsLossyCounting);
- markMeterIfPresent(clientRequestMeter);
+ clientMetricsLossyCounting.add(clientRequestMeter);
+ registerAndMarkMeter(clientRequestMeter);
}
private void tableMetricRegisterAndMark(Row op) {
@@ -212,7 +159,7 @@ public class MetaTableMetrics implements RegionCoprocessor {
return;
}
String tableRequestMeter = tableMeterName(tableName);
- registerAndMarkMeterIfNotPresent(tableRequestMeter);
+ registerAndMarkMeter(tableRequestMeter);
}
private void regionMetricRegisterAndMark(Row op) {
@@ -222,8 +169,8 @@ public class MetaTableMetrics implements RegionCoprocessor {
return;
}
String regionRequestMeter = regionMeterName(regionId);
- registerLossyCountingMeterIfNotPresent(regionRequestMeter, regionMetricsLossyCounting);
- markMeterIfPresent(regionRequestMeter);
+ regionMetricsLossyCounting.add(regionRequestMeter);
+ registerAndMarkMeter(regionRequestMeter);
}
private void opMetricRegisterAndMark(Row op) {
@@ -232,7 +179,7 @@ public class MetaTableMetrics implements RegionCoprocessor {
if (opMeterName == null || opMeterName.isEmpty()) {
return;
}
- registerAndMarkMeterIfNotPresent(opMeterName);
+ registerAndMarkMeter(opMeterName);
}
private void opWithClientMetricRegisterAndMark(Object op) {
@@ -241,13 +188,18 @@ public class MetaTableMetrics implements RegionCoprocessor {
if (opWithClientMeterName == null || opWithClientMeterName.isEmpty()) {
return;
}
- registerAndMarkMeterIfNotPresent(opWithClientMeterName);
+ registerAndMarkMeter(opWithClientMeterName);
}
// Helper function to register and mark meter if not present
- private void registerAndMarkMeterIfNotPresent(String name) {
- registerMeterIfNotPresent(name);
- markMeterIfPresent(name);
+ private void registerAndMarkMeter(String requestMeter) {
+ if (requestMeter.isEmpty()) {
+ return;
+ }
+ if(!registry.get(requestMeter).isPresent()){
+ metrics.add(requestMeter);
+ }
+ registry.meter(requestMeter).mark();
}
private String opWithClientMeterName(Object op) {
@@ -327,9 +279,14 @@ public class MetaTableMetrics implements RegionCoprocessor {
.equals(TableName.META_TABLE_NAME)) {
RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
- requestsMap = new ConcurrentHashMap<>();
- clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics");
- regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics");
+ LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){
+ @Override public void sweep(String key) {
+ registry.remove(key);
+ metrics.remove(key);
+ }
+ };
+ clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics",listener);
+ regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics",listener);
// only be active mode when this region holds meta table.
active = true;
}
@@ -338,10 +295,8 @@ public class MetaTableMetrics implements RegionCoprocessor {
@Override
public void stop(CoprocessorEnvironment env) throws IOException {
// since meta region can move around, clear stale metrics when stop.
- if (requestsMap != null) {
- for (String meterName : requestsMap.keySet()) {
- registry.remove(meterName);
- }
+ for(String metric:metrics){
+ registry.remove(metric);
}
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
index be8b592..ca1a014 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.util;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -47,13 +46,18 @@ import org.slf4j.LoggerFactory;
public class LossyCounting {
private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
private long bucketSize;
- private long currentTerm;
+ private int currentTerm;
private double errorRate;
private Map<String, Integer> data;
private long totalDataCount;
private String name;
+ private LossyCountingListener listener;
- public LossyCounting(double errorRate, String name) {
+ public interface LossyCountingListener {
+ void sweep(String key);
+ }
+
+ public LossyCounting(double errorRate, String name, LossyCountingListener listener) {
this.errorRate = errorRate;
this.name = name;
if (errorRate < 0.0 || errorRate > 1.0) {
@@ -63,48 +67,55 @@ public class LossyCounting {
this.currentTerm = 1;
this.totalDataCount = 0;
this.data = new ConcurrentHashMap<>();
+ this.listener = listener;
calculateCurrentTerm();
}
- public LossyCounting(String name) {
+ public LossyCounting(String name, LossyCountingListener listener) {
this(HBaseConfiguration.create().getDouble(HConstants.DEFAULT_LOSSY_COUNTING_ERROR_RATE, 0.02),
- name);
+ name, listener);
}
- public Set<String> addByOne(String key) {
- data.put(key, data.getOrDefault(key, 0) + 1);
+ private void addByOne(String key) {
+ //If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
+ //we create a new entry starting with currentTerm so that it will not be pruned immediately
+ data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);
+
+ //update totalDataCount and term
totalDataCount++;
calculateCurrentTerm();
- Set<String> dataToBeSwept = new HashSet<>();
+ }
+
+ public void add(String key) {
+ addByOne(key);
if(totalDataCount % bucketSize == 0) {
- dataToBeSwept = sweep();
+ //sweep the entries at bucket boundaries
+ sweep();
}
- return dataToBeSwept;
}
+
/**
* sweep low frequency data
* @return Names of elements got swept
*/
- private Set<String> sweep() {
- Set<String> dataToBeSwept = new HashSet<>();
+ private void sweep() {
for(Map.Entry<String, Integer> entry : data.entrySet()) {
- if(entry.getValue() + errorRate < currentTerm) {
- dataToBeSwept.add(entry.getKey());
+ if(entry.getValue() < currentTerm) {
+ String metric = entry.getKey();
+ data.remove(metric);
+ if (listener != null) {
+ listener.sweep(metric);
+ }
}
}
- for(String key : dataToBeSwept) {
- data.remove(key);
- }
- LOG.trace(String.format("%s swept %d elements.", name, dataToBeSwept.size()));
- return dataToBeSwept;
}
/**
* Calculate and set current term
*/
private void calculateCurrentTerm() {
- this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / bucketSize);
+ this.currentTerm = (int) Math.ceil(1.0 * totalDataCount / (double) bucketSize);
}
public long getBucketSize(){
@@ -119,6 +130,10 @@ public class LossyCounting {
return data.containsKey(key);
}
+ public Set<String> getElements(){
+ return data.keySet();
+ }
+
public long getCurrentTerm() {
return currentTerm;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
index 97f24ee..9fe8c12 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestMetaTableMetrics.java
@@ -1,12 +1,20 @@
/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
index e4f1939..5240c40 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -38,18 +38,18 @@ public class TestLossyCounting {
@Test
public void testBucketSize() {
- LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize");
+ LossyCounting lossyCounting = new LossyCounting(0.01, "testBucketSize", null);
assertEquals(100L, lossyCounting.getBucketSize());
- LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2");
+ LossyCounting lossyCounting2 = new LossyCounting("testBucketSize2", null);
assertEquals(50L, lossyCounting2.getBucketSize());
}
@Test
public void testAddByOne() {
- LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne");
+ LossyCounting lossyCounting = new LossyCounting(0.01, "testAddByOne", null);
for(int i = 0; i < 100; i++){
String key = "" + i;
- lossyCounting.addByOne(key);
+ lossyCounting.add(key);
}
assertEquals(100L, lossyCounting.getDataSize());
for(int i = 0; i < 100; i++){
@@ -60,26 +60,27 @@ public class TestLossyCounting {
@Test
public void testSweep1() {
- LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1");
+ LossyCounting lossyCounting = new LossyCounting(0.01, "testSweep1", null);
for(int i = 0; i < 400; i++){
String key = "" + i;
- lossyCounting.addByOne(key);
+ lossyCounting.add(key);
}
assertEquals(4L, lossyCounting.getCurrentTerm());
- assertEquals(0L, lossyCounting.getDataSize());
+ //if total rows added are proportional to bucket size
+ assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
}
@Test
public void testSweep2() {
- LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2");
+ LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null);
for(int i = 0; i < 10; i++){
String key = "" + i;
- lossyCounting.addByOne(key);
+ lossyCounting.add(key);
}
assertEquals(10L, lossyCounting.getDataSize());
for(int i = 0; i < 10; i++){
String key = "1";
- lossyCounting.addByOne(key);
+ lossyCounting.add(key);
}
assertEquals(1L, lossyCounting.getDataSize());
}