You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2019/10/23 15:54:56 UTC
[hbase] branch branch-2 updated: HBASE-15519 Add per-user metrics
with lossy counting
This is an automated email from the ASF dual-hosted git repository.
elserj pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new ddd61aa HBASE-15519 Add per-user metrics with lossy counting
ddd61aa is described below
commit ddd61aa825a33fd20b8c8b50cce71d05526d3144
Author: Ankit Singhal <an...@apache.org>
AuthorDate: Wed Oct 23 11:36:14 2019 -0400
HBASE-15519 Add per-user metrics with lossy counting
Introduces the property hbase.regionserver.user.metrics.enabled (default: true),
which can be set to false to disable user metrics if they cause performance issues
Close #661
Signed-off-by: Josh Elser <el...@apache.org>
---
.../MetricsRegionServerSourceFactory.java | 12 ++
.../regionserver/MetricsUserAggregateSource.java | 62 ++++++
.../hbase/regionserver/MetricsUserSource.java | 45 +++++
.../MetricsRegionServerSourceFactoryImpl.java | 28 ++-
.../MetricsUserAggregateSourceImpl.java | 112 +++++++++++
.../hbase/regionserver/MetricsUserSourceImpl.java | 207 +++++++++++++++++++++
.../regionserver/TestMetricsUserSourceImpl.java | 76 ++++++++
.../hadoop/hbase/coprocessor/MetaTableMetrics.java | 15 +-
.../hbase/regionserver/MetricsRegionServer.java | 28 ++-
.../hbase/regionserver/MetricsUserAggregate.java | 39 ++++
.../regionserver/MetricsUserAggregateFactory.java | 70 +++++++
.../regionserver/MetricsUserAggregateImpl.java | 134 +++++++++++++
.../apache/hadoop/hbase/util/LossyCounting.java | 60 ++++--
.../regionserver/TestMetricsTableLatencies.java | 1 +
.../regionserver/TestMetricsUserAggregate.java | 152 +++++++++++++++
.../hadoop/hbase/util/TestLossyCounting.java | 20 +-
16 files changed, 1022 insertions(+), 39 deletions(-)
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
index bf4e0bc..ef33909 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
@@ -45,6 +45,18 @@ public interface MetricsRegionServerSourceFactory {
MetricsRegionSource createRegion(MetricsRegionWrapper wrapper);
/**
+ * Create a MetricsUserSource from a user
+ * @return A metrics user source
+ */
+ MetricsUserSource createUser(String shortUserName);
+
+ /**
+ * Return the singleton instance for MetricsUserAggregateSource
+ * @return A metrics user aggregate source
+ */
+ MetricsUserAggregateSource getUserAggregate();
+
+ /**
* Create a MetricsTableSource from a MetricsTableWrapper.
*
* @param table The table name
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
new file mode 100644
index 0000000..0ffb928
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Contract for a metrics source that exports the metrics of multiple users
+ * into the hadoop metrics system.
+ */
+@InterfaceAudience.Private
+public interface MetricsUserAggregateSource extends BaseSource {
+
+  /** The name of the metrics. */
+  String METRICS_NAME = "Users";
+
+  /** The name of the metrics context that metrics will be under. */
+  String METRICS_CONTEXT = "regionserver";
+
+  /** Human readable description of the metrics. */
+  String METRICS_DESCRIPTION = "Metrics about users connected to the regionserver";
+
+  /** The name of the metrics context that metrics will be under in jmx. */
+  String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+  /** Metric name for the user count. */
+  String NUM_USERS = "numUsers";
+
+  /** Description that accompanies {@link #NUM_USERS}. */
+  String NUMBER_OF_USERS_DESC = "Number of users in the metrics system";
+
+  /**
+   * Looks up the MetricsUserSource for a user, creating and registering one if none exists yet.
+   * @param user the user name
+   * @return a metrics user source
+   */
+  MetricsUserSource getOrCreateMetricsUser(String user);
+
+  /**
+   * Removes a user source from this aggregate.
+   * @param toRemove the user source to remove
+   */
+  void deregister(MetricsUserSource toRemove);
+}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
new file mode 100644
index 0000000..b20dca6
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Metrics source scoped to a single user. Exposes one update method per tracked operation
+ * and can be ordered against other user sources.
+ */
+@InterfaceAudience.Private
+public interface MetricsUserSource extends Comparable<MetricsUserSource> {
+
+ /**
+ * @return the name of the user this source reports for
+ */
+ String getUser();
+
+ /**
+ * Registers the metrics of this user source.
+ */
+ void register();
+
+ /**
+ * Deregisters the metrics of this user source.
+ */
+ void deregister();
+
+ /**
+ * Records a put.
+ * @param t the value to record (presumably the elapsed time of the operation; unit not
+ * stated here - TODO confirm against callers)
+ */
+ void updatePut(long t);
+
+ /**
+ * Records a delete.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateDelete(long t);
+
+ /**
+ * Records a get.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateGet(long t);
+
+ /**
+ * Records an increment.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateIncrement(long t);
+
+ /**
+ * Records an append.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateAppend(long t);
+
+ /**
+ * Records a replay.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateReplay(long t);
+
+ /**
+ * Records a scan time.
+ * @param t the scan time to record (unit not stated here - TODO confirm)
+ */
+ void updateScanTime(long t);
+}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
index 2e6b458..ccc1749 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
@@ -30,17 +30,27 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
public static enum FactoryStorage {
INSTANCE;
private Object aggLock = new Object();
- private MetricsRegionAggregateSourceImpl aggImpl;
+ private MetricsRegionAggregateSourceImpl regionAggImpl;
+ private MetricsUserAggregateSourceImpl userAggImpl;
private MetricsTableAggregateSourceImpl tblAggImpl;
private MetricsHeapMemoryManagerSourceImpl heapMemMngImpl;
}
- private synchronized MetricsRegionAggregateSourceImpl getAggregate() {
+ private synchronized MetricsRegionAggregateSourceImpl getRegionAggregate() {
synchronized (FactoryStorage.INSTANCE.aggLock) {
- if (FactoryStorage.INSTANCE.aggImpl == null) {
- FactoryStorage.INSTANCE.aggImpl = new MetricsRegionAggregateSourceImpl();
+ if (FactoryStorage.INSTANCE.regionAggImpl == null) {
+ FactoryStorage.INSTANCE.regionAggImpl = new MetricsRegionAggregateSourceImpl();
}
- return FactoryStorage.INSTANCE.aggImpl;
+ return FactoryStorage.INSTANCE.regionAggImpl;
+ }
+ }
+
+ /**
+ * Returns the user aggregate source stored in FactoryStorage.INSTANCE, creating it on
+ * first use. Creation is guarded by FactoryStorage.INSTANCE.aggLock.
+ * @return the user aggregate source held by FactoryStorage.INSTANCE
+ */
+ @Override
+ public synchronized MetricsUserAggregateSourceImpl getUserAggregate() {
+   synchronized (FactoryStorage.INSTANCE.aggLock) {
+     if (FactoryStorage.INSTANCE.userAggImpl == null) {
+       FactoryStorage.INSTANCE.userAggImpl = new MetricsUserAggregateSourceImpl();
+     }
+     return FactoryStorage.INSTANCE.userAggImpl;
}
}
@@ -72,7 +82,7 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
@Override
public MetricsRegionSource createRegion(MetricsRegionWrapper wrapper) {
- return new MetricsRegionSourceImpl(wrapper, getAggregate());
+ return new MetricsRegionSourceImpl(wrapper, getRegionAggregate());
}
@Override
@@ -83,4 +93,10 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
public MetricsIOSource createIO(MetricsIOWrapper wrapper) {
return new MetricsIOSourceImpl(wrapper);
}
+
+ @Override
+ public org.apache.hadoop.hbase.regionserver.MetricsUserSource createUser(String shortUserName) {
+ return new org.apache.hadoop.hbase.regionserver.MetricsUserSourceImpl(shortUserName,
+ getUserAggregate());
+ }
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
new file mode 100644
index 0000000..c447f40
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Aggregate metrics source that keeps one {@link MetricsUserSource} per user name and
+ * reports the number of tracked users when metrics are collected.
+ */
+@InterfaceAudience.Private
+public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
+    implements MetricsUserAggregateSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetricsUserAggregateSourceImpl.class);
+
+  /** User name to user source. */
+  private final ConcurrentHashMap<String, MetricsUserSource> userSources =
+      new ConcurrentHashMap<>();
+
+  /** Creates the aggregate with the default name, description and contexts. */
+  public MetricsUserAggregateSourceImpl() {
+    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+  }
+
+  public MetricsUserAggregateSourceImpl(String metricsName,
+      String metricsDescription,
+      String metricsContext,
+      String metricsJmxContext) {
+    super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+  }
+
+  /**
+   * Returns the source already mapped to {@code user}; otherwise creates one, maps it and
+   * registers it. If another thread maps a source first, that thread's source is returned.
+   */
+  @Override
+  public MetricsUserSource getOrCreateMetricsUser(String user) {
+    MetricsUserSource existing = userSources.get(user);
+    if (existing != null) {
+      return existing;
+    }
+
+    MetricsUserSource created = new MetricsUserSourceImpl(user, this);
+    MetricsUserSource winner = userSources.putIfAbsent(user, created);
+    if (winner != null) {
+      // Somebody else mapped a source for this user in the meantime.
+      return winner;
+    }
+
+    // We won: register the new metrics now.
+    register(created);
+    return created;
+  }
+
+  /** Calls {@code source.register()} while holding this aggregate's monitor. */
+  public synchronized void register(MetricsUserSource source) {
+    source.register();
+  }
+
+  /**
+   * Unmaps the source stored under {@code toRemove.getUser()} and deregisters it.
+   * Any exception is logged and not rethrown.
+   */
+  @Override
+  public void deregister(MetricsUserSource toRemove) {
+    try {
+      synchronized (this) {
+        MetricsUserSource removed = userSources.remove(toRemove.getUser());
+        if (removed != null) {
+          removed.deregister();
+        }
+      }
+    } catch (Exception e) {
+      // Ignored. If this errors out it means that someone is double
+      // closing the user source and the user metrics is already nulled out.
+      LOG.info("Error trying to remove " + toRemove + " from " + getClass().getSimpleName(), e);
+    }
+  }
+
+  @VisibleForTesting
+  public ConcurrentHashMap<String, MetricsUserSource> getUserSources() {
+    return userSources;
+  }
+
+  /**
+   * Adds one record holding each user source snapshot, the user-count gauge and the
+   * contents of the metrics registry.
+   */
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder builder = collector.addRecord(metricsName);
+
+    for (MetricsUserSource candidate : userSources.values()) {
+      if (!(candidate instanceof MetricsUserSourceImpl)) {
+        continue;
+      }
+      ((MetricsUserSourceImpl) candidate).snapshot(builder, all);
+    }
+
+    builder.addGauge(Interns.info(NUM_USERS, NUMBER_OF_USERS_DESC), userSources.size());
+    metricsRegistry.snapshot(builder, all);
+  }
+}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
new file mode 100644
index 0000000..9f714a3
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Per-user metrics source. For every tracked operation (get, scan time, put, delete,
+ * increment, append, replay) it owns one time histogram in the registry of the
+ * {@link MetricsUserAggregateSourceImpl} it was created with. Metric names have the form
+ * {@code "user_" + user + "_metric_" + <operation key>}.
+ *
+ * <p>Identity ({@link #equals(Object)}, {@link #hashCode()}, {@link #compareTo}) is derived
+ * from that metric name prefix, so two sources for the same user are equal.
+ */
+@InterfaceAudience.Private
+public class MetricsUserSourceImpl implements MetricsUserSource {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricsUserSourceImpl.class);
+
+  /** Prefix shared by all metric names of this user; also the basis of identity. */
+  private final String userNamePrefix;
+
+  private final String user;
+
+  private final String userGetKey;
+  private final String userScanTimeKey;
+  private final String userPutKey;
+  private final String userDeleteKey;
+  private final String userIncrementKey;
+  private final String userAppendKey;
+  private final String userReplayKey;
+
+  // Assigned in register(); read by the update* methods without synchronization.
+  private MetricHistogram getHisto;
+  private MetricHistogram scanTimeHisto;
+  private MetricHistogram putHisto;
+  private MetricHistogram deleteHisto;
+  private MetricHistogram incrementHisto;
+  private MetricHistogram appendHisto;
+  private MetricHistogram replayHisto;
+
+  /** Cached hash of {@link #userNamePrefix}. */
+  private final int hashCode;
+
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+  private final MetricsUserAggregateSourceImpl agg;
+  private final DynamicMetricsRegistry registry;
+
+  /**
+   * Builds the metric keys for {@code user} and registers this source through {@code agg}.
+   * @param user the user name the metrics are reported for
+   * @param agg the aggregate whose registry holds this user's histograms
+   */
+  public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
+    }
+
+    this.user = user;
+    this.agg = agg;
+    this.registry = agg.getMetricsRegistry();
+
+    this.userNamePrefix = "user_" + user + "_metric_";
+
+    hashCode = userNamePrefix.hashCode();
+
+    userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY;
+    userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY;
+    userPutKey = userNamePrefix + MetricsRegionServerSource.PUT_KEY;
+    userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY;
+    userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
+    userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
+    userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
+
+    agg.register(this);
+  }
+
+  /** Obtains one time histogram per operation key from the registry. */
+  @Override
+  public void register() {
+    synchronized (this) {
+      getHisto = registry.newTimeHistogram(userGetKey);
+      scanTimeHisto = registry.newTimeHistogram(userScanTimeKey);
+      putHisto = registry.newTimeHistogram(userPutKey);
+      deleteHisto = registry.newTimeHistogram(userDeleteKey);
+      incrementHisto = registry.newTimeHistogram(userIncrementKey);
+      appendHisto = registry.newTimeHistogram(userAppendKey);
+      replayHisto = registry.newTimeHistogram(userReplayKey);
+    }
+  }
+
+  /**
+   * Marks this source closed and removes its metrics from the registry. Only the first
+   * call has an effect; later calls return immediately.
+   */
+  @Override
+  public void deregister() {
+    boolean wasClosed = closed.getAndSet(true);
+
+    // Has someone else already closed this for us?
+    if (wasClosed) {
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing user Metrics for user: " + user);
+    }
+
+    synchronized (this) {
+      registry.removeMetric(userGetKey);
+      registry.removeMetric(userScanTimeKey);
+      registry.removeMetric(userPutKey);
+      registry.removeMetric(userDeleteKey);
+      registry.removeMetric(userIncrementKey);
+      registry.removeMetric(userAppendKey);
+      registry.removeMetric(userReplayKey);
+    }
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  /**
+   * Orders sources by their metric name prefix. Comparing the prefix itself, rather than
+   * only its hash, keeps two different users from comparing as equal on a hash collision.
+   * @param source the source to compare with
+   * @return 0 when both sources share the same prefix; -1 when {@code source} is null or
+   *   not a MetricsUserSourceImpl; otherwise the lexicographic order of the prefixes
+   */
+  @Override
+  public int compareTo(MetricsUserSource source) {
+    if (source == null) {
+      return -1;
+    }
+    if (!(source instanceof MetricsUserSourceImpl)) {
+      return -1;
+    }
+
+    MetricsUserSourceImpl impl = (MetricsUserSourceImpl) source;
+
+    return userNamePrefix.compareTo(impl.userNamePrefix);
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  /**
+   * Two sources are equal when they have the same metric name prefix. This is consistent
+   * with {@link #hashCode()}, which is the hash of that prefix.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    return obj == this ||
+      (obj instanceof MetricsUserSourceImpl
+        && userNamePrefix.equals(((MetricsUserSourceImpl) obj).userNamePrefix));
+  }
+
+  /**
+   * Snapshot hook called by the aggregate source. It currently adds nothing to the record
+   * builder; it only checks the closed flag before and after taking this object's monitor.
+   * @param mrb the record builder (unused)
+   * @param ignored unused
+   */
+  void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
+    // If a close has started, return early before even
+    // trying to take the lock.
+    if (closed.get()) {
+      return;
+    }
+
+    // Take the same monitor that deregister() holds while removing metrics,
+    // so a removal cannot run concurrently with this section.
+    synchronized (this) {
+
+      // It's possible that a close happened between checking
+      // the closed variable and getting the lock.
+      if (closed.get()) {
+        return;
+      }
+    }
+  }
+
+  @Override
+  public void updatePut(long t) {
+    putHisto.add(t);
+  }
+
+  @Override
+  public void updateDelete(long t) {
+    deleteHisto.add(t);
+  }
+
+  @Override
+  public void updateGet(long t) {
+    getHisto.add(t);
+  }
+
+  @Override
+  public void updateIncrement(long t) {
+    incrementHisto.add(t);
+  }
+
+  @Override
+  public void updateAppend(long t) {
+    appendHisto.add(t);
+  }
+
+  @Override
+  public void updateReplay(long t) {
+    replayHisto.add(t);
+  }
+
+  @Override
+  public void updateScanTime(long t) {
+    scanTimeHisto.add(t);
+  }
+}
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java
new file mode 100644
index 0000000..f2c7a21
--- /dev/null
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MetricsTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MetricsTests.class, SmallTests.class})
+public class TestMetricsUserSourceImpl {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMetricsUserSourceImpl.class);
+
+ @Test
+ public void testCompareToHashCodeEquals() throws Exception {
+ MetricsRegionServerSourceFactory fact
+ = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
+
+ MetricsUserSource one = fact.createUser("ONE");
+ MetricsUserSource oneClone = fact.createUser("ONE");
+ MetricsUserSource two = fact.createUser("TWO");
+
+ assertEquals(0, one.compareTo(oneClone));
+ assertEquals(one.hashCode(), oneClone.hashCode());
+ assertNotEquals(one, two);
+
+ assertTrue(one.compareTo(two) != 0);
+ assertTrue(two.compareTo(one) != 0);
+ assertTrue(two.compareTo(one) != one.compareTo(two));
+ assertTrue(two.compareTo(two) == 0);
+ }
+
+
+ @Test (expected = RuntimeException.class)
+ public void testNoGetRegionServerMetricsSourceImpl() throws Exception {
+ // This should throw an exception because MetricsUserSourceImpl should only
+ // be created by a factory.
+ CompatibilitySingletonFactory.getInstance(MetricsUserSource.class);
+ }
+
+ @Test
+ public void testGetUser() {
+ MetricsRegionServerSourceFactory fact
+ = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
+
+ MetricsUserSource one = fact.createUser("ONE");
+ assertEquals("ONE", one.getUser());
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
index 70e8df1..f9f6d67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -279,14 +279,13 @@ public class MetaTableMetrics implements RegionCoprocessor {
.equals(TableName.META_TABLE_NAME)) {
RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
- LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){
- @Override public void sweep(String key) {
- registry.remove(key);
- metrics.remove(key);
- }
- };
- clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics",listener);
- regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics",listener);
+ LossyCounting.LossyCountingListener listener =
+ (LossyCounting.LossyCountingListener<String>) key -> {
+ registry.remove(key);
+ metrics.remove(key);
+ };
+ clientMetricsLossyCounting = new LossyCounting<String>("clientMetaMetrics",listener);
+ regionMetricsLossyCounting = new LossyCounting<String>("regionMetaMetrics",listener);
// only be active mode when this region holds meta table.
active = true;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index f3f68f2..9bf2b32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -44,10 +44,11 @@ public class MetricsRegionServer {
"hbase.regionserver.enable.table.latencies";
public static final boolean RS_ENABLE_TABLE_METRICS_DEFAULT = true;
- private MetricsRegionServerSource serverSource;
- private MetricsRegionServerWrapper regionServerWrapper;
+ private final MetricsRegionServerSource serverSource;
+ private final MetricsRegionServerWrapper regionServerWrapper;
private RegionServerTableMetrics tableMetrics;
private final MetricsTable metricsTable;
+ private final MetricsUserAggregate userAggregate;
private MetricRegistry metricRegistry;
private Timer bulkLoadTimer;
@@ -58,9 +59,8 @@ public class MetricsRegionServer {
MetricsTable metricsTable) {
this(regionServerWrapper,
CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
- .createServer(regionServerWrapper),
- createTableMetrics(conf),
- metricsTable);
+ .createServer(regionServerWrapper), createTableMetrics(conf), metricsTable,
+ MetricsUserAggregateFactory.getMetricsUserAggregate(conf));
// Create hbase-metrics module based metrics. The registry should already be registered by the
// MetricsRegionServerSource
@@ -74,13 +74,13 @@ public class MetricsRegionServer {
}
MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper,
- MetricsRegionServerSource serverSource,
- RegionServerTableMetrics tableMetrics,
- MetricsTable metricsTable) {
+ MetricsRegionServerSource serverSource, RegionServerTableMetrics tableMetrics,
+ MetricsTable metricsTable, MetricsUserAggregate userAggregate) {
this.regionServerWrapper = regionServerWrapper;
this.serverSource = serverSource;
this.tableMetrics = tableMetrics;
this.metricsTable = metricsTable;
+ this.userAggregate = userAggregate;
}
/**
@@ -98,6 +98,11 @@ public class MetricsRegionServer {
return serverSource;
}
+ /** Returns the user metrics aggregate this instance was constructed with. */
+ @VisibleForTesting
+ public MetricsUserAggregate getMetricsUserAggregate() {
+   return userAggregate;
+ }
+
public MetricsRegionServerWrapper getRegionServerWrapper() {
return regionServerWrapper;
}
@@ -117,6 +122,7 @@ public class MetricsRegionServer {
tableMetrics.updatePut(tn, t);
}
serverSource.updatePut(t);
+ userAggregate.updatePut(t);
}
public void updateDelete(TableName tn, long t) {
@@ -124,6 +130,7 @@ public class MetricsRegionServer {
tableMetrics.updateDelete(tn, t);
}
serverSource.updateDelete(t);
+ userAggregate.updateDelete(t);
}
public void updateDeleteBatch(TableName tn, long t) {
@@ -152,6 +159,7 @@ public class MetricsRegionServer {
serverSource.incrSlowGet();
}
serverSource.updateGet(t);
+ userAggregate.updateGet(t);
}
public void updateIncrement(TableName tn, long t) {
@@ -162,6 +170,7 @@ public class MetricsRegionServer {
serverSource.incrSlowIncrement();
}
serverSource.updateIncrement(t);
+ userAggregate.updateIncrement(t);
}
public void updateAppend(TableName tn, long t) {
@@ -172,10 +181,12 @@ public class MetricsRegionServer {
serverSource.incrSlowAppend();
}
serverSource.updateAppend(t);
+ userAggregate.updateAppend(t);
}
public void updateReplay(long t){
serverSource.updateReplay(t);
+ userAggregate.updateReplay(t);
}
public void updateScanSize(TableName tn, long scanSize){
@@ -190,6 +201,7 @@ public class MetricsRegionServer {
tableMetrics.updateScanTime(tn, t);
}
serverSource.updateScanTime(t);
+ userAggregate.updateScanTime(t);
}
public void updateSplitTime(long t) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
new file mode 100644
index 0000000..9b23ccc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Receives per-operation updates for user metrics. Each method takes only the value to
+ * record; how the user is determined is left to the implementation.
+ */
+@InterfaceAudience.Private
+public interface MetricsUserAggregate {
+
+ /**
+ * Records a put.
+ * @param t the value to record (presumably elapsed time - TODO confirm unit with callers)
+ */
+ void updatePut(long t);
+
+ /**
+ * Records a delete.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateDelete(long t);
+
+ /**
+ * Records a get.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateGet(long t);
+
+ /**
+ * Records an increment.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateIncrement(long t);
+
+ /**
+ * Records an append.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateAppend(long t);
+
+ /**
+ * Records a replay.
+ * @param t the value to record (presumably elapsed time - TODO confirm)
+ */
+ void updateReplay(long t);
+
+ /**
+ * Records a scan time.
+ * @param t the scan time to record (unit not stated here - TODO confirm)
+ */
+ void updateScanTime(long t);
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
new file mode 100644
index 0000000..888c480
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class MetricsUserAggregateFactory {
+  /** Configuration key that turns per-user metrics on or off. */
+  public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled";
+  /** Value assumed when {@link #METRIC_USER_ENABLED_CONF} is not set: metrics are enabled. */
+  public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true;
+
+  /** Holds only static members, so it is never instantiated. */
+  private MetricsUserAggregateFactory() {
+  }
+
+  /**
+   * Chooses the per-user metrics sink based on {@link #METRIC_USER_ENABLED_CONF}.
+   * @param conf read for the enable flag and passed on to {@link MetricsUserAggregateImpl}
+   * @return a new {@link MetricsUserAggregateImpl} when enabled, otherwise a new sink whose
+   *         methods all do nothing
+   */
+  public static MetricsUserAggregate getMetricsUserAggregate(Configuration conf) {
+    boolean enabled = conf.getBoolean(METRIC_USER_ENABLED_CONF, DEFAULT_METRIC_USER_ENABLED_CONF);
+    return enabled ? new MetricsUserAggregateImpl(conf) : new NoOpMetricsUserAggregate();
+  }
+
+  /** Stand-in used while per-user metrics are disabled; every update is dropped. */
+  private static final class NoOpMetricsUserAggregate implements MetricsUserAggregate {
+    @Override public void updatePut(long t) {
+    }
+
+    @Override public void updateDelete(long t) {
+    }
+
+    @Override public void updateGet(long t) {
+    }
+
+    @Override public void updateIncrement(long t) {
+    }
+
+    @Override public void updateAppend(long t) {
+    }
+
+    @Override public void updateReplay(long t) {
+    }
+
+    @Override public void updateScanTime(long t) {
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
new file mode 100644
index 0000000..6c24afc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.LossyCounting;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+@InterfaceAudience.Private
+public class MetricsUserAggregateImpl implements MetricsUserAggregate {
+  /** Supplies the fallback user when there is no RPC request user. */
+  private final UserProvider userProvider;
+  private final MetricsUserAggregateSource source;
+  /** Counts updates per user source; sources it sweeps out are deregistered. */
+  private final LossyCounting<MetricsUserSource> userMetricLossyCounting;
+
+  public MetricsUserAggregateImpl(Configuration conf) {
+    source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
+        .getUserAggregate();
+    LossyCounting.LossyCountingListener<MetricsUserSource> onSweep = key -> source.deregister(key);
+    userMetricLossyCounting = new LossyCounting<MetricsUserSource>("userMetrics", onSweep);
+    this.userProvider = UserProvider.instantiate(conf);
+  }
+
+  /**
+   * Resolves the user an update is accounted to: the request user inside an RPC call, otherwise
+   * the user reported by the {@link UserProvider}, which is allowed to report none.
+   * @return that user's short name, or {@code null} when no user could be determined
+   */
+  private String getActiveUser() {
+    Optional<User> user = RpcServer.getRequestUser();
+    if (!user.isPresent()) {
+      try {
+        user = Optional.ofNullable(userProvider.getCurrent());
+      } catch (IOException ignored) {
+        // best effort: callers skip the update when no user can be determined
+      }
+    }
+    return user.isPresent() ? user.get().getShortName() : null;
+  }
+
+  @VisibleForTesting
+  MetricsUserAggregateSource getSource() {
+    return source;
+  }
+
+  @Override
+  public void updatePut(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updatePut(t);
+    }
+  }
+
+  @Override
+  public void updateDelete(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateDelete(t);
+    }
+  }
+
+  @Override
+  public void updateGet(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateGet(t);
+    }
+  }
+
+  @Override
+  public void updateIncrement(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateIncrement(t);
+    }
+  }
+
+  @Override
+  public void updateAppend(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateAppend(t);
+    }
+  }
+
+  @Override
+  public void updateReplay(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateReplay(t);
+    }
+  }
+
+  @Override
+  public void updateScanTime(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateScanTime(t);
+    }
+  }
+
+  /** Fetches (creating it if needed) the user's source and counts this access in the counter. */
+  private MetricsUserSource getOrCreateMetricsUser(String user) {
+    MetricsUserSource userSource = source.getOrCreateMetricsUser(user);
+    userMetricLossyCounting.add(userSource);
+    return userSource;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
index ca1a014..d730a01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -22,6 +22,10 @@ package org.apache.hadoop.hbase.util;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
@@ -29,7 +33,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
* LossyCounting utility, bounded data structure that maintains approximate high frequency
@@ -43,18 +47,20 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
-public class LossyCounting {
+public class LossyCounting<T> {
private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
+ private final ExecutorService executor;
private long bucketSize;
private int currentTerm;
private double errorRate;
- private Map<String, Integer> data;
+ private Map<T, Integer> data;
private long totalDataCount;
- private String name;
+ private final String name;
private LossyCountingListener listener;
+ private final AtomicReference<Future> fut = new AtomicReference<>(null); // one per counter
- public interface LossyCountingListener {
- void sweep(String key);
+ public interface LossyCountingListener<T> {
+ void sweep(T key);
}
public LossyCounting(double errorRate, String name, LossyCountingListener listener) {
@@ -69,6 +75,7 @@ public class LossyCounting {
this.data = new ConcurrentHashMap<>();
this.listener = listener;
calculateCurrentTerm();
+ executor = Executors.newSingleThreadExecutor();
}
public LossyCounting(String name, LossyCountingListener listener) {
@@ -76,7 +83,7 @@ public class LossyCounting {
name, listener);
}
- private void addByOne(String key) {
+ private void addByOne(T key) {
//If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
//we create a new entry starting with currentTerm so that it will not be pruned immediately
data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);
@@ -86,23 +93,29 @@ public class LossyCounting {
calculateCurrentTerm();
}
- public void add(String key) {
+ public void add(T key) {
addByOne(key);
if(totalDataCount % bucketSize == 0) {
//sweep the entries at bucket boundaries
- sweep();
+ //the sweep runs on the executor rather than on the calling thread;
+ //nothing is submitted while the previously submitted sweep is still running
+ Future previousSweep = fut.get();
+ boolean sweepInProgress = previousSweep != null && !previousSweep.isDone();
+ if (!sweepInProgress) {
+ fut.set(executor.submit(new SweepRunnable()));
+ }
}
}
/**
* sweep low frequency data
- * @return Names of elements got swept
*/
- private void sweep() {
- for(Map.Entry<String, Integer> entry : data.entrySet()) {
+ @VisibleForTesting
+ public void sweep() {
+ for(Map.Entry<T, Integer> entry : data.entrySet()) {
if(entry.getValue() < currentTerm) {
- String metric = entry.getKey();
+ T metric = entry.getKey();
data.remove(metric);
if (listener != null) {
listener.sweep(metric);
@@ -126,16 +139,33 @@ public class LossyCounting {
return data.size();
}
- public boolean contains(String key) {
+ public boolean contains(T key) {
return data.containsKey(key);
}
- public Set<String> getElements(){
+ public Set<T> getElements(){
return data.keySet();
}
public long getCurrentTerm() {
return currentTerm;
}
+
+  /** Task handed to the executor by add(): performs one sweep and only logs a failure. */
+  class SweepRunnable implements Runnable {
+    @Override public void run() {
+      LOG.trace("Starting sweep of lossyCounting-{}", name);
+      try {
+        sweep();
+      } catch (Exception exception) {
+        // not rethrown: add() submits a new sweep at a later bucket boundary
+        LOG.debug("Error while sweeping of lossyCounting-{}", name, exception);
+      }
+    }
+  }
+  /** Returns the most recently submitted sweep task, or null if none was submitted so far. */
+  @VisibleForTesting public Future<?> getSweepFuture() {
+    return fut.get();
+  }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java
index 43e8d58..f16086e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
+
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java
new file mode 100644
index 0000000..baec144
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.security.PrivilegedAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Verifies that operations reported to MetricsRegionServer show up as per-user counters.
+ */
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestMetricsUserAggregate {
+
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMetricsUserAggregate.class);
+
+  public static MetricsAssertHelper HELPER =
+      CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+
+  private MetricsRegionServerWrapperStub wrapper;
+  private MetricsRegionServer rsm;
+  private MetricsUserAggregateImpl userAgg;
+  private TableName tableName = TableName.valueOf("testUserAggregateMetrics");
+
+  @BeforeClass
+  public static void classSetUp() {
+    HELPER.init();
+  }
+
+  /** Builds a fresh MetricsRegionServer and keeps the per-user aggregate it exposes. */
+  @Before
+  public void setUp() {
+    wrapper = new MetricsRegionServerWrapperStub();
+    rsm = new MetricsRegionServer(wrapper, HBaseConfiguration.create(), null);
+    userAgg = (MetricsUserAggregateImpl) rsm.getMetricsUserAggregate();
+  }
+
+  /** Invokes {@code op} {@code times} times in a row on the calling thread. */
+  private static void repeat(int times, Runnable op) {
+    for (int done = 0; done < times; done++) {
+      op.run();
+    }
+  }
+
+  /** Executes {@code work} inside {@code user}'s doAs context. */
+  private static void runAs(User user, final Runnable work) {
+    user.getUGI().doAs(new PrivilegedAction<Void>() {
+      @Override
+      public Void run() {
+        work.run();
+        return null;
+      }
+    });
+  }
+
+  /**
+   * Reports every operation type a distinct number of times (10 gets up to 16 replays) so that
+   * the resulting counters can be told apart in the assertions.
+   */
+  private void doOperations() {
+    repeat(10, () -> rsm.updateGet(tableName, 10));
+    repeat(11, () -> rsm.updateScanTime(tableName, 11));
+    repeat(12, () -> rsm.updatePut(tableName, 12));
+    repeat(13, () -> rsm.updateDelete(tableName, 13));
+    repeat(14, () -> rsm.updateIncrement(tableName, 14));
+    repeat(15, () -> rsm.updateAppend(tableName, 15));
+    repeat(16, () -> rsm.updateReplay(16));
+  }
+
+  /** Checks the counters one doOperations() run leaves behind for the lower-cased user name. */
+  private void assertOperationCounters(String user) {
+    MetricsUserAggregateSource aggSource = userAgg.getSource();
+    HELPER.assertCounter("user" + user + "metricgetnumops", 10, aggSource);
+    HELPER.assertCounter("user" + user + "metricscantimenumops", 11, aggSource);
+    HELPER.assertCounter("user" + user + "metricputnumops", 12, aggSource);
+    HELPER.assertCounter("user" + user + "metricdeletenumops", 13, aggSource);
+    HELPER.assertCounter("user" + user + "metricincrementnumops", 14, aggSource);
+    HELPER.assertCounter("user" + user + "metricappendnumops", 15, aggSource);
+    HELPER.assertCounter("user" + user + "metricreplaynumops", 16, aggSource);
+  }
+
+  /** Two users run the same workload; each must end up with its own full set of counters. */
+  @Test
+  public void testPerUserOperations() {
+    Configuration conf = HBaseConfiguration.create();
+    User userFoo = User.createUserForTesting(conf, "FOO", new String[0]);
+    User userBar = User.createUserForTesting(conf, "BAR", new String[0]);
+
+    runAs(userFoo, this::doOperations);
+    runAs(userBar, this::doOperations);
+
+    assertOperationCounters("foo");
+    assertOperationCounters("bar");
+  }
+
+  /**
+   * Reports a single get for each of 10000 distinct users and checks that only a bounded number
+   * of per-user sources stays registered.
+   */
+  @Test public void testLossyCountingOfUserMetrics() {
+    Configuration conf = HBaseConfiguration.create();
+    int noOfUsers = 10000;
+    for (int i = 1; i <= noOfUsers; i++) {
+      User user = User.createUserForTesting(conf, "FOO" + i, new String[0]);
+      runAs(user, () -> rsm.updateGet(tableName, 10));
+    }
+    // at most a tenth of the users may still have a registered source
+    MetricsUserAggregateSourceImpl sourceImpl =
+        (MetricsUserAggregateSourceImpl) userAgg.getSource();
+    assertTrue(sourceImpl.getUserSources().size() <= noOfUsers / 10);
+    // none of the first noOfUsers / 10 users may still expose a get counter
+    for (int i = 1; i <= noOfUsers / 10; i++) {
+      String counter = "userfoo" + i + "metricgetnumops";
+      assertFalse(HELPER.checkCounterExists(counter, userAgg.getSource()));
+    }
+    // the last user reported must still be present with its single get
+    HELPER.assertCounter("userfoo" + noOfUsers + "metricgetnumops", 1, userAgg.getSource());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
index 5240c40..050d2e5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertEquals;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
-
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@@ -66,10 +65,25 @@ public class TestLossyCounting {
lossyCounting.add(key);
}
assertEquals(4L, lossyCounting.getCurrentTerm());
- //if total rows added are proportional to bucket size
+ waitForSweep(lossyCounting);
+
+ //Do last one sweep as some sweep will be skipped when first one was running
+ lossyCounting.sweep();
assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
}
+  private void waitForSweep(LossyCounting<Object> lossyCounting) {
+    //poll for up to ~1s; a null future means no sweep has been submitted, so there is no wait
+    for (int retry = 0; retry < 10 && lossyCounting.getSweepFuture() != null
+        && !lossyCounting.getSweepFuture().isDone(); retry++) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
@Test
public void testSweep2() {
LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null);
@@ -77,11 +91,13 @@ public class TestLossyCounting {
String key = "" + i;
lossyCounting.add(key);
}
+ waitForSweep(lossyCounting);
assertEquals(10L, lossyCounting.getDataSize());
for(int i = 0; i < 10; i++){
String key = "1";
lossyCounting.add(key);
}
+ waitForSweep(lossyCounting);
assertEquals(1L, lossyCounting.getDataSize());
}