You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by el...@apache.org on 2019/10/23 15:54:56 UTC

[hbase] branch branch-2 updated: HBASE-15519 Add per-user metrics with lossy counting

This is an automated email from the ASF dual-hosted git repository.

elserj pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new ddd61aa  HBASE-15519 Add per-user metrics with lossy counting
ddd61aa is described below

commit ddd61aa825a33fd20b8c8b50cce71d05526d3144
Author: Ankit Singhal <an...@apache.org>
AuthorDate: Wed Oct 23 11:36:14 2019 -0400

    HBASE-15519 Add per-user metrics with lossy counting
    
    Introduces the property hbase.regionserver.user.metrics.enabled (default: true),
    which can be set to false to disable user metrics if they cause performance issues.
    
    Close #661
    
    Signed-off-by: Josh Elser <el...@apache.org>
---
 .../MetricsRegionServerSourceFactory.java          |  12 ++
 .../regionserver/MetricsUserAggregateSource.java   |  62 ++++++
 .../hbase/regionserver/MetricsUserSource.java      |  45 +++++
 .../MetricsRegionServerSourceFactoryImpl.java      |  28 ++-
 .../MetricsUserAggregateSourceImpl.java            | 112 +++++++++++
 .../hbase/regionserver/MetricsUserSourceImpl.java  | 207 +++++++++++++++++++++
 .../regionserver/TestMetricsUserSourceImpl.java    |  76 ++++++++
 .../hadoop/hbase/coprocessor/MetaTableMetrics.java |  15 +-
 .../hbase/regionserver/MetricsRegionServer.java    |  28 ++-
 .../hbase/regionserver/MetricsUserAggregate.java   |  39 ++++
 .../regionserver/MetricsUserAggregateFactory.java  |  70 +++++++
 .../regionserver/MetricsUserAggregateImpl.java     | 134 +++++++++++++
 .../apache/hadoop/hbase/util/LossyCounting.java    |  60 ++++--
 .../regionserver/TestMetricsTableLatencies.java    |   1 +
 .../regionserver/TestMetricsUserAggregate.java     | 152 +++++++++++++++
 .../hadoop/hbase/util/TestLossyCounting.java       |  20 +-
 16 files changed, 1022 insertions(+), 39 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
index bf4e0bc..ef33909 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactory.java
@@ -45,6 +45,18 @@ public interface MetricsRegionServerSourceFactory {
   MetricsRegionSource createRegion(MetricsRegionWrapper wrapper);
 
   /**
+   * Create a MetricsUserSource from a user
+   * @return A metrics user source
+   */
+  MetricsUserSource createUser(String shortUserName);
+
+  /**
+   * Return the singleton instance for MetricsUserAggregateSource
+   * @return A metrics user aggregate source
+   */
+  MetricsUserAggregateSource getUserAggregate();
+
+  /**
    * Create a MetricsTableSource from a MetricsTableWrapper.
    *
    * @param table The table name
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
new file mode 100644
index 0000000..0ffb928
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * This interface will be implemented by a MetricsSource that will export metrics from
+ * multiple users into the hadoop metrics system.
+ */
+@InterfaceAudience.Private
+public interface MetricsUserAggregateSource extends BaseSource {
+
+  /**
+   * The name of the metrics source.
+   */
+  static final String METRICS_NAME = "Users";
+
+  /**
+   * The name of the metrics context that metrics will be under.
+   */
+  static final String METRICS_CONTEXT = "regionserver";
+
+  /**
+   * Human-readable description of the metrics source.
+   */
+  static final String METRICS_DESCRIPTION = "Metrics about users connected to the regionserver";
+
+  /**
+   * The name of the metrics context that metrics will be under in JMX.
+   */
+  static final String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+  static final String NUM_USERS = "numUsers";
+  static final String NUMBER_OF_USERS_DESC = "Number of users in the metrics system";
+
+  /**
+   * Returns the existing MetricsUserSource for this user, or creates and registers a new one.
+   * @param user the user name
+   * @return a metrics user source
+   */
+  MetricsUserSource getOrCreateMetricsUser(String user);
+  /** Removes the source tracked under {@code toRemove}'s user name, if any, and deregisters it. */
+  void deregister(MetricsUserSource toRemove);
+}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
new file mode 100644
index 0000000..b20dca6
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface MetricsUserSource extends Comparable<MetricsUserSource> {
+  /** Returns the user name this source was created for. */
+  String getUser();
+  /** Creates this user's time histograms in the backing metrics registry. */
+  void register();
+  /** Removes this user's histograms from the backing metrics registry. */
+  void deregister();
+  /** Records {@code t} in this user's put time histogram. */
+  void updatePut(long t);
+  /** Records {@code t} in this user's delete time histogram. */
+  void updateDelete(long t);
+  /** Records {@code t} in this user's get time histogram. */
+  void updateGet(long t);
+  /** Records {@code t} in this user's increment time histogram. */
+  void updateIncrement(long t);
+  /** Records {@code t} in this user's append time histogram. */
+  void updateAppend(long t);
+  /** Records {@code t} in this user's replay time histogram. */
+  void updateReplay(long t);
+  /** Records {@code t} in this user's scan time histogram. */
+  void updateScanTime(long t);
+}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
index 2e6b458..ccc1749 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerSourceFactoryImpl.java
@@ -30,17 +30,27 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
   public static enum FactoryStorage {
     INSTANCE;
     private Object aggLock = new Object();
-    private MetricsRegionAggregateSourceImpl aggImpl;
+    private MetricsRegionAggregateSourceImpl regionAggImpl;
+    private MetricsUserAggregateSourceImpl userAggImpl;
     private MetricsTableAggregateSourceImpl tblAggImpl;
     private MetricsHeapMemoryManagerSourceImpl heapMemMngImpl;
   }
 
-  private synchronized MetricsRegionAggregateSourceImpl getAggregate() {
+  private synchronized MetricsRegionAggregateSourceImpl getRegionAggregate() {
     synchronized (FactoryStorage.INSTANCE.aggLock) {
-      if (FactoryStorage.INSTANCE.aggImpl == null) {
-        FactoryStorage.INSTANCE.aggImpl = new MetricsRegionAggregateSourceImpl();
+      if (FactoryStorage.INSTANCE.regionAggImpl == null) {
+        FactoryStorage.INSTANCE.regionAggImpl = new MetricsRegionAggregateSourceImpl();
       }
-      return FactoryStorage.INSTANCE.aggImpl;
+      return FactoryStorage.INSTANCE.regionAggImpl;
+    }
+  }
+
+  public synchronized MetricsUserAggregateSourceImpl getUserAggregate() {
+    synchronized (FactoryStorage.INSTANCE.aggLock) {
+      if (FactoryStorage.INSTANCE.userAggImpl == null) {
+        FactoryStorage.INSTANCE.userAggImpl = new MetricsUserAggregateSourceImpl();
+      }
+      return FactoryStorage.INSTANCE.userAggImpl;
     }
   }
 
@@ -72,7 +82,7 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
 
   @Override
   public MetricsRegionSource createRegion(MetricsRegionWrapper wrapper) {
-    return new MetricsRegionSourceImpl(wrapper, getAggregate());
+    return new MetricsRegionSourceImpl(wrapper, getRegionAggregate());
   }
 
   @Override
@@ -83,4 +93,10 @@ public class MetricsRegionServerSourceFactoryImpl implements MetricsRegionServer
   public MetricsIOSource createIO(MetricsIOWrapper wrapper) {
     return new MetricsIOSourceImpl(wrapper);
   }
+
+  @Override
+  public MetricsUserSource createUser(String shortUserName) {
+    // Every user source is backed by the single user aggregate held in FactoryStorage.
+    return new MetricsUserSourceImpl(shortUserName, getUserAggregate());
+  }
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
new file mode 100644
index 0000000..c447f40
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.MetricsCollector;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.Interns;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+@InterfaceAudience.Private
+public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
+  implements MetricsUserAggregateSource {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MetricsUserAggregateSourceImpl.class);
+
+  private final ConcurrentHashMap<String, MetricsUserSource> userSources =
+      new ConcurrentHashMap<String, MetricsUserSource>();
+
+  public MetricsUserAggregateSourceImpl() {
+    this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+  }
+
+  public MetricsUserAggregateSourceImpl(String metricsName,
+      String metricsDescription,
+      String metricsContext,
+      String metricsJmxContext) {
+    super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+  }
+
+  @Override
+  public MetricsUserSource getOrCreateMetricsUser(String user) {
+    MetricsUserSource source = userSources.get(user);
+    if (source != null) {
+      return source;
+    }
+    source = new MetricsUserSourceImpl(user, this);
+    MetricsUserSource prev = userSources.putIfAbsent(user, source);
+    // A concurrent caller may have inserted a source first; if so, return that one instead.
+    if (prev != null) {
+      return prev;
+    } else {
+      // NOTE(review): the MetricsUserSourceImpl constructor already calls agg.register(this).
+      register(source);
+    }
+    return source;
+  }
+
+  public void register(MetricsUserSource source) {
+    synchronized (this) {
+      source.register();
+    }
+  }
+
+  @Override
+  public void deregister(MetricsUserSource toRemove) {
+    try {
+      synchronized (this) {
+        MetricsUserSource source = userSources.remove(toRemove.getUser());
+        if (source != null) {
+          source.deregister();
+        }
+      }
+    } catch (Exception e) {
+      // Best effort: the failure is logged, not rethrown. If this errors out it may mean that
+      // someone is double closing the user source and the user metrics is already nulled out.
+      LOG.info("Error trying to remove " + toRemove + " from " + getClass().getSimpleName(), e);
+    }
+  }
+
+  @VisibleForTesting
+  public ConcurrentHashMap<String, MetricsUserSource> getUserSources() {
+    return userSources;
+  }
+
+  @Override
+  public void getMetrics(MetricsCollector collector, boolean all) {
+    MetricsRecordBuilder mrb = collector.addRecord(metricsName);
+
+    if (userSources != null) {
+      for (MetricsUserSource userMetricSource : userSources.values()) {
+        if (userMetricSource instanceof MetricsUserSourceImpl) {
+          ((MetricsUserSourceImpl) userMetricSource).snapshot(mrb, all);
+        }
+      }
+      mrb.addGauge(Interns.info(NUM_USERS, NUMBER_OF_USERS_DESC), userSources.size());
+      metricsRegistry.snapshot(mrb, all);
+    }
+  }
+}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
new file mode 100644
index 0000000..9f714a3
--- /dev/null
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@InterfaceAudience.Private
+public class MetricsUserSourceImpl implements MetricsUserSource {
+  private static final Logger LOG = LoggerFactory.getLogger(MetricsUserSourceImpl.class);
+
+  private final String userNamePrefix;
+
+  private final String user;
+
+  private final String userGetKey;
+  private final String userScanTimeKey;
+  private final String userPutKey;
+  private final String userDeleteKey;
+  private final String userIncrementKey;
+  private final String userAppendKey;
+  private final String userReplayKey;
+
+  private MetricHistogram getHisto;
+  private MetricHistogram scanTimeHisto;
+  private MetricHistogram putHisto;
+  private MetricHistogram deleteHisto;
+  private MetricHistogram incrementHisto;
+  private MetricHistogram appendHisto;
+  private MetricHistogram replayHisto;
+
+  private final int hashCode;
+
+  private AtomicBoolean closed = new AtomicBoolean(false);
+  private final MetricsUserAggregateSourceImpl agg;
+  private final DynamicMetricsRegistry registry;
+
+  public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
+    }
+
+    this.user = user;
+    this.agg = agg;
+    this.registry = agg.getMetricsRegistry();
+
+    this.userNamePrefix = "user_" + user + "_metric_";
+
+    hashCode = userNamePrefix.hashCode();
+
+    userGetKey = userNamePrefix + MetricsRegionServerSource.GET_KEY;
+    userScanTimeKey = userNamePrefix + MetricsRegionServerSource.SCAN_TIME_KEY;
+    userPutKey = userNamePrefix + MetricsRegionServerSource.PUT_KEY;
+    userDeleteKey = userNamePrefix + MetricsRegionServerSource.DELETE_KEY;
+    userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
+    userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
+    userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
+
+    agg.register(this);
+  }
+
+  @Override
+  public void register() {
+    synchronized (this) {
+      getHisto = registry.newTimeHistogram(userGetKey);
+      scanTimeHisto = registry.newTimeHistogram(userScanTimeKey);
+      putHisto = registry.newTimeHistogram(userPutKey);
+      deleteHisto = registry.newTimeHistogram(userDeleteKey);
+      incrementHisto = registry.newTimeHistogram(userIncrementKey);
+      appendHisto = registry.newTimeHistogram(userAppendKey);
+      replayHisto = registry.newTimeHistogram(userReplayKey);
+    }
+  }
+
+  @Override
+  public void deregister() {
+    boolean wasClosed = closed.getAndSet(true);
+
+    // Has someone else already closed this for us?
+    if (wasClosed) {
+      return;
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Removing user Metrics for user: " + user);
+    }
+
+    synchronized (this) {
+      registry.removeMetric(userGetKey);
+      registry.removeMetric(userScanTimeKey);
+      registry.removeMetric(userPutKey);
+      registry.removeMetric(userDeleteKey);
+      registry.removeMetric(userIncrementKey);
+      registry.removeMetric(userAppendKey);
+      registry.removeMetric(userReplayKey);
+    }
+  }
+
+  @Override
+  public String getUser() {
+    return user;
+  }
+
+  /**
+   * Orders sources by their per-user metric prefix instead of by its hash, so that a hash
+   * collision between two different users no longer makes their sources compare as equal.
+   * @param source the source to compare against, may be null
+   * @return -1 if source is null or not a MetricsUserSourceImpl, else the prefix ordering
+   */
+  @Override
+  public int compareTo(MetricsUserSource source) {
+    if (!(source instanceof MetricsUserSourceImpl)) {
+      return -1;
+    }
+    return userNamePrefix.compareTo(((MetricsUserSourceImpl) source).userNamePrefix);
+  }
+
+  @Override
+  public int hashCode() {
+    return hashCode;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return obj == this ||
+        (obj instanceof MetricsUserSourceImpl && compareTo((MetricsUserSourceImpl) obj) == 0);
+  }
+
+  void snapshot(MetricsRecordBuilder mrb, boolean ignored) {
+    // Neither parameter is used and nothing is written to mrb in
+    // this method. Once deregister() has marked this source as
+    // closed there is nothing to guard, so early out before even
+    // getting the lock.
+    if (closed.get()) {
+      return;
+    }
+
+    // Take the same monitor that deregister() holds while it
+    // removes this user's metrics from the registry, so the two
+    // cannot interleave.
+    synchronized (this) {
+
+      // It's possible that a close happened between checking
+      // the closed variable and getting the lock.
+      if (closed.get()) {
+        return;
+      }
+    }
+  }
+
+  @Override
+  public void updatePut(long t) {
+    putHisto.add(t);
+  }
+
+  @Override
+  public void updateDelete(long t) {
+    deleteHisto.add(t);
+  }
+
+  @Override
+  public void updateGet(long t) {
+    getHisto.add(t);
+  }
+
+  @Override
+  public void updateIncrement(long t) {
+    incrementHisto.add(t);
+  }
+
+  @Override
+  public void updateAppend(long t) {
+    appendHisto.add(t);
+  }
+
+  @Override
+  public void updateReplay(long t) {
+    replayHisto.add(t);
+  }
+
+  @Override
+  public void updateScanTime(long t) {
+    scanTimeHisto.add(t);
+  }
+}
diff --git a/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java
new file mode 100644
index 0000000..f2c7a21
--- /dev/null
+++ b/hbase-hadoop2-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserSourceImpl.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.MetricsTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MetricsTests.class, SmallTests.class})
+public class TestMetricsUserSourceImpl {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMetricsUserSourceImpl.class);
+
+  @Test
+  public void testCompareToHashCodeEquals() throws Exception {
+    MetricsRegionServerSourceFactory fact
+      = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
+
+    MetricsUserSource one = fact.createUser("ONE");
+    MetricsUserSource oneClone = fact.createUser("ONE");
+    MetricsUserSource two = fact.createUser("TWO");
+
+    assertEquals(0, one.compareTo(oneClone));
+    assertEquals(one.hashCode(), oneClone.hashCode());
+    assertNotEquals(one, two);
+
+    assertTrue(one.compareTo(two) != 0);
+    assertTrue(two.compareTo(one) != 0);
+    assertTrue(two.compareTo(one) != one.compareTo(two));
+    assertTrue(two.compareTo(two) == 0);
+  }
+
+
+  @Test (expected = RuntimeException.class)
+  public void testNoGetRegionServerMetricsSourceImpl() throws Exception {
+    // This should throw an exception because MetricsUserSourceImpl should only
+    // be created by a factory.
+    CompatibilitySingletonFactory.getInstance(MetricsUserSource.class);
+  }
+
+  @Test
+  public void testGetUser() {
+    MetricsRegionServerSourceFactory fact
+      = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class);
+
+    MetricsUserSource one = fact.createUser("ONE");
+    assertEquals("ONE", one.getUser());
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
index 70e8df1..f9f6d67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/MetaTableMetrics.java
@@ -279,14 +279,13 @@ public class MetaTableMetrics implements RegionCoprocessor {
           .equals(TableName.META_TABLE_NAME)) {
       RegionCoprocessorEnvironment regionCoprocessorEnv = (RegionCoprocessorEnvironment) env;
       registry = regionCoprocessorEnv.getMetricRegistryForRegionServer();
-      LossyCounting.LossyCountingListener listener = new LossyCounting.LossyCountingListener(){
-        @Override public void sweep(String key) {
-          registry.remove(key);
-          metrics.remove(key);
-        }
-      };
-      clientMetricsLossyCounting = new LossyCounting("clientMetaMetrics",listener);
-      regionMetricsLossyCounting = new LossyCounting("regionMetaMetrics",listener);
+      LossyCounting.LossyCountingListener listener =
+          (LossyCounting.LossyCountingListener<String>) key -> {
+            registry.remove(key);
+            metrics.remove(key);
+          };
+      clientMetricsLossyCounting = new LossyCounting<String>("clientMetaMetrics",listener);
+      regionMetricsLossyCounting = new LossyCounting<String>("regionMetaMetrics",listener);
       // only be active mode when this region holds meta table.
       active = true;
     }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index f3f68f2..9bf2b32 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -44,10 +44,11 @@ public class MetricsRegionServer {
       "hbase.regionserver.enable.table.latencies";
   public static final boolean RS_ENABLE_TABLE_METRICS_DEFAULT = true;
 
-  private MetricsRegionServerSource serverSource;
-  private MetricsRegionServerWrapper regionServerWrapper;
+  private final MetricsRegionServerSource serverSource;
+  private final MetricsRegionServerWrapper regionServerWrapper;
   private RegionServerTableMetrics tableMetrics;
   private final MetricsTable metricsTable;
+  private final MetricsUserAggregate userAggregate;
 
   private MetricRegistry metricRegistry;
   private Timer bulkLoadTimer;
@@ -58,9 +59,8 @@ public class MetricsRegionServer {
       MetricsTable metricsTable) {
     this(regionServerWrapper,
         CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
-            .createServer(regionServerWrapper),
-        createTableMetrics(conf),
-        metricsTable);
+            .createServer(regionServerWrapper), createTableMetrics(conf), metricsTable,
+        MetricsUserAggregateFactory.getMetricsUserAggregate(conf));
 
     // Create hbase-metrics module based metrics. The registry should already be registered by the
     // MetricsRegionServerSource
@@ -74,13 +74,13 @@ public class MetricsRegionServer {
   }
 
   MetricsRegionServer(MetricsRegionServerWrapper regionServerWrapper,
-                      MetricsRegionServerSource serverSource,
-                      RegionServerTableMetrics tableMetrics,
-                      MetricsTable metricsTable) {
+      MetricsRegionServerSource serverSource, RegionServerTableMetrics tableMetrics,
+      MetricsTable metricsTable, MetricsUserAggregate userAggregate) {
     this.regionServerWrapper = regionServerWrapper;
     this.serverSource = serverSource;
     this.tableMetrics = tableMetrics;
     this.metricsTable = metricsTable;
+    this.userAggregate = userAggregate;
   }
 
   /**
@@ -98,6 +98,11 @@ public class MetricsRegionServer {
     return serverSource;
   }
 
+  @VisibleForTesting
+  public org.apache.hadoop.hbase.regionserver.MetricsUserAggregate getMetricsUserAggregate() {
+    return userAggregate;
+  }
+
   public MetricsRegionServerWrapper getRegionServerWrapper() {
     return regionServerWrapper;
   }
@@ -117,6 +122,7 @@ public class MetricsRegionServer {
       tableMetrics.updatePut(tn, t);
     }
     serverSource.updatePut(t);
+    userAggregate.updatePut(t);
   }
 
   public void updateDelete(TableName tn, long t) {
@@ -124,6 +130,7 @@ public class MetricsRegionServer {
       tableMetrics.updateDelete(tn, t);
     }
     serverSource.updateDelete(t);
+    userAggregate.updateDelete(t);
   }
 
   public void updateDeleteBatch(TableName tn, long t) {
@@ -152,6 +159,7 @@ public class MetricsRegionServer {
       serverSource.incrSlowGet();
     }
     serverSource.updateGet(t);
+    userAggregate.updateGet(t);
   }
 
   public void updateIncrement(TableName tn, long t) {
@@ -162,6 +170,7 @@ public class MetricsRegionServer {
       serverSource.incrSlowIncrement();
     }
     serverSource.updateIncrement(t);
+    userAggregate.updateIncrement(t);
   }
 
   public void updateAppend(TableName tn, long t) {
@@ -172,10 +181,12 @@ public class MetricsRegionServer {
       serverSource.incrSlowAppend();
     }
     serverSource.updateAppend(t);
+    userAggregate.updateAppend(t);
   }
 
   public void updateReplay(long t){
     serverSource.updateReplay(t);
+    userAggregate.updateReplay(t);
   }
 
   public void updateScanSize(TableName tn, long scanSize){
@@ -190,6 +201,7 @@ public class MetricsRegionServer {
       tableMetrics.updateScanTime(tn, t);
     }
     serverSource.updateScanTime(t);
+    userAggregate.updateScanTime(t);
   }
 
   public void updateSplitTime(long t) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
new file mode 100644
index 0000000..9b23ccc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public interface MetricsUserAggregate {
+
+  void updatePut(long t);
+
+  void updateDelete(long t);
+
+  void updateGet(long t);
+
+  void updateIncrement(long t);
+
+  void updateAppend(long t);
+
+  void updateReplay(long t);
+
+  void updateScanTime(long t);
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
new file mode 100644
index 0000000..888c480
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+
+@InterfaceAudience.Private
+public class MetricsUserAggregateFactory {
+  private MetricsUserAggregateFactory() {
+    // Static factory only; never instantiated.
+  }
+  public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled";
+  public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true;
+  /** Returns a MetricsUserAggregateImpl when the flag is true (the default), else a no-op. */
+  public static MetricsUserAggregate getMetricsUserAggregate(Configuration conf) {
+    if (conf.getBoolean(METRIC_USER_ENABLED_CONF, DEFAULT_METRIC_USER_ENABLED_CONF)) {
+      return new MetricsUserAggregateImpl(conf);
+    } else {
+      // Per-user metrics are disabled: return an aggregate whose updates all do nothing.
+      return new MetricsUserAggregate() {
+        @Override public void updatePut(long t) {
+          // no-op
+        }
+
+        @Override public void updateDelete(long t) {
+          // no-op
+        }
+
+        @Override public void updateGet(long t) {
+          // no-op
+        }
+
+        @Override public void updateIncrement(long t) {
+          // no-op
+        }
+
+        @Override public void updateAppend(long t) {
+          // no-op
+        }
+
+        @Override public void updateReplay(long t) {
+          // no-op
+        }
+
+        @Override public void updateScanTime(long t) {
+          // no-op
+        }
+      };
+    }
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
new file mode 100644
index 0000000..6c24afc
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
@@ -0,0 +1,134 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.util.LossyCounting;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+/** Forwards each operation metric to the metrics source of the user the operation runs as. */
+@InterfaceAudience.Private
+public class MetricsUserAggregateImpl implements MetricsUserAggregate {
+  /** Provider for mapping principal names to Users */
+  private final UserProvider userProvider;
+
+  private final MetricsUserAggregateSource source;
+  private final LossyCounting<MetricsUserSource> userMetricLossyCounting;
+
+  public MetricsUserAggregateImpl(Configuration conf) {
+    source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
+        .getUserAggregate();
+    userMetricLossyCounting = new LossyCounting<MetricsUserSource>("userMetrics",
+        (LossyCounting.LossyCountingListener<MetricsUserSource>) key -> source.deregister(key));
+    this.userProvider = UserProvider.instantiate(conf);
+  }
+
+  /**
+   * Resolves the short name of the user an update is attributed to: the RPC request user
+   * when inside an RPC call, otherwise the current user from the {@link UserProvider}.
+   * Returns null when neither is available (the provider may yield null or fail).
+   */
+  private String getActiveUser() {
+    Optional<User> user = RpcServer.getRequestUser();
+    if (!user.isPresent()) {
+      try {
+        user = Optional.ofNullable(userProvider.getCurrent());
+      } catch (IOException ignored) {
+        // Best effort: a failed lookup only means this update is not attributed to any user.
+      }
+    }
+    return user.map(User::getShortName).orElse(null);
+  }
+
+  @VisibleForTesting
+  MetricsUserAggregateSource getSource() {
+    return source;
+  }
+
+  @Override
+  public void updatePut(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updatePut(t);
+    }
+  }
+
+  @Override
+  public void updateDelete(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateDelete(t);
+    }
+  }
+
+  @Override
+  public void updateGet(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateGet(t);
+    }
+  }
+
+  @Override
+  public void updateIncrement(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateIncrement(t);
+    }
+  }
+
+  @Override
+  public void updateAppend(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateAppend(t);
+    }
+  }
+
+  @Override
+  public void updateReplay(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateReplay(t);
+    }
+  }
+
+  @Override
+  public void updateScanTime(long t) {
+    String user = getActiveUser();
+    if (user != null) {
+      getOrCreateMetricsUser(user).updateScanTime(t);
+    }
+  }
+
+  private MetricsUserSource getOrCreateMetricsUser(String user) {
+    MetricsUserSource userSource = source.getOrCreateMetricsUser(user);
+    userMetricLossyCounting.add(userSource); // track usage so rarely seen users can be swept
+    return userSource;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
index ca1a014..d730a01 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/LossyCounting.java
@@ -22,6 +22,10 @@ package org.apache.hadoop.hbase.util;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
@@ -29,7 +33,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 
 /**
  * LossyCounting utility, bounded data structure that maintains approximate high frequency
@@ -43,18 +47,20 @@ import org.slf4j.LoggerFactory;
  */
 
 @InterfaceAudience.Private
-public class LossyCounting {
+public class LossyCounting<T> {
   private static final Logger LOG = LoggerFactory.getLogger(LossyCounting.class);
+  private final ExecutorService executor;
   private long bucketSize;
   private int currentTerm;
   private double errorRate;
-  private Map<String, Integer> data;
+  private Map<T, Integer> data;
   private long totalDataCount;
-  private String name;
+  private final String name;
   private LossyCountingListener listener;
+  private final AtomicReference<Future> fut = new AtomicReference<>(null); // per instance
 
-  public interface LossyCountingListener {
-    void sweep(String key);
+  public interface LossyCountingListener<T> {
+    void sweep(T key);
   }
 
   public LossyCounting(double errorRate, String name, LossyCountingListener listener) {
@@ -69,6 +75,7 @@ public class LossyCounting {
     this.data = new ConcurrentHashMap<>();
     this.listener = listener;
     calculateCurrentTerm();
+    executor = Executors.newSingleThreadExecutor();
   }
 
   public LossyCounting(String name, LossyCountingListener listener) {
@@ -76,7 +83,7 @@ public class LossyCounting {
         name, listener);
   }
 
-  private void addByOne(String key) {
+  private void addByOne(T key) {
     //If entry exists, we update the entry by incrementing its frequency by one. Otherwise,
     //we create a new entry starting with currentTerm so that it will not be pruned immediately
     data.put(key, data.getOrDefault(key, currentTerm != 0 ? currentTerm - 1 : 0) + 1);
@@ -86,23 +93,29 @@ public class LossyCounting {
     calculateCurrentTerm();
   }
 
-  public void add(String key) {
+  public void add(T key) {
     addByOne(key);
     if(totalDataCount % bucketSize == 0) {
       //sweep the entries at bucket boundaries
-      sweep();
+      //run the sweep on the executor unless a previously submitted sweep is still pending
+      Future future = fut.get();
+      if (future != null && !future.isDone()){
+        return;
+      }
+      future = executor.submit(new SweepRunnable());
+      fut.set(future);
     }
   }
 
 
   /**
    * sweep low frequency data
-   * @return Names of elements got swept
    */
-  private void sweep() {
-    for(Map.Entry<String, Integer> entry : data.entrySet()) {
+  @VisibleForTesting
+  public void sweep() {
+    for(Map.Entry<T, Integer> entry : data.entrySet()) {
       if(entry.getValue() < currentTerm) {
-        String metric = entry.getKey();
+        T metric = entry.getKey();
         data.remove(metric);
         if (listener != null) {
           listener.sweep(metric);
@@ -126,16 +139,33 @@ public class LossyCounting {
     return data.size();
   }
 
-  public boolean contains(String key) {
+  public boolean contains(T key) {
     return data.containsKey(key);
   }
 
-  public Set<String> getElements(){
+  public Set<T> getElements(){
     return data.keySet();
   }
 
   public long getCurrentTerm() {
     return currentTerm;
   }
+
+  /** Background task that runs {@code sweep()}; failures are logged and never propagated. */
+  class SweepRunnable implements Runnable {
+    @Override public void run() {
+      // slf4j formats the message lazily, so no isTraceEnabled() guard is needed.
+      LOG.trace("Starting sweep of lossyCounting-{}", name);
+      try {
+        sweep();
+      } catch (Exception sweepFailure) {
+        LOG.debug("Error while sweeping of lossyCounting-{}", name, sweepFailure);
+      }
+    }
+  }
+
+  @VisibleForTesting public Future getSweepFuture() {
+    return fut.get(); // latest submitted sweep task; null until one has been submitted
+  }
 }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java
index 43e8d58..f16086e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsTableLatencies.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+
 import org.apache.hadoop.hbase.CompatibilityFactory;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java
new file mode 100644
index 0000000..baec144
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsUserAggregate.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.security.PrivilegedAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompatibilityFactory;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.test.MetricsAssertHelper;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({RegionServerTests.class, SmallTests.class})
+public class TestMetricsUserAggregate {
+  // Checks that operations reported to MetricsRegionServer are also counted per calling user.
+  @ClassRule public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMetricsUserAggregate.class);
+
+  public static MetricsAssertHelper HELPER =
+      CompatibilityFactory.getInstance(MetricsAssertHelper.class);
+
+  private MetricsRegionServerWrapperStub wrapper;
+  private MetricsRegionServer rsm;
+  private MetricsUserAggregateImpl userAgg;
+  private TableName tableName = TableName.valueOf("testUserAggregateMetrics");
+
+  @BeforeClass
+  public static void classSetUp() {
+    HELPER.init();
+  }
+
+  @Before
+  public void setUp() {
+    wrapper = new MetricsRegionServerWrapperStub();
+    Configuration conf = HBaseConfiguration.create();
+    rsm = new MetricsRegionServer(wrapper,conf , null);
+    userAgg = (MetricsUserAggregateImpl)rsm.getMetricsUserAggregate();
+  }
+  /** Reports 10 gets, 11 scans, 12 puts, 13 deletes, 14 increments, 15 appends, 16 replays. */
+  private void doOperations() {
+    for (int i=0; i < 10; i ++) {
+      rsm.updateGet(tableName,10);
+    }
+    for (int i=0; i < 11; i ++) {
+      rsm.updateScanTime(tableName,11);
+    }
+    for (int i=0; i < 12; i ++) {
+      rsm.updatePut(tableName,12);
+    }
+    for (int i=0; i < 13; i ++) {
+      rsm.updateDelete(tableName,13);
+    }
+    for (int i=0; i < 14; i ++) {
+      rsm.updateIncrement(tableName,14);
+    }
+    for (int i=0; i < 15; i ++) {
+      rsm.updateAppend(tableName,15);
+    }
+    for (int i=0; i < 16; i ++) {
+      rsm.updateReplay(16);
+    }
+  }
+
+  @Test
+  public void testPerUserOperations() {
+    Configuration conf = HBaseConfiguration.create();
+    User userFoo = User.createUserForTesting(conf, "FOO", new String[0]);
+    User userBar = User.createUserForTesting(conf, "BAR", new String[0]);
+    // Run the identical workload once as FOO and once as BAR.
+    userFoo.getUGI().doAs(new PrivilegedAction<Void>() {
+      @Override
+      public Void run() {
+        doOperations();
+        return null;
+      }
+    });
+
+    userBar.getUGI().doAs(new PrivilegedAction<Void>() {
+      @Override
+      public Void run() {
+        doOperations();
+        return null;
+      }
+    });
+    // Expected counts equal the loop bounds in doOperations(), checked separately per user.
+    HELPER.assertCounter("userfoometricgetnumops", 10, userAgg.getSource());
+    HELPER.assertCounter("userfoometricscantimenumops", 11, userAgg.getSource());
+    HELPER.assertCounter("userfoometricputnumops", 12, userAgg.getSource());
+    HELPER.assertCounter("userfoometricdeletenumops", 13, userAgg.getSource());
+    HELPER.assertCounter("userfoometricincrementnumops", 14, userAgg.getSource());
+    HELPER.assertCounter("userfoometricappendnumops", 15, userAgg.getSource());
+    HELPER.assertCounter("userfoometricreplaynumops", 16, userAgg.getSource());
+
+    HELPER.assertCounter("userbarmetricgetnumops", 10, userAgg.getSource());
+    HELPER.assertCounter("userbarmetricscantimenumops", 11, userAgg.getSource());
+    HELPER.assertCounter("userbarmetricputnumops", 12, userAgg.getSource());
+    HELPER.assertCounter("userbarmetricdeletenumops", 13, userAgg.getSource());
+    HELPER.assertCounter("userbarmetricincrementnumops", 14, userAgg.getSource());
+    HELPER.assertCounter("userbarmetricappendnumops", 15, userAgg.getSource());
+    HELPER.assertCounter("userbarmetricreplaynumops", 16, userAgg.getSource());
+  }
+  // NOTE(review): LossyCounting sweeps asynchronously, so the size checks below may race it.
+  @Test public void testLossyCountingOfUserMetrics() {
+    Configuration conf = HBaseConfiguration.create();
+    int noOfUsers = 10000;
+    for (int i = 1; i <= noOfUsers; i++) {
+      User.createUserForTesting(conf, "FOO" + i, new String[0]).getUGI()
+          .doAs(new PrivilegedAction<Void>() {
+            @Override public Void run() {
+              rsm.updateGet(tableName, 10);
+              return null;
+            }
+          });
+    }
+    assertTrue(
+        ((MetricsUserAggregateSourceImpl) userAgg.getSource()).getUserSources().size() <= (noOfUsers
+            / 10));
+    for (int i = 1; i <= noOfUsers / 10; i++) {
+      assertFalse(
+          HELPER.checkCounterExists("userfoo" + i + "metricgetnumops", userAgg.getSource()));
+    }
+    HELPER.assertCounter("userfoo" + noOfUsers + "metricgetnumops", 1, userAgg.getSource());
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
index 5240c40..050d2e5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestLossyCounting.java
@@ -24,7 +24,6 @@ import static org.junit.Assert.assertEquals;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.testclassification.MiscTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -66,10 +65,25 @@ public class TestLossyCounting {
       lossyCounting.add(key);
     }
     assertEquals(4L, lossyCounting.getCurrentTerm());
-    //if total rows added are proportional to bucket size
+    waitForSweep(lossyCounting);
+
+    //Do last one sweep as some sweep will be skipped when first one was running
+    lossyCounting.sweep();
     assertEquals(lossyCounting.getBucketSize() - 1, lossyCounting.getDataSize());
   }
 
+  /** Waits up to 10 x 100 ms for the pending sweep; on interrupt, re-interrupts and returns. */
+  private void waitForSweep(LossyCounting<Object> lossyCounting) {
+    for (int retry = 0; retry < 10 && !lossyCounting.getSweepFuture().isDone(); retry++) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
   @Test
   public void testSweep2() {
     LossyCounting lossyCounting = new LossyCounting(0.1, "testSweep2", null);
@@ -77,11 +91,13 @@ public class TestLossyCounting {
       String key = "" + i;
       lossyCounting.add(key);
     }
+    waitForSweep(lossyCounting);
     assertEquals(10L, lossyCounting.getDataSize());
     for(int i = 0; i < 10; i++){
       String key = "1";
       lossyCounting.add(key);
     }
+    waitForSweep(lossyCounting);
     assertEquals(1L, lossyCounting.getDataSize());
   }