You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2019/12/23 04:31:42 UTC

[hbase] branch branch-2 updated: HBASE-23065 [hbtop] Top-N heavy hitter user and client drill downs

This is an automated email from the ASF dual-hosted git repository.

ankit pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 6e6c7b3  HBASE-23065 [hbtop] Top-N heavy hitter user and client drill downs
6e6c7b3 is described below

commit 6e6c7b3c2d5a88a41ba8f2e369780d2278e361dd
Author: Ankit Singhal <an...@apache.org>
AuthorDate: Sun Nov 17 15:07:52 2019 -0800

    HBASE-23065 [hbtop] Top-N heavy hitter user and client drill downs
    
    Signed-off-by: Toshihiro Suzuki <br...@gmail.com>
    Signed-off-by: Josh Elser <el...@apache.org>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
---
 .../java/org/apache/hadoop/hbase/ServerLoad.java   |   4 +
 .../org/apache/hadoop/hbase/ServerMetrics.java     |   5 +
 .../apache/hadoop/hbase/ServerMetricsBuilder.java  |  38 +++--
 .../java/org/apache/hadoop/hbase/UserMetrics.java  |  86 +++++++++++
 .../apache/hadoop/hbase/UserMetricsBuilder.java    | 151 ++++++++++++++++++++
 .../regionserver/MetricsUserAggregateSource.java   |   4 +
 .../hbase/regionserver/MetricsUserSource.java      |  37 +++++
 .../apache/hadoop/metrics2/MetricHistogram.java    |   6 +
 .../MetricsUserAggregateSourceImpl.java            |  10 +-
 .../hbase/regionserver/MetricsUserSourceImpl.java  |  72 +++++++++-
 .../hadoop/metrics2/lib/MutableHistogram.java      |   4 +
 .../hadoop/metrics2/lib/MutableRangeHistogram.java |   6 +-
 .../apache/hadoop/hbase/hbtop/RecordFilter.java    |   4 +
 .../org/apache/hadoop/hbase/hbtop/field/Field.java |   6 +-
 .../hbase/hbtop/mode/ClientModeStrategy.java       | 157 +++++++++++++++++++++
 .../org/apache/hadoop/hbase/hbtop/mode/Mode.java   |  10 +-
 .../hadoop/hbase/hbtop/mode/ModeStrategy.java      |   3 +-
 .../hadoop/hbase/hbtop/mode/ModeStrategyUtils.java |  63 +++++++++
 .../hbase/hbtop/mode/NamespaceModeStrategy.java    |  26 +---
 .../hbase/hbtop/mode/RegionModeStrategy.java       |  28 +++-
 .../hbase/hbtop/mode/RegionServerModeStrategy.java |  28 ++--
 .../hadoop/hbase/hbtop/mode/TableModeStrategy.java |  13 +-
 .../hadoop/hbase/hbtop/mode/UserModeStrategy.java  |  70 +++++++++
 .../hbase/hbtop/screen/top/TopScreenModel.java     |  27 +++-
 .../hbase/hbtop/screen/top/TopScreenPresenter.java |   6 +-
 .../org/apache/hadoop/hbase/hbtop/TestUtils.java   | 101 ++++++++++++-
 .../hadoop/hbase/hbtop/mode/ClientModeTest.java    |  72 ++++++++++
 .../hadoop/hbase/hbtop/mode/ModeTestBase.java      |   6 +-
 .../hadoop/hbase/hbtop/mode/UserModeTest.java      |  70 +++++++++
 .../src/main/protobuf/ClusterStatus.proto          |  28 ++++
 .../src/main/protobuf/ClusterStatus.proto          |  31 ++++
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   8 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  24 +++-
 .../hadoop/hbase/regionserver/MetricsRegion.java   |  11 +-
 .../hbase/regionserver/MetricsRegionServer.java    |   2 +-
 .../hbase/regionserver/MetricsUserAggregate.java   |   9 ++
 .../regionserver/MetricsUserAggregateFactory.java  |  14 +-
 .../regionserver/MetricsUserAggregateImpl.java     |  79 +++++++++--
 .../hadoop/hbase/TestClientClusterMetrics.java     | 142 +++++++++++++++++++
 .../hbase/master/TestRegionsRecoveryChore.java     |   5 +
 .../hbase/regionserver/TestMetricsRegion.java      |   5 +-
 41 files changed, 1364 insertions(+), 107 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
index 15c8e63..b22d6c4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
@@ -412,6 +412,10 @@ public class ServerLoad implements ServerMetrics {
     return metrics.getRegionMetrics();
   }
 
+  @Override public Map<byte[], UserMetrics> getUserMetrics() {
+    return metrics.getUserMetrics();
+  }
+
   @Override
   public Set<String> getCoprocessorNames() {
     return metrics.getCoprocessorNames();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
index 391e62f..21fad92 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
@@ -94,6 +94,11 @@ public interface ServerMetrics {
   Map<byte[], RegionMetrics> getRegionMetrics();
 
   /**
+   * @return metrics per user
+   */
+  Map<byte[], UserMetrics> getUserMetrics();
+
+  /**
    * Return the RegionServer-level and Region-level coprocessors
    * @return string set of loaded RegionServer-level and Region-level coprocessors
    */
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
index a9fa71f..e5cd7b2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
@@ -77,6 +77,8 @@ public final class ServerMetricsBuilder {
         .map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
       .setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
         .map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
+        .setUserMetrics(serverLoadPB.getUserLoadsList().stream()
+            .map(UserMetricsBuilder::toUserMetrics).collect(Collectors.toList()))
       .setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
           .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
       .setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
@@ -100,19 +102,19 @@ public final class ServerMetricsBuilder {
         .setInfoServerPort(metrics.getInfoServerPort())
         .setMaxHeapMB((int) metrics.getMaxHeapSize().get(Size.Unit.MEGABYTE))
         .setUsedHeapMB((int) metrics.getUsedHeapSize().get(Size.Unit.MEGABYTE))
-        .addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames()))
-        .addAllRegionLoads(metrics.getRegionMetrics().values().stream()
-            .map(RegionMetricsBuilder::toRegionLoad)
-            .collect(Collectors.toList()))
-        .addAllReplLoadSource(metrics.getReplicationLoadSourceList().stream()
-            .map(ProtobufUtil::toReplicationLoadSource)
-            .collect(Collectors.toList()))
+        .addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames())).addAllRegionLoads(
+            metrics.getRegionMetrics().values().stream().map(RegionMetricsBuilder::toRegionLoad)
+                .collect(Collectors.toList())).addAllUserLoads(
+            metrics.getUserMetrics().values().stream().map(UserMetricsBuilder::toUserMetrics)
+                .collect(Collectors.toList())).addAllReplLoadSource(
+            metrics.getReplicationLoadSourceList().stream()
+                .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
         .setReportStartTime(metrics.getLastReportTimestamp())
         .setReportEndTime(metrics.getReportTimestamp());
     if (metrics.getReplicationLoadSink() != null) {
-      builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(
-          metrics.getReplicationLoadSink()));
+      builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(metrics.getReplicationLoadSink()));
     }
+
     return builder.build();
   }
 
@@ -132,6 +134,7 @@ public final class ServerMetricsBuilder {
   @Nullable
   private ReplicationLoadSink sink = null;
   private final Map<byte[], RegionMetrics> regionStatus = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+  private final Map<byte[], UserMetrics> userMetrics = new TreeMap<>(Bytes.BYTES_COMPARATOR);
   private final Set<String> coprocessorNames = new TreeSet<>();
   private long reportTimestamp = System.currentTimeMillis();
   private long lastReportTimestamp = 0;
@@ -189,6 +192,11 @@ public final class ServerMetricsBuilder {
     return this;
   }
 
+  public ServerMetricsBuilder setUserMetrics(List<UserMetrics> value) {
+    value.forEach(v -> this.userMetrics.put(v.getUserName(), v));
+    return this;
+  }
+
   public ServerMetricsBuilder setCoprocessorNames(List<String> value) {
     coprocessorNames.addAll(value);
     return this;
@@ -219,7 +227,8 @@ public final class ServerMetricsBuilder {
         regionStatus,
         coprocessorNames,
         reportTimestamp,
-        lastReportTimestamp);
+        lastReportTimestamp,
+        userMetrics);
   }
 
   private static class ServerMetricsImpl implements ServerMetrics {
@@ -238,12 +247,13 @@ public final class ServerMetricsBuilder {
     private final Set<String> coprocessorNames;
     private final long reportTimestamp;
     private final long lastReportTimestamp;
+    private final Map<byte[], UserMetrics> userMetrics;
 
     ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
         long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
         int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
         Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
-        long lastReportTimestamp) {
+        long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics) {
       this.serverName = Preconditions.checkNotNull(serverName);
       this.versionNumber = versionNumber;
       this.version = version;
@@ -255,6 +265,7 @@ public final class ServerMetricsBuilder {
       this.sources = Preconditions.checkNotNull(sources);
       this.sink = sink;
       this.regionStatus = Preconditions.checkNotNull(regionStatus);
+      this.userMetrics = Preconditions.checkNotNull(userMetrics);
       this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames);
       this.reportTimestamp = reportTimestamp;
       this.lastReportTimestamp = lastReportTimestamp;
@@ -325,6 +336,11 @@ public final class ServerMetricsBuilder {
     }
 
     @Override
+    public Map<byte[], UserMetrics> getUserMetrics() {
+      return Collections.unmodifiableMap(userMetrics);
+    }
+
+    @Override
     public Set<String> getCoprocessorNames() {
       return Collections.unmodifiableSet(coprocessorNames);
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetrics.java
new file mode 100644
index 0000000..6c2ba07
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetrics.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+  * Encapsulates per-user load metrics.
+  */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface UserMetrics {
+
+  interface ClientMetrics {
+
+    String getHostName();
+
+    long getReadRequestsCount();
+
+    long getWriteRequestsCount();
+
+    long getFilteredReadRequestsCount();
+  }
+
+  /**
+   * @return the user name
+   */
+  byte[] getUserName();
+
+  /**
+   * @return the number of read requests made by user
+   */
+  long getReadRequestCount();
+
+  /**
+   * @return the number of write requests made by user
+   */
+  long getWriteRequestCount();
+
+  /**
+   * @return the total number of read and write requests made by the user
+   *         (the sum of {@link #getReadRequestCount()} and {@link #getWriteRequestCount()})
+   */
+  default long getRequestCount() {
+    return getReadRequestCount() + getWriteRequestCount();
+  }
+
+  /**
+   * @return the user name as a string
+   */
+  default String getNameAsString() {
+    return Bytes.toStringBinary(getUserName());
+  }
+
+  /**
+   * @return metrics per client, keyed by client hostname
+   */
+  Map<String, ClientMetrics> getClientMetrics();
+
+  /**
+   * @return count of filtered read requests for a user
+   */
+  long getFilteredReadRequests();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetricsBuilder.java
new file mode 100644
index 0000000..70d2888
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetricsBuilder.java
@@ -0,0 +1,151 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+
+@InterfaceAudience.Private
+public final class UserMetricsBuilder {
+
+  public static UserMetrics toUserMetrics(ClusterStatusProtos.UserLoad userLoad) {
+    UserMetricsBuilder builder = UserMetricsBuilder.newBuilder(userLoad.getUserName().getBytes());
+    userLoad.getClientMetricsList().stream().map(
+      clientMetrics -> new ClientMetricsImpl(clientMetrics.getHostName(),
+            clientMetrics.getReadRequestsCount(), clientMetrics.getWriteRequestsCount(),
+            clientMetrics.getFilteredRequestsCount())).forEach(builder::addClientMetris);
+    return builder.build();
+  }
+
+  public static ClusterStatusProtos.UserLoad toUserMetrics(UserMetrics userMetrics) {
+    ClusterStatusProtos.UserLoad.Builder builder =
+        ClusterStatusProtos.UserLoad.newBuilder().setUserName(userMetrics.getNameAsString());
+    userMetrics.getClientMetrics().values().stream().map(
+      clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
+            .setHostName(clientMetrics.getHostName())
+            .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
+            .setReadRequestsCount(clientMetrics.getReadRequestsCount())
+            .setFilteredRequestsCount(clientMetrics.getFilteredReadRequestsCount()).build())
+        .forEach(builder::addClientMetrics);
+    return builder.build();
+  }
+
+  public static UserMetricsBuilder newBuilder(byte[] name) {
+    return new UserMetricsBuilder(name);
+  }
+
+
+  private final byte[] name;
+  private Map<String, UserMetrics.ClientMetrics> clientMetricsMap = new HashMap<>();
+  private UserMetricsBuilder(byte[] name) {
+    this.name = name;
+  }
+
+  public UserMetricsBuilder addClientMetris(UserMetrics.ClientMetrics clientMetrics) {
+    clientMetricsMap.put(clientMetrics.getHostName(), clientMetrics);
+    return this;
+  }
+
+  public UserMetrics build() {
+    return new UserMetricsImpl(name, clientMetricsMap);
+  }
+
+  public static class ClientMetricsImpl implements UserMetrics.ClientMetrics {
+    private final long filteredReadRequestsCount;
+    private final String hostName;
+    private final long readRequestCount;
+    private final long writeRequestCount;
+
+    public ClientMetricsImpl(String hostName, long readRequest, long writeRequest,
+        long filteredReadRequestsCount) {
+      this.hostName = hostName;
+      this.readRequestCount = readRequest;
+      this.writeRequestCount = writeRequest;
+      this.filteredReadRequestsCount = filteredReadRequestsCount;
+    }
+
+    @Override public String getHostName() {
+      return hostName;
+    }
+
+    @Override public long getReadRequestsCount() {
+      return readRequestCount;
+    }
+
+    @Override public long getWriteRequestsCount() {
+      return writeRequestCount;
+    }
+
+    @Override public long getFilteredReadRequestsCount() {
+      return filteredReadRequestsCount;
+    }
+  }
+
+  private static class UserMetricsImpl implements UserMetrics {
+    private final byte[] name;
+    private final Map<String, ClientMetrics> clientMetricsMap;
+
+    UserMetricsImpl(byte[] name, Map<String, ClientMetrics> clientMetricsMap) {
+      this.name = Preconditions.checkNotNull(name);
+      this.clientMetricsMap = clientMetricsMap;
+    }
+
+    @Override public byte[] getUserName() {
+      return name;
+    }
+
+    @Override public long getReadRequestCount() {
+      return clientMetricsMap.values().stream().map(c -> c.getReadRequestsCount())
+          .reduce(0L, Long::sum);
+    }
+
+    @Override public long getWriteRequestCount() {
+      return clientMetricsMap.values().stream().map(c -> c.getWriteRequestsCount())
+          .reduce(0L, Long::sum);
+    }
+
+    @Override public Map<String, ClientMetrics> getClientMetrics() {
+      return this.clientMetricsMap;
+    }
+
+    @Override public long getFilteredReadRequests() {
+      return clientMetricsMap.values().stream().map(c -> c.getFilteredReadRequestsCount())
+          .reduce(0L, Long::sum);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = Strings
+          .appendKeyValue(new StringBuilder(), "readRequestCount", this.getReadRequestCount());
+      Strings.appendKeyValue(sb, "writeRequestCount", this.getWriteRequestCount());
+      Strings.appendKeyValue(sb, "filteredReadRequestCount", this.getFilteredReadRequests());
+      return sb.toString();
+    }
+  }
+
+}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
index 0ffb928..ee570f0 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.Map;
+
 import org.apache.hadoop.hbase.metrics.BaseSource;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -59,4 +61,6 @@ public interface MetricsUserAggregateSource extends BaseSource {
   MetricsUserSource getOrCreateMetricsUser(String user);
 
   void deregister(MetricsUserSource toRemove);
+
+  Map<String, MetricsUserSource> getUserSources();
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
index b20dca6..9617366 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
@@ -18,11 +18,31 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.yetus.audience.InterfaceAudience;
 
 @InterfaceAudience.Private
 public interface MetricsUserSource extends Comparable<MetricsUserSource> {
 
+  // These client metrics will be reported through clusterStatus and hbtop only
+  interface ClientMetrics {
+    void incrementReadRequest();
+
+    void incrementWriteRequest();
+
+    String getHostName();
+
+    long getReadRequestsCount();
+
+    long getWriteRequestsCount();
+
+    void incrementFilteredReadRequests();
+
+    long getFilteredReadRequests();
+  }
+
   String getUser();
 
   void register();
@@ -42,4 +62,21 @@ public interface MetricsUserSource extends Comparable<MetricsUserSource> {
   void updateReplay(long t);
 
   void updateScanTime(long t);
+
+  void getMetrics(MetricsCollector metricsCollector, boolean all);
+
+  /**
+   * Metrics collected at client level for a user (needed for reporting through clusterStatus
+   * and hbtop currently)
+   * @return metrics per hostname
+   */
+  Map<String, ClientMetrics> getClientMetrics();
+
+  /**
+   * Create an instance of ClientMetrics if not present; otherwise return the existing one
+   *
+   * @param hostName hostname of the client
+   * @return Instance of ClientMetrics
+   */
+  ClientMetrics getOrCreateMetricsClient(String hostName);
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java
index 835c50b..bc1e8cb 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java
@@ -47,4 +47,10 @@ public interface MetricHistogram {
    */
   void add(long value);
 
+  /**
+   * Return the total number of values added to the histogram.
+   * @return the total number of values.
+   */
+  long getCount();
+
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
index c447f40..28726c4 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.Collections;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
@@ -28,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 @InterfaceAudience.Private
 public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
   implements MetricsUserAggregateSource {
@@ -90,9 +90,9 @@ public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
     }
   }
 
-  @VisibleForTesting
-  public ConcurrentHashMap<String, MetricsUserSource> getUserSources() {
-    return userSources;
+  @Override
+  public Map<String, MetricsUserSource> getUserSources() {
+    return Collections.unmodifiableMap(userSources);
   }
 
   @Override
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
index 9f714a3..ef0eb7b 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
@@ -18,9 +18,14 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
 
 import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.MetricsCollector;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -57,6 +62,48 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
   private final MetricsUserAggregateSourceImpl agg;
   private final DynamicMetricsRegistry registry;
 
+  private ConcurrentHashMap<String, ClientMetrics> clientMetricsMap;
+
+  static class ClientMetricsImpl implements ClientMetrics {
+    private final String hostName;
+    final LongAdder readRequestsCount = new LongAdder();
+    final LongAdder writeRequestsCount = new LongAdder();
+    final LongAdder filteredRequestsCount = new LongAdder();
+
+    public ClientMetricsImpl(String hostName) {
+      this.hostName = hostName;
+    }
+
+    @Override public void incrementReadRequest() {
+      readRequestsCount.increment();
+    }
+
+    @Override public void incrementWriteRequest() {
+      writeRequestsCount.increment();
+    }
+
+    @Override public String getHostName() {
+      return hostName;
+    }
+
+    @Override public long getReadRequestsCount() {
+      return readRequestsCount.sum();
+    }
+
+    @Override public long getWriteRequestsCount() {
+      return writeRequestsCount.sum();
+    }
+
+    @Override public void incrementFilteredReadRequests() {
+      filteredRequestsCount.increment();
+
+    }
+
+    @Override public long getFilteredReadRequests() {
+      return filteredRequestsCount.sum();
+    }
+  }
+
   public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
@@ -77,7 +124,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
     userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
     userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
     userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
-
+    clientMetricsMap = new ConcurrentHashMap<>();
     agg.register(this);
   }
 
@@ -204,4 +251,27 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
   public void updateScanTime(long t) {
     scanTimeHisto.add(t);
   }
+
+  @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) {
+    MetricsRecordBuilder mrb = metricsCollector.addRecord(this.userNamePrefix);
+    registry.snapshot(mrb, all);
+  }
+
+  @Override public Map<String, ClientMetrics> getClientMetrics() {
+    return Collections.unmodifiableMap(clientMetricsMap);
+  }
+
+  @Override public ClientMetrics getOrCreateMetricsClient(String client) {
+    ClientMetrics source = clientMetricsMap.get(client);
+    if (source != null) {
+      return source;
+    }
+    source = new ClientMetricsImpl(client);
+    ClientMetrics prev = clientMetricsMap.putIfAbsent(client, source);
+    if (prev != null) {
+      return prev;
+    }
+    return source;
+  }
+
 }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java
index 75dc300..dc86ebe 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java
@@ -51,6 +51,10 @@ public class MutableHistogram extends MutableMetric implements MetricHistogram {
     histogram.update(val);
   }
 
+  @Override public long getCount() {
+    return histogram.getCount();
+  }
+
   public long getMax() {
     return histogram.getMax();
   }
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java
index 273154f..4a406cc 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java
@@ -83,5 +83,9 @@ public abstract class MutableRangeHistogram extends MutableHistogram implements
           Interns.info(name + "_" + rangeType + "_" + ranges[ranges.length - 1] + "-inf", desc),
           val - cumNum);
     }
-  }  
+  }
+
+  @Override public long getCount() {
+    return histogram.getCount();
+  }
 }
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java
index aaef965..c7093dd 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java
@@ -143,6 +143,10 @@ public final class RecordFilter {
     this.value = Objects.requireNonNull(value);
   }
 
+  public Field getField() {
+    return field;
+  }
+
   public boolean execute(Record record) {
     FieldValue fieldValue = record.get(field);
     if (fieldValue == null) {
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java
index 6e5f66f..df460dd 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java
@@ -57,7 +57,11 @@ public enum Field {
     FieldValueType.STRING),
   REGION_COUNT("#REGION", "Region Count", false, false, FieldValueType.INTEGER),
   USED_HEAP_SIZE("UHEAP", "Used Heap Size", false, false, FieldValueType.SIZE),
-  MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE);
+  USER("USER", "user Name", true, true, FieldValueType.STRING),
+  MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE),
+  CLIENT_COUNT("#CLIENT", "Client Count", false, false, FieldValueType.INTEGER),
+  USER_COUNT("#USER", "User Count", false, false, FieldValueType.INTEGER),
+  CLIENT("CLIENT", "Client Hostname", true, true, FieldValueType.STRING);
 
   private final String header;
   private final String description;
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeStrategy.java
new file mode 100644
index 0000000..fe3edd1
--- /dev/null
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeStrategy.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.UserMetrics;
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
+import org.apache.hadoop.hbase.hbtop.field.FieldValue;
+import org.apache.hadoop.hbase.hbtop.field.FieldValueType;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Implementation of {@link ModeStrategy} for Client mode.
+ */
+@InterfaceAudience.Private public final class ClientModeStrategy implements ModeStrategy {
+
+  private final List<FieldInfo> fieldInfos = Arrays
+      .asList(new FieldInfo(Field.CLIENT, 0, true),
+          new FieldInfo(Field.USER_COUNT, 5, true),
+          new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
+          new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
+          new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
+          new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
+  private final Map<String, RequestCountPerSecond> requestCountPerSecondMap = new HashMap<>();
+
+  ClientModeStrategy() {
+  }
+
+  @Override public List<FieldInfo> getFieldInfos() {
+    return fieldInfos;
+  }
+
+  @Override public Field getDefaultSortField() {
+    return Field.REQUEST_COUNT_PER_SECOND;
+  }
+
+  @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+      List<RecordFilter> pushDownFilters) {
+    List<Record> records = createRecords(clusterMetrics);
+    return aggregateRecordsAndAddDistinct(
+        ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.CLIENT, Field.USER,
+        Field.USER_COUNT);
+  }
+
+  List<Record> createRecords(ClusterMetrics clusterMetrics) {
+    List<Record> ret = new ArrayList<>();
+    for (ServerMetrics serverMetrics : clusterMetrics.getLiveServerMetrics().values()) {
+      long lastReportTimestamp = serverMetrics.getLastReportTimestamp();
+      serverMetrics.getUserMetrics().values().forEach(um -> um.getClientMetrics().values().forEach(
+        clientMetrics -> ret.add(
+              createRecord(um.getNameAsString(), clientMetrics, lastReportTimestamp,
+                  serverMetrics.getServerName().getServerName()))));
+    }
+    return ret;
+  }
+
+  /**
+   * Aggregate the records and count the unique values of the given distinctField.
+   *
+   * @param records               records to be processed
+   * @param groupBy               Field on which group by needs to be done
+   * @param distinctField         Field whose unique values need to be counted
+   * @param uniqueCountAssignedTo a target field to which the unique count is assigned
+   * @return aggregated records
+   */
+  List<Record> aggregateRecordsAndAddDistinct(List<Record> records, Field groupBy,
+      Field distinctField, Field uniqueCountAssignedTo) {
+    List<Record> result = new ArrayList<>();
+    records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).values()
+        .forEach(val -> {
+          Set<FieldValue> distinctValues = new HashSet<>();
+          Map<Field, FieldValue> map = new HashMap<>();
+          for (Record record : val) {
+            for (Map.Entry<Field, FieldValue> field : record.entrySet()) {
+              if (distinctField.equals(field.getKey())) {
+                // Don't copy the distinct field into the new record; only its count is added
+                distinctValues.add(record.get(distinctField));
+              } else {
+                if (field.getKey().getFieldValueType() == FieldValueType.STRING) {
+                  map.put(field.getKey(), field.getValue());
+                } else {
+                  if (map.get(field.getKey()) == null) {
+                    map.put(field.getKey(), field.getValue());
+                  } else {
+                    map.put(field.getKey(), map.get(field.getKey()).plus(field.getValue()));
+                  }
+                }
+              }
+            }
+          }
+          // Add unique count field
+          map.put(uniqueCountAssignedTo, uniqueCountAssignedTo.newValue(distinctValues.size()));
+          result.add(Record.ofEntries(map.entrySet().stream()
+            .map(k -> Record.entry(k.getKey(), k.getValue()))));
+        });
+    return result;
+  }
+
+  Record createRecord(String user, UserMetrics.ClientMetrics clientMetrics,
+      long lastReportTimestamp, String server) {
+    Record.Builder builder = Record.builder();
+    String client = clientMetrics.getHostName();
+    builder.put(Field.CLIENT, clientMetrics.getHostName());
+    String mapKey = client + "$" + user + "$" + server;
+    RequestCountPerSecond requestCountPerSecond = requestCountPerSecondMap.get(mapKey);
+    if (requestCountPerSecond == null) {
+      requestCountPerSecond = new RequestCountPerSecond();
+      requestCountPerSecondMap.put(mapKey, requestCountPerSecond);
+    }
+    requestCountPerSecond.refresh(lastReportTimestamp, clientMetrics.getReadRequestsCount(),
+        clientMetrics.getFilteredReadRequestsCount(), clientMetrics.getWriteRequestsCount());
+    builder.put(Field.REQUEST_COUNT_PER_SECOND, requestCountPerSecond.getRequestCountPerSecond());
+    builder.put(Field.READ_REQUEST_COUNT_PER_SECOND,
+        requestCountPerSecond.getReadRequestCountPerSecond());
+    builder.put(Field.WRITE_REQUEST_COUNT_PER_SECOND,
+        requestCountPerSecond.getWriteRequestCountPerSecond());
+    builder.put(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND,
+        requestCountPerSecond.getFilteredReadRequestCountPerSecond());
+    builder.put(Field.USER, user);
+    return builder.build();
+  }
+
+  @Override public DrillDownInfo drillDown(Record selectedRecord) {
+    List<RecordFilter> initialFilters = Collections.singletonList(
+        RecordFilter.newBuilder(Field.CLIENT).doubleEquals(selectedRecord.get(Field.CLIENT)));
+    return new DrillDownInfo(Mode.USER, initialFilters);
+  }
+}
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java
index 1290e69..ffd98df 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Objects;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
 import org.apache.hadoop.hbase.hbtop.field.Field;
 import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -35,7 +36,9 @@ public enum Mode {
   NAMESPACE("Namespace", "Record per Namespace", new NamespaceModeStrategy()),
   TABLE("Table", "Record per Table", new TableModeStrategy()),
   REGION("Region", "Record per Region", new RegionModeStrategy()),
-  REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy());
+  REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy()),
+  USER("User", "Record per user", new UserModeStrategy()),
+  CLIENT("Client", "Record per client", new ClientModeStrategy());
 
   private final String header;
   private final String description;
@@ -55,8 +58,9 @@ public enum Mode {
     return description;
   }
 
-  public List<Record> getRecords(ClusterMetrics clusterMetrics) {
-    return modeStrategy.getRecords(clusterMetrics);
+  public List<Record> getRecords(ClusterMetrics clusterMetrics,
+      List<RecordFilter> pushDownFilters) {
+    return modeStrategy.getRecords(clusterMetrics, pushDownFilters);
   }
 
   public List<FieldInfo> getFieldInfos() {
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java
index 09fa297..021cee2 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java
@@ -21,6 +21,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
 import java.util.List;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
 import org.apache.hadoop.hbase.hbtop.field.Field;
 import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -33,6 +34,6 @@ import org.apache.yetus.audience.InterfaceAudience;
 interface ModeStrategy {
   List<FieldInfo> getFieldInfos();
   Field getDefaultSortField();
-  List<Record> getRecords(ClusterMetrics clusterMetrics);
+  List<Record> getRecords(ClusterMetrics clusterMetrics, List<RecordFilter> pushDownFilters);
   @Nullable DrillDownInfo drillDown(Record selectedRecord);
 }
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategyUtils.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategyUtils.java
new file mode 100644
index 0000000..9175820
--- /dev/null
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategyUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+
+public final class ModeStrategyUtils {
+  private ModeStrategyUtils() {
+
+  }
+
+  /**
+   * Filter records as per the supplied filters.
+   * @param records records to be processed
+   * @param filters List of filters
+   * @return filtered records
+   */
+  public static List<Record> applyFilterAndGet(List<Record> records,
+      List<RecordFilter> filters) {
+    if (filters != null && !filters.isEmpty()) {
+      return records.stream().filter(r -> filters.stream().allMatch(f -> f.execute(r)))
+          .collect(Collectors.toList());
+    }
+    return records;
+  }
+
+
+  /**
+   * Group records by the supplied groupBy field and
+   * aggregate each group using {@link Record#combine(Record)}.
+   *
+   * @param records records that need to be processed
+   * @param groupBy Field to be used for group by
+   * @return aggregated records
+   */
+  public static List<Record> aggregateRecords(List<Record> records, Field groupBy) {
+    return records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).entrySet().stream()
+        .flatMap(e -> e.getValue().stream().reduce(Record::combine).map(Stream::of)
+            .orElse(Stream.empty())).collect(Collectors.toList());
+  }
+
+}
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java
index 866f57e..f74d8bf 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java
@@ -20,8 +20,7 @@ package org.apache.hadoop.hbase.hbtop.mode;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.hbtop.Record;
 import org.apache.hadoop.hbase.hbtop.RecordFilter;
@@ -64,27 +63,14 @@ public final class NamespaceModeStrategy implements ModeStrategy {
     return Field.REQUEST_COUNT_PER_SECOND;
   }
 
-  @Override
-  public List<Record> getRecords(ClusterMetrics clusterMetrics) {
+  @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+      List<RecordFilter> pushDownFilters) {
     // Get records from RegionModeStrategy and add REGION_COUNT field
-    List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
-      .map(record ->
-        Record.ofEntries(fieldInfos.stream()
-          .filter(fi -> record.containsKey(fi.getField()))
-          .map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
-      .map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
-      .collect(Collectors.toList());
+    List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
+        regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
 
     // Aggregation by NAMESPACE field
-    return records.stream()
-      .collect(Collectors.groupingBy(r -> r.get(Field.NAMESPACE).asString()))
-      .entrySet().stream()
-      .flatMap(
-        e -> e.getValue().stream()
-          .reduce(Record::combine)
-          .map(Stream::of)
-          .orElse(Stream.empty()))
-      .collect(Collectors.toList());
+    return ModeStrategyUtils.aggregateRecords(records, Field.NAMESPACE);
   }
 
   @Override
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java
index e5deda0..0adbc82 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
+
 import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.RegionMetrics;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
 import org.apache.hadoop.hbase.hbtop.field.Field;
 import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -83,8 +86,8 @@ public final class RegionModeStrategy implements ModeStrategy {
     return Field.REQUEST_COUNT_PER_SECOND;
   }
 
-  @Override
-  public List<Record> getRecords(ClusterMetrics clusterMetrics) {
+  @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+      List<RecordFilter> pushDownFilters) {
     List<Record> ret = new ArrayList<>();
     for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {
       long lastReportTimestamp = sm.getLastReportTimestamp();
@@ -174,6 +177,27 @@ public final class RegionModeStrategy implements ModeStrategy {
     return builder.build();
   }
 
+  /**
+   * Form a new record list whose records contain only the fields provided through fieldInfos,
+   * and add a count field with value 1 to each record.
+   * The two operations of selecting fields and adding the new field are done together
+   * to save some CPU cycles on rebuilding the record again.
+   *
+   * @param fieldInfos List of FieldInfos required in the record
+   * @param records    List of records which need to be processed
+   * @param countField Field which needs to be added with value 1 for each record
+   * @return records after selecting required fields and adding count field
+   */
+  List<Record> selectModeFieldsAndAddCountField(List<FieldInfo> fieldInfos, List<Record> records,
+      Field countField) {
+
+    return records.stream().map(record -> Record.ofEntries(
+        fieldInfos.stream().filter(fi -> record.containsKey(fi.getField()))
+            .map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
+        .map(record -> Record.builder().putAll(record).put(countField, 1).build())
+        .collect(Collectors.toList());
+  }
+
   @Nullable
   @Override
   public DrillDownInfo drillDown(Record selectedRecord) {
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java
index d64f713..44a9a2c 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java
@@ -23,7 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.ServerMetrics;
 import org.apache.hadoop.hbase.hbtop.Record;
@@ -70,27 +70,15 @@ public final class RegionServerModeStrategy implements ModeStrategy {
     return Field.REQUEST_COUNT_PER_SECOND;
   }
 
-  @Override
-  public List<Record> getRecords(ClusterMetrics clusterMetrics) {
+  @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+      List<RecordFilter> pushDownFilters) {
     // Get records from RegionModeStrategy and add REGION_COUNT field
-    List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
-      .map(record ->
-        Record.ofEntries(fieldInfos.stream()
-          .filter(fi -> record.containsKey(fi.getField()))
-          .map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
-      .map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
-      .collect(Collectors.toList());
-
+    List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
+        regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
     // Aggregation by LONG_REGION_SERVER field
-    Map<String, Record> retMap = records.stream()
-      .collect(Collectors.groupingBy(r -> r.get(Field.LONG_REGION_SERVER).asString()))
-      .entrySet().stream()
-      .flatMap(
-        e -> e.getValue().stream()
-          .reduce(Record::combine)
-          .map(Stream::of)
-          .orElse(Stream.empty()))
-      .collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r));
+    Map<String, Record> retMap =
+        ModeStrategyUtils.aggregateRecords(records, Field.LONG_REGION_SERVER).stream()
+            .collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r));
 
     // Add USED_HEAP_SIZE field and MAX_HEAP_SIZE field
     for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java
index 1da074f..4acc344 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java
@@ -65,16 +65,11 @@ public final class TableModeStrategy implements ModeStrategy {
     return Field.REQUEST_COUNT_PER_SECOND;
   }
 
-  @Override
-  public List<Record> getRecords(ClusterMetrics clusterMetrics) {
+  @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+      List<RecordFilter> pushDownFilters) {
     // Get records from RegionModeStrategy and add REGION_COUNT field
-    List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
-      .map(record ->
-        Record.ofEntries(fieldInfos.stream()
-          .filter(fi -> record.containsKey(fi.getField()))
-          .map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
-      .map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
-      .collect(Collectors.toList());
+    List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
+        regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
 
     // Aggregation by NAMESPACE field and TABLE field
     return records.stream()
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/UserModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/UserModeStrategy.java
new file mode 100644
index 0000000..605376e
--- /dev/null
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/UserModeStrategy.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Implementation of {@link ModeStrategy} for User mode.
+ */
+@InterfaceAudience.Private public final class UserModeStrategy implements ModeStrategy {
+
+  private final List<FieldInfo> fieldInfos = Arrays
+      .asList(new FieldInfo(Field.USER, 0, true),
+          new FieldInfo(Field.CLIENT_COUNT, 7, true),
+          new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
+          new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
+          new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
+          new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
+  private final ClientModeStrategy clientModeStrategy = new ClientModeStrategy();
+
+  UserModeStrategy() {
+  }
+
+  @Override public List<FieldInfo> getFieldInfos() {
+    return fieldInfos;
+  }
+
+  @Override public Field getDefaultSortField() {
+    return Field.REQUEST_COUNT_PER_SECOND;
+  }
+
+  @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+      List<RecordFilter> pushDownFilters) {
+    List<Record> records = clientModeStrategy.createRecords(clusterMetrics);
+    return clientModeStrategy.aggregateRecordsAndAddDistinct(
+        ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.USER, Field.CLIENT,
+        Field.CLIENT_COUNT);
+  }
+
+  @Override public DrillDownInfo drillDown(Record selectedRecord) {
+    // Drill down to Client mode, using the selected USER as a filter
+    List<RecordFilter> initialFilters = Collections.singletonList(
+        RecordFilter.newBuilder(Field.USER).doubleEquals(selectedRecord.get(Field.USER)));
+    return new DrillDownInfo(Mode.CLIENT, initialFilters);
+  }
+}
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java
index 42e81e1..3dd7f12 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java
@@ -17,12 +17,15 @@
  */
 package org.apache.hadoop.hbase.hbtop.screen.top;
 
+import static org.apache.commons.lang3.time.DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT;
+
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
-import org.apache.commons.lang3.time.DateFormatUtils;
+
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.hbtop.Record;
@@ -56,6 +59,7 @@ public class TopScreenModel {
   private List<Record> records;
 
   private final List<RecordFilter> filters = new ArrayList<>();
+  private final List<RecordFilter> pushDownFilters = new ArrayList<>();
   private final List<String> filterHistories = new ArrayList<>();
 
   private boolean ascendingSort;
@@ -88,6 +92,7 @@ public class TopScreenModel {
     if (initialFilters != null) {
       filters.addAll(initialFilters);
     }
+    decomposePushDownFilter();
   }
 
   public void setSortFieldAndFields(Field sortField, List<Field> fields) {
@@ -113,7 +118,7 @@ public class TopScreenModel {
   }
 
   private void refreshSummary(ClusterMetrics clusterMetrics) {
-    String currentTime = DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT
+    String currentTime = ISO_8601_EXTENDED_TIME_FORMAT
       .format(System.currentTimeMillis());
     String version = clusterMetrics.getHBaseVersion();
     String clusterId = clusterMetrics.getClusterId();
@@ -130,7 +135,7 @@ public class TopScreenModel {
   }
 
   private void refreshRecords(ClusterMetrics clusterMetrics) {
-    List<Record> records = currentMode.getRecords(clusterMetrics);
+    List<Record> records = currentMode.getRecords(clusterMetrics, pushDownFilters);
 
     // Filter and sort
     records = records.stream()
@@ -153,13 +158,13 @@ public class TopScreenModel {
     if (filter == null) {
       return false;
     }
-
     filters.add(filter);
     filterHistories.add(filterString);
     return true;
   }
 
   public void clearFilters() {
+    pushDownFilters.clear();
     filters.clear();
   }
 
@@ -203,4 +208,18 @@ public class TopScreenModel {
   public List<String> getFilterHistories() {
     return Collections.unmodifiableList(filterHistories);
   }
+
+  private void decomposePushDownFilter() {
+    pushDownFilters.clear();
+    for (RecordFilter filter : filters) {
+      if (!fields.contains(filter.getField())) {
+        pushDownFilters.add(filter);
+      }
+    }
+    filters.removeAll(pushDownFilters);
+  }
+
+  public Collection<? extends RecordFilter> getPushDownFilters() {
+    return pushDownFilters;
+  }
 }
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java
index d435f5c..e4cd386 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
 import org.apache.hadoop.hbase.hbtop.field.Field;
 import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
 import org.apache.hadoop.hbase.hbtop.mode.Mode;
@@ -324,7 +325,8 @@ public class TopScreenPresenter {
   }
 
   public ScreenView goToFilterDisplayMode(Screen screen, Terminal terminal, int row) {
-    return new FilterDisplayModeScreenView(screen, terminal, row, topScreenModel.getFilters(),
-      topScreenView);
+    ArrayList<RecordFilter> filters = new ArrayList<>(topScreenModel.getFilters());
+    filters.addAll(topScreenModel.getPushDownFilters());
+    return new FilterDisplayModeScreenView(screen, terminal, row, filters, topScreenView);
   }
 }
diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java
index 43a8447..bad2a00 100644
--- a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java
+++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java
@@ -27,6 +27,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.commons.lang3.time.FastDateFormat;
 import org.apache.hadoop.hbase.ClusterMetrics;
 import org.apache.hadoop.hbase.ClusterMetricsBuilder;
@@ -37,6 +38,8 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UserMetrics;
+import org.apache.hadoop.hbase.UserMetricsBuilder;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.hbtop.field.Field;
 import org.apache.hadoop.hbase.hbtop.screen.top.Summary;
@@ -54,6 +57,9 @@ public final class TestUtils {
 
     // host1
     List<RegionMetrics> regionMetricsList = new ArrayList<>();
+    List<UserMetrics> userMetricsList = new ArrayList<>();
+    userMetricsList.add(createUserMetrics("FOO",1,2, 4));
+    userMetricsList.add(createUserMetrics("BAR",2,3, 3));
     regionMetricsList.add(createRegionMetrics(
       "table1,,1.00000000000000000000000000000000.",
       100, 50, 100,
@@ -73,10 +79,13 @@ public final class TestUtils {
     ServerName host1 = ServerName.valueOf("host1.apache.com", 1000, 1);
     serverMetricsMap.put(host1, createServerMetrics(host1, 100,
       new Size(100, Size.Unit.MEGABYTE), new Size(200, Size.Unit.MEGABYTE), 100,
-      regionMetricsList));
+      regionMetricsList, userMetricsList));
 
     // host2
     regionMetricsList.clear();
+    userMetricsList.clear();
+    userMetricsList.add(createUserMetrics("FOO",5,7, 3));
+    userMetricsList.add(createUserMetrics("BAR",4,8, 4));
     regionMetricsList.add(createRegionMetrics(
       "table1,1,4.00000000000000000000000000000003.",
       100, 50, 100,
@@ -96,7 +105,7 @@ public final class TestUtils {
     ServerName host2 = ServerName.valueOf("host2.apache.com", 1001, 2);
     serverMetricsMap.put(host2, createServerMetrics(host2, 200,
       new Size(16, Size.Unit.GIGABYTE), new Size(32, Size.Unit.GIGABYTE), 200,
-      regionMetricsList));
+      regionMetricsList, userMetricsList));
 
     ServerName host3 = ServerName.valueOf("host3.apache.com", 1002, 3);
     return ClusterMetricsBuilder.newBuilder()
@@ -117,6 +126,15 @@ public final class TestUtils {
       .build();
   }
 
+  private static UserMetrics createUserMetrics(String user, long readRequestCount,
+      long writeRequestCount, long filteredReadRequestsCount) {
+    return UserMetricsBuilder.newBuilder(Bytes.toBytes(user)).addClientMetris(
+        new UserMetricsBuilder.ClientMetricsImpl("CLIENT_A_" + user, readRequestCount,
+            writeRequestCount, filteredReadRequestsCount)).addClientMetris(
+        new UserMetricsBuilder.ClientMetricsImpl("CLIENT_B_" + user, readRequestCount,
+            writeRequestCount, filteredReadRequestsCount)).build();
+  }
+
   private static RegionMetrics createRegionMetrics(String regionName, long readRequestCount,
     long filteredReadRequestCount, long writeRequestCount, Size storeFileSize,
     Size uncompressedStoreFileSize, int storeFileCount, Size memStoreSize, float locality,
@@ -139,14 +157,15 @@ public final class TestUtils {
 
   private static ServerMetrics createServerMetrics(ServerName serverName, long reportTimestamp,
     Size usedHeapSize, Size maxHeapSize, long requestCountPerSecond,
-    List<RegionMetrics> regionMetricsList) {
+    List<RegionMetrics> regionMetricsList, List<UserMetrics> userMetricsList) {
 
     return ServerMetricsBuilder.newBuilder(serverName)
       .setReportTimestamp(reportTimestamp)
       .setUsedHeapSize(usedHeapSize)
       .setMaxHeapSize(maxHeapSize)
       .setRequestCountPerSecond(requestCountPerSecond)
-      .setRegionMetrics(regionMetricsList).build();
+      .setRegionMetrics(regionMetricsList)
+      .setUserMetrics(userMetricsList).build();
   }
 
   public static void assertRecordsInRegionMode(List<Record> records) {
@@ -316,10 +335,78 @@ public final class TestUtils {
     }
   }
 
+  /**
+   * Expects exactly two user-mode records, each belonging to user FOO or user BAR.
+   */
+  public static void assertRecordsInUserMode(List<Record> records) {
+    assertThat(records.size(), is(2));
+    for (Record record : records) {
+      switch (record.get(Field.USER).asString()) {
+        case "FOO":
+        case "BAR":
+          // The read, write and filtered-read rates are all expected to be zero
+          // because there is no change or new metrics during refresh.
+          assertRecordInUserMode(record, 0L, 0L, 0L);
+          break;
+        default:
+          fail();
+      }
+    }
+  }
+
+  /**
+   * Expects exactly four client-mode records, each belonging to one of the clients
+   * CLIENT_A_FOO, CLIENT_A_BAR, CLIENT_B_FOO or CLIENT_B_BAR.
+   *
+   * @param records the client-mode records to verify
+   */
+  public static void assertRecordsInClientMode(List<Record> records) {
+    assertThat(records.size(), is(4));
+    for (Record record : records) {
+      String client = record.get(Field.CLIENT).asString();
+      switch (client) {
+        case "CLIENT_A_FOO":
+        case "CLIENT_A_BAR":
+        case "CLIENT_B_FOO":
+        case "CLIENT_B_BAR":
+          // The read, write and filtered-read rates are all expected to be zero
+          // because there is no change or new metrics during refresh.
+          assertRecordInClientMode(record, 0L, 0L, 0L);
+          break;
+        default:
+          fail();
+      }
+    }
+  }
+
+  private static void assertRecordInUserMode(Record record, long readRequestCountPerSecond,
+      long writeCountRequestPerSecond, long filteredReadRequestsCount) {
+    assertThat(record.size(), is(6));
+    long readRate = record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong();
+    assertThat(readRate, is(readRequestCountPerSecond));
+    long writeRate = record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong();
+    assertThat(writeRate, is(writeCountRequestPerSecond));
+    long filteredReadRate = record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong();
+    assertThat(filteredReadRate, is(filteredReadRequestsCount));
+    assertThat(record.get(Field.CLIENT_COUNT).asInt(), is(2));
+  }
+
+  private static void assertRecordInClientMode(Record record, long readRequestCountPerSecond,
+      long writeCountRequestPerSecond, long filteredReadRequestsCount) {
+    assertThat(record.size(), is(6));
+    long readRate = record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong();
+    assertThat(readRate, is(readRequestCountPerSecond));
+    long writeRate = record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong();
+    assertThat(writeRate, is(writeCountRequestPerSecond));
+    long filteredReadRate = record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong();
+    assertThat(filteredReadRate, is(filteredReadRequestsCount));
+    assertThat(record.get(Field.USER_COUNT).asInt(), is(1));
+  }
+
   private static void assertRecordInTableMode(Record record, long requestCountPerSecond,
-    long readRequestCountPerSecond, long filteredReadRequestCountPerSecond,
-    long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize,
-    int numStoreFiles, Size memStoreSize, int regionCount) {
+      long readRequestCountPerSecond, long filteredReadRequestCountPerSecond,
+      long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize,
+      int numStoreFiles, Size memStoreSize, int regionCount) {
     assertThat(record.size(), is(11));
     assertThat(record.get(Field.REQUEST_COUNT_PER_SECOND).asLong(),
       is(requestCountPerSecond));
diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeTest.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeTest.java
new file mode 100644
index 0000000..82dbe45
--- /dev/null
+++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.TestUtils;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class ClientModeTest extends ModeTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(ClientModeTest.class);
+
+  @Override
+  protected Mode getMode() {
+    return Mode.CLIENT;
+  }
+
+  @Override
+  protected void assertRecords(List<Record> records) {
+    TestUtils.assertRecordsInClientMode(records);
+  }
+
+  /**
+   * Drilling down from a client record must lead to user mode with exactly one initial
+   * filter, an equality filter on the selected client.
+   */
+  @Override
+  protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) {
+    assertThat(drillDownInfo.getNextMode(), is(Mode.USER));
+    assertThat(drillDownInfo.getInitialFilters().size(), is(1));
+    String client = currentRecord.get(Field.CLIENT).asString();
+    switch (client) {
+      case "CLIENT_A_FOO":
+      case "CLIENT_B_FOO":
+      case "CLIENT_A_BAR":
+      case "CLIENT_B_BAR":
+        assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("CLIENT==" + client));
+        break;
+      default:
+        // Any other client name is unexpected for this test.
+        fail();
+    }
+  }
+}
diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java
index 7ad1a3a..2b6db84 100644
--- a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java
+++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java
@@ -27,7 +27,8 @@ public abstract class ModeTestBase {
 
   @Test
   public void testGetRecords() {
-    List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics());
+    List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics(),
+        null);
     assertRecords(records);
   }
 
@@ -36,7 +37,8 @@ public abstract class ModeTestBase {
 
   @Test
   public void testDrillDown() {
-    List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics());
+    List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics(),
+        null);
     for (Record record : records) {
       assertDrillDown(record, getMode().drillDown(record));
     }
diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/UserModeTest.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/UserModeTest.java
new file mode 100644
index 0000000..32e111c
--- /dev/null
+++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/UserModeTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.TestUtils;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class UserModeTest extends ModeTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(UserModeTest.class);
+
+  @Override
+  protected Mode getMode() {
+    return Mode.USER;
+  }
+
+  @Override
+  protected void assertRecords(List<Record> records) {
+    TestUtils.assertRecordsInUserMode(records);
+  }
+
+  /**
+   * Drilling down from a user record must lead to client mode with exactly one initial
+   * filter, an equality filter on the selected user.
+   */
+  @Override
+  protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) {
+    assertThat(drillDownInfo.getNextMode(), is(Mode.CLIENT));
+    assertThat(drillDownInfo.getInitialFilters().size(), is(1));
+    String userName = currentRecord.get(Field.USER).asString();
+    switch (userName) {
+      case "FOO":
+      case "BAR":
+        assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is("USER==" + userName));
+        break;
+      default:
+        // Any other user name is unexpected for this test.
+        fail();
+    }
+  }
+}
diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
index 78d2e83..77b9887 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
@@ -157,6 +157,29 @@ message RegionLoad {
   optional int32 max_store_file_ref_count = 22 [default = 0];
 }
 
+message UserLoad {
+
+  /** short user name */
+  required string userName = 1;
+
+  /** Metrics for all clients of a user */
+  repeated ClientMetrics clientMetrics = 2;
+}
+
+message ClientMetrics {
+  /** Host name of the client */
+  required string hostName = 1;
+
+  /** Current total number of read requests made from this client */
+  optional uint64 read_requests_count = 2;
+
+  /** Current total number of write requests made from this client */
+  optional uint64 write_requests_count = 3;
+
+  /** Current total number of filtered read requests made from this client */
+  optional uint64 filtered_requests_count = 4;
+}
+
 /* Server-level protobufs */
 
 message ReplicationLoadSink {
@@ -230,6 +253,11 @@ message ServerLoad {
    * The replicationLoadSink for the replication Sink status of this region server.
    */
   optional ReplicationLoadSink replLoadSink = 11;
+
+  /**
+   * The metrics for each user on this region server
+   */
+  repeated UserLoad userLoads = 12;
 }
 
 message LiveServerInfo {
diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
index 3494908..0234738 100644
--- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
@@ -153,6 +153,32 @@ message RegionLoad {
   optional int32 max_store_file_ref_count = 22 [default = 0];
 }
 
+message UserLoad {
+
+  /** short user name */
+  required string userName = 1;
+
+  /** Metrics for all clients of a user */
+  repeated ClientMetrics clientMetrics = 2;
+
+
+}
+
+message ClientMetrics {
+  /** Host name of the client */
+  required string hostName = 1;
+
+  /** Current total number of read requests made from this client */
+  optional uint64 read_requests_count = 2;
+
+  /** Current total number of write requests made from this client */
+  optional uint64 write_requests_count = 3;
+
+  /** Current total number of filtered read requests made from this client */
+  optional uint64 filtered_requests_count = 4;
+
+}
+
 /* Server-level protobufs */
 
 message ReplicationLoadSink {
@@ -219,6 +245,11 @@ message ServerLoad {
    * The replicationLoadSink for the replication Sink status of this region server.
    */
   optional ReplicationLoadSink replLoadSink = 11;
+
+  /**
+   * The metrics for each user on this region server
+   */
+  repeated UserLoad userLoads = 12;
 }
 
 message LiveServerInfo {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a431e5d..d1490c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -843,7 +843,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       // TODO: revisit if coprocessors should load in other cases
       this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
       this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
-      this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
+      this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper, conf);
     } else {
       this.metricsRegionWrapper = null;
       this.metricsRegion = null;
@@ -6582,6 +6582,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
       if (!outResults.isEmpty()) {
         readRequestsCount.increment();
+        if (metricsRegion != null) {
+          metricsRegion.updateReadRequestCount();
+        }
       }
       if (rsServices != null && rsServices.getMetrics() != null) {
         rsServices.getMetrics().updateReadQueryMeter(getRegionInfo().getTable());
@@ -6909,6 +6912,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
     protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
       filteredReadRequestsCount.increment();
+      if (metricsRegion != null) {
+        metricsRegion.updateFilteredRecords();
+      }
 
       if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3104982..1c1f0b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -56,6 +56,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import javax.management.MalformedObjectNameException;
 import javax.servlet.http.HttpServlet;
+
 import org.apache.commons.lang3.RandomUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.SystemUtils;
@@ -208,6 +209,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Coprocesso
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
@@ -1379,7 +1381,14 @@ public class HRegionServer extends HasThread implements
     } else {
       serverLoad.setInfoServerPort(-1);
     }
-
+    MetricsUserAggregateSource userSource =
+        metricsRegionServer.getMetricsUserAggregate().getSource();
+    if (userSource != null) {
+      Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
+      for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
+        serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
+      }
+    }
     // for the replicationLoad purpose. Only need to get from one executorService
     // either source or sink will get the same info
     ReplicationSourceService rsources = getReplicationSourceService();
@@ -1717,6 +1726,19 @@ public class HRegionServer extends HasThread implements
     return regionLoadBldr.build();
   }
 
+  private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
+    // One ClientMetrics message is appended for every client entry of the given user source.
+    UserLoad.Builder loadBuilder = UserLoad.newBuilder().setUserName(user);
+    userSource.getClientMetrics().values().forEach(client -> loadBuilder.addClientMetrics(
+        ClusterStatusProtos.ClientMetrics.newBuilder()
+            .setHostName(client.getHostName())
+            .setWriteRequestsCount(client.getWriteRequestsCount())
+            .setFilteredRequestsCount(client.getFilteredReadRequests())
+            .setReadRequestsCount(client.getReadRequestsCount())
+            .build()));
+    return loadBuilder.build();
+  }
+
   public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
     HRegion r = onlineRegions.get(encodedRegionName);
     return r != null ? createRegionLoad(r, null, null) : null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java
index 868d9b9..a1dad02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java
@@ -18,6 +18,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 
@@ -29,12 +30,14 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
 @InterfaceAudience.Private
 public class MetricsRegion {
   private final MetricsRegionSource source;
+  private final MetricsUserAggregate userAggregate;
   private MetricsRegionWrapper regionWrapper;
 
-  public MetricsRegion(final MetricsRegionWrapper wrapper) {
+  public MetricsRegion(final MetricsRegionWrapper wrapper, Configuration conf) {
     source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
                                              .createRegion(wrapper);
     this.regionWrapper = wrapper;
+    userAggregate = MetricsUserAggregateFactory.getMetricsUserAggregate(conf);
   }
 
   public void close() {
@@ -57,6 +60,9 @@ public class MetricsRegion {
     source.updateScanTime(t);
   }
 
+  public void updateFilteredRecords(){
+    userAggregate.updateFilteredReadRequests();
+  }
   public void updateAppend() {
     source.updateAppend();
   }
@@ -73,4 +79,7 @@ public class MetricsRegion {
     return regionWrapper;
   }
 
+  public void updateReadRequestCount() {
+    userAggregate.updateReadRequestCount();
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index 808fc58..98f400e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -101,7 +101,7 @@ public class MetricsRegionServer {
   }
 
   @VisibleForTesting
-  public org.apache.hadoop.hbase.regionserver.MetricsUserAggregate getMetricsUserAggregate() {
+  public MetricsUserAggregate getMetricsUserAggregate() {
     return userAggregate;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
index 9b23ccc..41d6d6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
@@ -23,6 +23,11 @@ import org.apache.yetus.audience.InterfaceAudience;
 @InterfaceAudience.Private
 public interface MetricsUserAggregate {
 
+  /**
+   * @return a singleton instance of MetricsUserAggregateSource, or null in case of NoOp
+   */
+  MetricsUserAggregateSource getSource();
+
   void updatePut(long t);
 
   void updateDelete(long t);
@@ -36,4 +41,8 @@ public interface MetricsUserAggregate {
   void updateReplay(long t);
 
   void updateScanTime(long t);
+
+  void updateFilteredReadRequests();
+
+  void updateReadRequestCount();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
index 888c480..38e440b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.yetus.audience.InterfaceAudience;
 
@@ -27,6 +26,7 @@ public class MetricsUserAggregateFactory {
   private MetricsUserAggregateFactory() {
 
   }
+
   public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled";
   public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true;
 
@@ -36,6 +36,10 @@ public class MetricsUserAggregateFactory {
     } else {
       //NoOpMetricUserAggregate
       return new MetricsUserAggregate() {
+        @Override public MetricsUserAggregateSource getSource() {
+          return null;
+        }
+
         @Override public void updatePut(long t) {
 
         }
@@ -63,6 +67,14 @@ public class MetricsUserAggregateFactory {
         @Override public void updateScanTime(long t) {
 
         }
+
+        @Override public void updateFilteredReadRequests() {
+
+        }
+
+        @Override public void updateReadRequestCount() {
+
+        }
       };
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
index 6c24afc..b457c75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.net.InetAddress;
 import java.util.Optional;
 
 import org.apache.hadoop.conf.Configuration;
@@ -29,8 +30,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.util.LossyCounting;
 import org.apache.yetus.audience.InterfaceAudience;
 
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
 @InterfaceAudience.Private
 public class MetricsUserAggregateImpl implements MetricsUserAggregate{
 
@@ -65,8 +64,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
     return user.isPresent() ? user.get().getShortName() : null;
   }
 
-  @VisibleForTesting
-  MetricsUserAggregateSource getSource() {
+  @Override
+  public MetricsUserAggregateSource getSource() {
     return source;
   }
 
@@ -74,7 +73,39 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
   public void updatePut(long t) {
     String user = getActiveUser();
     if (user != null) {
-      getOrCreateMetricsUser(user).updatePut(t);
+      MetricsUserSource userSource = getOrCreateMetricsUser(user);
+      userSource.updatePut(t);
+      incrementClientWriteMetrics(userSource);
+    }
+
+  }
+
+  /**
+   * @return host name of the remote address reported by RpcServer, or null when absent
+   */
+  private String getClient() {
+    Optional<InetAddress> remoteAddress = RpcServer.getRemoteAddress();
+    return remoteAddress.map(InetAddress::getHostName).orElse(null);
+  }
+
+  private void incrementClientReadMetrics(MetricsUserSource userSource) {
+    String client = getClient();
+    if (client != null && userSource != null) {
+      userSource.getOrCreateMetricsClient(client).incrementReadRequest();
+    }
+  }
+
+  private void incrementFilteredReadRequests(MetricsUserSource userSource) {
+    String client = getClient();
+    if (client != null && userSource != null) {
+      userSource.getOrCreateMetricsClient(client).incrementFilteredReadRequests();
+    }
+  }
+
+  private void incrementClientWriteMetrics(MetricsUserSource userSource) {
+    String client = getClient();
+    if (client != null && userSource != null) {
+      userSource.getOrCreateMetricsClient(client).incrementWriteRequest();
     }
   }
 
@@ -82,7 +113,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
   public void updateDelete(long t) {
     String user = getActiveUser();
     if (user != null) {
-      getOrCreateMetricsUser(user).updateDelete(t);
+      MetricsUserSource userSource = getOrCreateMetricsUser(user);
+      userSource.updateDelete(t);
+      incrementClientWriteMetrics(userSource);
     }
   }
 
@@ -90,7 +123,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
   public void updateGet(long t) {
     String user = getActiveUser();
     if (user != null) {
-      getOrCreateMetricsUser(user).updateGet(t);
+      MetricsUserSource userSource = getOrCreateMetricsUser(user);
+      userSource.updateGet(t);
     }
   }
 
@@ -98,7 +132,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
   public void updateIncrement(long t) {
     String user = getActiveUser();
     if (user != null) {
-      getOrCreateMetricsUser(user).updateIncrement(t);
+      MetricsUserSource userSource = getOrCreateMetricsUser(user);
+      userSource.updateIncrement(t);
+      incrementClientWriteMetrics(userSource);
     }
   }
 
@@ -106,7 +142,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
   public void updateAppend(long t) {
     String user = getActiveUser();
     if (user != null) {
-      getOrCreateMetricsUser(user).updateAppend(t);
+      MetricsUserSource userSource = getOrCreateMetricsUser(user);
+      userSource.updateAppend(t);
+      incrementClientWriteMetrics(userSource);
     }
   }
 
@@ -114,7 +152,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
   public void updateReplay(long t) {
     String user = getActiveUser();
     if (user != null) {
-      getOrCreateMetricsUser(user).updateReplay(t);
+      MetricsUserSource userSource = getOrCreateMetricsUser(user);
+      userSource.updateReplay(t);
+      incrementClientWriteMetrics(userSource);
     }
   }
 
@@ -122,7 +162,24 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
   public void updateScanTime(long t) {
     String user = getActiveUser();
     if (user != null) {
-      getOrCreateMetricsUser(user).updateScanTime(t);
+      MetricsUserSource userSource = getOrCreateMetricsUser(user);
+      userSource.updateScanTime(t);
+    }
+  }
+
+  @Override
+  public void updateFilteredReadRequests() {
+    String activeUser = getActiveUser();
+    if (activeUser != null) {
+      incrementFilteredReadRequests(getOrCreateMetricsUser(activeUser));
+    }
+  }
+
+  @Override public void updateReadRequestCount() {
+    String user = getActiveUser();
+    if (user != null) {
+      MetricsUserSource userSource = getOrCreateMetricsUser(user);
+      incrementClientReadMetrics(userSource);
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java
index e692351..0f4a35b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java
@@ -18,28 +18,39 @@
 package org.apache.hadoop.hbase;
 
 import java.io.IOException;
+import java.security.PrivilegedAction;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ClusterMetrics.Option;
 import org.apache.hadoop.hbase.Waiter.Predicate;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.AsyncAdmin;
 import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.RegionStatesCount;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
 import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
 import org.apache.hadoop.hbase.coprocessor.MasterObserver;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.filter.FilterAllFilter;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
@@ -238,6 +249,137 @@ public class TestClientClusterMetrics {
     Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
   }
 
+  /**
+   * Checks the per-user request counters exposed through the live server metrics.
+   * <p>
+   * Three test users each run one operation against {@code TABLE_NAME}: a put, a get and a scan
+   * whose filter rejects every row. The meta region's counters are sampled around each
+   * operation, and that delta is subtracted from the user's counter before asserting that
+   * exactly one request remains (presumably because the user's own meta lookups are counted
+   * against the user as well).
+   */
+  @Test public void testUserMetrics() throws Exception {
+    Configuration conf = UTIL.getConfiguration();
+    User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]);
+    User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]);
+    User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]);
+    UTIL.createTable(TABLE_NAME, CF);
+    waitForUsersMetrics(0);
+    // Step 1: FOO issues a single put.
+    long metaWritesBeforeFoo = getMetaMetrics().getWriteRequestCount();
+    userFoo.runAs(new PrivilegedAction<Void>() {
+      @Override public Void run() {
+        try {
+          doPut();
+        } catch (IOException e) {
+          Assert.fail("Exception:" + e.getMessage());
+        }
+        return null;
+      }
+    });
+    waitForUsersMetrics(1);
+    long metaWritesDuringFoo = getMetaMetrics().getWriteRequestCount() - metaWritesBeforeFoo;
+    // Step 2: BAR issues a single get.
+    long metaReadsBeforeBar = getMetaMetrics().getReadRequestCount();
+    userBar.runAs(new PrivilegedAction<Void>() {
+      @Override public Void run() {
+        try {
+          doGet();
+        } catch (IOException e) {
+          Assert.fail("Exception:" + e.getMessage());
+        }
+        return null;
+      }
+    });
+    waitForUsersMetrics(2);
+    long metaReadsDuringBar = getMetaMetrics().getReadRequestCount() - metaReadsBeforeBar;
+    // Step 3: TEST scans the table with a filter that is expected to drop every row.
+    long metaFilteredBeforeTest = getMetaMetrics().getFilteredReadRequestCount();
+    userTest.runAs(new PrivilegedAction<Void>() {
+      @Override public Void run() {
+        // try-with-resources so neither the connection nor the table outlives the scan.
+        try (Connection conn = createConnection(UTIL.getConfiguration());
+            Table table = conn.getTable(TABLE_NAME)) {
+          for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) {
+            Assert.fail("Should have filtered all rows");
+          }
+        } catch (IOException e) {
+          Assert.fail("Exception:" + e.getMessage());
+        }
+        return null;
+      }
+    });
+    waitForUsersMetrics(3);
+    long metaFilteredDuringTest =
+        getMetaMetrics().getFilteredReadRequestCount() - metaFilteredBeforeTest;
+    // Only the first live server's user metrics are inspected.
+    Map<byte[], UserMetrics> userMap =
+        ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
+            .iterator().next().getUserMetrics();
+    for (Map.Entry<byte[], UserMetrics> entry : userMap.entrySet()) {
+      String userName = Bytes.toString(entry.getKey());
+      UserMetrics userMetrics = entry.getValue();
+      switch (userName) {
+        case "FOO_USER_METRIC_TEST":
+          Assert.assertEquals(1, userMetrics.getWriteRequestCount() - metaWritesDuringFoo);
+          break;
+        case "BAR_USER_METRIC_TEST":
+          Assert.assertEquals(1, userMetrics.getReadRequestCount() - metaReadsDuringBar);
+          Assert.assertEquals(0, userMetrics.getWriteRequestCount());
+          break;
+        case "TEST_USER_METRIC_TEST":
+          Assert.assertEquals(1, userMetrics.getFilteredReadRequests() - metaFilteredDuringTest);
+          Assert.assertEquals(0, userMetrics.getWriteRequestCount());
+          break;
+        default:
+          // Any other entry must belong to the user running the test itself.
+          Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(), userName);
+          // Read/write count because of Meta operations
+          Assert.assertTrue(userMetrics.getReadRequestCount() > 1);
+          break;
+      }
+    }
+    UTIL.deleteTable(TABLE_NAME);
+  }
+
+  /**
+   * Looks up the metrics of the first meta region among the live servers.
+   *
+   * @return the metrics from the first live server that reports that region; when no server
+   *         does, the test is failed through {@code Assert.fail}
+   * @throws IOException if the cluster metrics cannot be fetched
+   */
+  private RegionMetrics getMetaMetrics() throws IOException {
+    ClusterMetrics liveServers = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+    byte[] metaRegionName = RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName();
+    for (ServerMetrics server : liveServers.getLiveServerMetrics().values()) {
+      RegionMetrics candidate = server.getRegionMetrics().get(metaRegionName);
+      if (candidate != null) {
+        return candidate;
+      }
+    }
+    Assert.fail("Should have find meta metrics");
+    return null;
+  }
+
+  /**
+   * Pauses for five seconds, then polls the first live server's user metrics until they cover
+   * more than {@code noOfUsers} users (the extra entry being the user running the test).
+   *
+   * @param noOfUsers number of test users expected to have shown up so far
+   */
+  private void waitForUsersMetrics(int noOfUsers) throws Exception {
+    // Sleep for metrics to get updated on master
+    Thread.sleep(5000);
+    // 10 * 1000 and 100 are presumably the timeout and poll interval in milliseconds.
+    Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, (Predicate<Exception>) () -> {
+      ServerMetrics firstServer = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS))
+          .getLiveServerMetrics().values().iterator().next();
+      Map<byte[], UserMetrics> perUser = firstServer.getUserMetrics();
+      Assert.assertNotNull(perUser);
+      // including current user + noOfUsers
+      return perUser.size() > noOfUsers;
+    });
+  }
+
+  /**
+   * Writes a single cell (row "a", qualifier "col1", value "1") to {@code TABLE_NAME} over a
+   * fresh connection. Both the connection and the table are closed before returning.
+   */
+  private void doPut() throws IOException {
+    try (Connection conn = createConnection(UTIL.getConfiguration());
+        Table table = conn.getTable(TABLE_NAME)) {
+      table
+        .put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1")));
+    }
+  }
+
+  /**
+   * Reads row "a", qualifier "col1", from {@code TABLE_NAME} over a fresh connection and
+   * discards the result. Both the connection and the table are closed before returning.
+   */
+  private void doGet() throws IOException {
+    try (Connection conn = createConnection(UTIL.getConfiguration());
+        Table table = conn.getTable(TABLE_NAME)) {
+      table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1")));
+    }
+  }
+
+  /**
+   * Opens a new connection on behalf of whichever user {@link UserProvider} reports as current
+   * at call time. The connection is returned open; closing it is up to the caller.
+   */
+  private Connection createConnection(Configuration conf) throws IOException {
+    return ConnectionFactory.createConnection(conf, UserProvider.instantiate(conf).getCurrent());
+  }
+
   @Test
   public void testOtherStatusInfos() throws Exception {
     EnumSet<Option> options =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java
index 2e40bbc..58cb549 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Size;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UserMetrics;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionStatesCount;
 import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -356,6 +357,10 @@ public class TestRegionsRecoveryChore {
         return regionMetricsMap;
       }
 
+      /** Returns a new, empty map each time: no per-user metrics are reported here. */
+      @Override
+      public Map<byte[], UserMetrics> getUserMetrics() {
+        return new HashMap<>();
+      }
+
       @Override
       public Set<String> getCoprocessorNames() {
         return null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
index 2119b91..b5fbc96 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompatibilityFactory;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
@@ -38,7 +39,7 @@ public class TestMetricsRegion {
 
   @Test
   public void testRegionWrapperMetrics() {
-    MetricsRegion mr = new MetricsRegion(new MetricsRegionWrapperStub());
+    MetricsRegion mr = new MetricsRegion(new MetricsRegionWrapperStub(), new Configuration());
     MetricsRegionAggregateSource agg = mr.getSource().getAggregateSource();
 
     HELPER.assertGauge(
@@ -72,7 +73,7 @@ public class TestMetricsRegion {
     mr.close();
 
     // test region with replica id > 0
-    mr = new MetricsRegion(new MetricsRegionWrapperStub(1));
+    mr = new MetricsRegion(new MetricsRegionWrapperStub(1), new Configuration());
     agg = mr.getSource().getAggregateSource();
     HELPER.assertGauge(
       "namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_storeCount",