You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2019/12/23 04:31:42 UTC
[hbase] branch branch-2 updated: HBASE-23065 [hbtop] Top-N heavy
hitter user and client drill downs
This is an automated email from the ASF dual-hosted git repository.
ankit pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 6e6c7b3 HBASE-23065 [hbtop] Top-N heavy hitter user and client drill downs
6e6c7b3 is described below
commit 6e6c7b3c2d5a88a41ba8f2e369780d2278e361dd
Author: Ankit Singhal <an...@apache.org>
AuthorDate: Sun Nov 17 15:07:52 2019 -0800
HBASE-23065 [hbtop] Top-N heavy hitter user and client drill downs
Signed-off-by: Toshihiro Suzuki <br...@gmail.com>
Signed-off-by: Josh Elser <el...@apache.org>
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../java/org/apache/hadoop/hbase/ServerLoad.java | 4 +
.../org/apache/hadoop/hbase/ServerMetrics.java | 5 +
.../apache/hadoop/hbase/ServerMetricsBuilder.java | 38 +++--
.../java/org/apache/hadoop/hbase/UserMetrics.java | 86 +++++++++++
.../apache/hadoop/hbase/UserMetricsBuilder.java | 151 ++++++++++++++++++++
.../regionserver/MetricsUserAggregateSource.java | 4 +
.../hbase/regionserver/MetricsUserSource.java | 37 +++++
.../apache/hadoop/metrics2/MetricHistogram.java | 6 +
.../MetricsUserAggregateSourceImpl.java | 10 +-
.../hbase/regionserver/MetricsUserSourceImpl.java | 72 +++++++++-
.../hadoop/metrics2/lib/MutableHistogram.java | 4 +
.../hadoop/metrics2/lib/MutableRangeHistogram.java | 6 +-
.../apache/hadoop/hbase/hbtop/RecordFilter.java | 4 +
.../org/apache/hadoop/hbase/hbtop/field/Field.java | 6 +-
.../hbase/hbtop/mode/ClientModeStrategy.java | 157 +++++++++++++++++++++
.../org/apache/hadoop/hbase/hbtop/mode/Mode.java | 10 +-
.../hadoop/hbase/hbtop/mode/ModeStrategy.java | 3 +-
.../hadoop/hbase/hbtop/mode/ModeStrategyUtils.java | 63 +++++++++
.../hbase/hbtop/mode/NamespaceModeStrategy.java | 26 +---
.../hbase/hbtop/mode/RegionModeStrategy.java | 28 +++-
.../hbase/hbtop/mode/RegionServerModeStrategy.java | 28 ++--
.../hadoop/hbase/hbtop/mode/TableModeStrategy.java | 13 +-
.../hadoop/hbase/hbtop/mode/UserModeStrategy.java | 70 +++++++++
.../hbase/hbtop/screen/top/TopScreenModel.java | 27 +++-
.../hbase/hbtop/screen/top/TopScreenPresenter.java | 6 +-
.../org/apache/hadoop/hbase/hbtop/TestUtils.java | 101 ++++++++++++-
.../hadoop/hbase/hbtop/mode/ClientModeTest.java | 72 ++++++++++
.../hadoop/hbase/hbtop/mode/ModeTestBase.java | 6 +-
.../hadoop/hbase/hbtop/mode/UserModeTest.java | 70 +++++++++
.../src/main/protobuf/ClusterStatus.proto | 28 ++++
.../src/main/protobuf/ClusterStatus.proto | 31 ++++
.../apache/hadoop/hbase/regionserver/HRegion.java | 8 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 24 +++-
.../hadoop/hbase/regionserver/MetricsRegion.java | 11 +-
.../hbase/regionserver/MetricsRegionServer.java | 2 +-
.../hbase/regionserver/MetricsUserAggregate.java | 9 ++
.../regionserver/MetricsUserAggregateFactory.java | 14 +-
.../regionserver/MetricsUserAggregateImpl.java | 79 +++++++++--
.../hadoop/hbase/TestClientClusterMetrics.java | 142 +++++++++++++++++++
.../hbase/master/TestRegionsRecoveryChore.java | 5 +
.../hbase/regionserver/TestMetricsRegion.java | 5 +-
41 files changed, 1364 insertions(+), 107 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
index 15c8e63..b22d6c4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerLoad.java
@@ -412,6 +412,10 @@ public class ServerLoad implements ServerMetrics {
return metrics.getRegionMetrics();
}
+ @Override public Map<byte[], UserMetrics> getUserMetrics() {
+ return metrics.getUserMetrics();
+ }
+
@Override
public Set<String> getCoprocessorNames() {
return metrics.getCoprocessorNames();
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
index 391e62f..21fad92 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetrics.java
@@ -94,6 +94,11 @@ public interface ServerMetrics {
Map<byte[], RegionMetrics> getRegionMetrics();
/**
+ * @return metrics per user
+ */
+ Map<byte[], UserMetrics> getUserMetrics();
+
+ /**
* Return the RegionServer-level and Region-level coprocessors
* @return string set of loaded RegionServer-level and Region-level coprocessors
*/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
index a9fa71f..e5cd7b2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ServerMetricsBuilder.java
@@ -77,6 +77,8 @@ public final class ServerMetricsBuilder {
.map(HBaseProtos.Coprocessor::getName).collect(Collectors.toList()))
.setRegionMetrics(serverLoadPB.getRegionLoadsList().stream()
.map(RegionMetricsBuilder::toRegionMetrics).collect(Collectors.toList()))
+ .setUserMetrics(serverLoadPB.getUserLoadsList().stream()
+ .map(UserMetricsBuilder::toUserMetrics).collect(Collectors.toList()))
.setReplicationLoadSources(serverLoadPB.getReplLoadSourceList().stream()
.map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
.setReplicationLoadSink(serverLoadPB.hasReplLoadSink()
@@ -100,19 +102,19 @@ public final class ServerMetricsBuilder {
.setInfoServerPort(metrics.getInfoServerPort())
.setMaxHeapMB((int) metrics.getMaxHeapSize().get(Size.Unit.MEGABYTE))
.setUsedHeapMB((int) metrics.getUsedHeapSize().get(Size.Unit.MEGABYTE))
- .addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames()))
- .addAllRegionLoads(metrics.getRegionMetrics().values().stream()
- .map(RegionMetricsBuilder::toRegionLoad)
- .collect(Collectors.toList()))
- .addAllReplLoadSource(metrics.getReplicationLoadSourceList().stream()
- .map(ProtobufUtil::toReplicationLoadSource)
- .collect(Collectors.toList()))
+ .addAllCoprocessors(toCoprocessor(metrics.getCoprocessorNames())).addAllRegionLoads(
+ metrics.getRegionMetrics().values().stream().map(RegionMetricsBuilder::toRegionLoad)
+ .collect(Collectors.toList())).addAllUserLoads(
+ metrics.getUserMetrics().values().stream().map(UserMetricsBuilder::toUserMetrics)
+ .collect(Collectors.toList())).addAllReplLoadSource(
+ metrics.getReplicationLoadSourceList().stream()
+ .map(ProtobufUtil::toReplicationLoadSource).collect(Collectors.toList()))
.setReportStartTime(metrics.getLastReportTimestamp())
.setReportEndTime(metrics.getReportTimestamp());
if (metrics.getReplicationLoadSink() != null) {
- builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(
- metrics.getReplicationLoadSink()));
+ builder.setReplLoadSink(ProtobufUtil.toReplicationLoadSink(metrics.getReplicationLoadSink()));
}
+
return builder.build();
}
@@ -132,6 +134,7 @@ public final class ServerMetricsBuilder {
@Nullable
private ReplicationLoadSink sink = null;
private final Map<byte[], RegionMetrics> regionStatus = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ private final Map<byte[], UserMetrics> userMetrics = new TreeMap<>(Bytes.BYTES_COMPARATOR);
private final Set<String> coprocessorNames = new TreeSet<>();
private long reportTimestamp = System.currentTimeMillis();
private long lastReportTimestamp = 0;
@@ -189,6 +192,11 @@ public final class ServerMetricsBuilder {
return this;
}
+ public ServerMetricsBuilder setUserMetrics(List<UserMetrics> value) {
+ value.forEach(v -> this.userMetrics.put(v.getUserName(), v));
+ return this;
+ }
+
public ServerMetricsBuilder setCoprocessorNames(List<String> value) {
coprocessorNames.addAll(value);
return this;
@@ -219,7 +227,8 @@ public final class ServerMetricsBuilder {
regionStatus,
coprocessorNames,
reportTimestamp,
- lastReportTimestamp);
+ lastReportTimestamp,
+ userMetrics);
}
private static class ServerMetricsImpl implements ServerMetrics {
@@ -238,12 +247,13 @@ public final class ServerMetricsBuilder {
private final Set<String> coprocessorNames;
private final long reportTimestamp;
private final long lastReportTimestamp;
+ private final Map<byte[], UserMetrics> userMetrics;
ServerMetricsImpl(ServerName serverName, int versionNumber, String version,
long requestCountPerSecond, long requestCount, Size usedHeapSize, Size maxHeapSize,
int infoServerPort, List<ReplicationLoadSource> sources, ReplicationLoadSink sink,
Map<byte[], RegionMetrics> regionStatus, Set<String> coprocessorNames, long reportTimestamp,
- long lastReportTimestamp) {
+ long lastReportTimestamp, Map<byte[], UserMetrics> userMetrics) {
this.serverName = Preconditions.checkNotNull(serverName);
this.versionNumber = versionNumber;
this.version = version;
@@ -255,6 +265,7 @@ public final class ServerMetricsBuilder {
this.sources = Preconditions.checkNotNull(sources);
this.sink = sink;
this.regionStatus = Preconditions.checkNotNull(regionStatus);
+ this.userMetrics = Preconditions.checkNotNull(userMetrics);
this.coprocessorNames =Preconditions.checkNotNull(coprocessorNames);
this.reportTimestamp = reportTimestamp;
this.lastReportTimestamp = lastReportTimestamp;
@@ -325,6 +336,11 @@ public final class ServerMetricsBuilder {
}
@Override
+ public Map<byte[], UserMetrics> getUserMetrics() {
+ return Collections.unmodifiableMap(userMetrics);
+ }
+
+ @Override
public Set<String> getCoprocessorNames() {
return Collections.unmodifiableSet(coprocessorNames);
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetrics.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetrics.java
new file mode 100644
index 0000000..6c2ba07
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetrics.java
@@ -0,0 +1,86 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Encapsulates per-user load metrics.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface UserMetrics {
+
+ interface ClientMetrics {
+
+ String getHostName();
+
+ long getReadRequestsCount();
+
+ long getWriteRequestsCount();
+
+ long getFilteredReadRequestsCount();
+ }
+
+ /**
+ * @return the user name
+ */
+ byte[] getUserName();
+
+ /**
+ * @return the number of read requests made by user
+ */
+ long getReadRequestCount();
+
+ /**
+ * @return the number of write requests made by user
+ */
+ long getWriteRequestCount();
+
+ /**
+ * @return the number of read requests and write requests made by the user, i.e. the sum of
+ * {@link #getReadRequestCount()} and {@link #getWriteRequestCount()}
+ */
+ default long getRequestCount() {
+ return getReadRequestCount() + getWriteRequestCount();
+ }
+
+ /**
+ * @return the user name as a string
+ */
+ default String getNameAsString() {
+ return Bytes.toStringBinary(getUserName());
+ }
+
+ /**
+ * @return metrics per client(hostname)
+ */
+ Map<String, ClientMetrics> getClientMetrics();
+
+ /**
+ * @return count of filtered read requests for a user
+ */
+ long getFilteredReadRequests();
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetricsBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetricsBuilder.java
new file mode 100644
index 0000000..70d2888
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/UserMetricsBuilder.java
@@ -0,0 +1,151 @@
+/**
+ * Copyright The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Strings;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
+
+/**
+ * Builder for {@link UserMetrics} instances, together with converters between
+ * {@link UserMetrics} and the protobuf {@code ClusterStatusProtos.UserLoad} message.
+ */
+@InterfaceAudience.Private
+public final class UserMetricsBuilder {
+
+  /**
+   * Converts a protobuf {@code UserLoad} into a {@link UserMetrics}.
+   * @param userLoad protobuf message supplying the user name and the per-client metrics
+   * @return a {@link UserMetrics} with one {@link UserMetrics.ClientMetrics} per host name found
+   *         in the message's client metrics list
+   */
+  public static UserMetrics toUserMetrics(ClusterStatusProtos.UserLoad userLoad) {
+    // Encode explicitly as UTF-8: the no-arg String#getBytes() uses the JVM's default charset,
+    // so the same user name could otherwise yield different byte[] keys on different hosts.
+    UserMetricsBuilder builder = UserMetricsBuilder
+      .newBuilder(userLoad.getUserName().getBytes(java.nio.charset.StandardCharsets.UTF_8));
+    userLoad.getClientMetricsList().stream().map(
+      clientMetrics -> new ClientMetricsImpl(clientMetrics.getHostName(),
+        clientMetrics.getReadRequestsCount(), clientMetrics.getWriteRequestsCount(),
+        clientMetrics.getFilteredRequestsCount())).forEach(builder::addClientMetris);
+    return builder.build();
+  }
+
+  /**
+   * Converts a {@link UserMetrics} into its protobuf {@code UserLoad} representation.
+   * @param userMetrics the metrics to convert
+   * @return a {@code UserLoad} carrying the user name (as returned by
+   *         {@link UserMetrics#getNameAsString()}) and one client metrics entry per client
+   */
+  public static ClusterStatusProtos.UserLoad toUserMetrics(UserMetrics userMetrics) {
+    ClusterStatusProtos.UserLoad.Builder builder =
+      ClusterStatusProtos.UserLoad.newBuilder().setUserName(userMetrics.getNameAsString());
+    userMetrics.getClientMetrics().values().stream().map(
+      clientMetrics -> ClusterStatusProtos.ClientMetrics.newBuilder()
+        .setHostName(clientMetrics.getHostName())
+        .setWriteRequestsCount(clientMetrics.getWriteRequestsCount())
+        .setReadRequestsCount(clientMetrics.getReadRequestsCount())
+        .setFilteredRequestsCount(clientMetrics.getFilteredReadRequestsCount()).build())
+      .forEach(builder::addClientMetrics);
+    return builder.build();
+  }
+
+  /**
+   * @param name the user name as bytes
+   * @return a new, empty builder for the given user
+   */
+  public static UserMetricsBuilder newBuilder(byte[] name) {
+    return new UserMetricsBuilder(name);
+  }
+
+
+  private final byte[] name;
+  private final Map<String, UserMetrics.ClientMetrics> clientMetricsMap = new HashMap<>();
+  private UserMetricsBuilder(byte[] name) {
+    this.name = name;
+  }
+
+  /**
+   * Registers the metrics of one client, keyed by its host name; an entry previously added for
+   * the same host name is replaced.
+   * <p>
+   * The method name is misspelled ("Metris"); it is kept as is so existing callers keep working.
+   * @param clientMetrics the client metrics to add
+   * @return this builder
+   */
+  public UserMetricsBuilder addClientMetris(UserMetrics.ClientMetrics clientMetrics) {
+    clientMetricsMap.put(clientMetrics.getHostName(), clientMetrics);
+    return this;
+  }
+
+  /**
+   * @return a {@link UserMetrics} holding the client metrics added so far
+   */
+  public UserMetrics build() {
+    // Copy the map so that later addClientMetris() calls on this builder do not change an
+    // instance that has already been built.
+    return new UserMetricsImpl(name, new HashMap<>(clientMetricsMap));
+  }
+
+  /**
+   * Immutable value holder for the request counters of a single client host.
+   */
+  public static class ClientMetricsImpl implements UserMetrics.ClientMetrics {
+    private final long filteredReadRequestsCount;
+    private final String hostName;
+    private final long readRequestCount;
+    private final long writeRequestCount;
+
+    public ClientMetricsImpl(String hostName, long readRequest, long writeRequest,
+      long filteredReadRequestsCount) {
+      this.hostName = hostName;
+      this.readRequestCount = readRequest;
+      this.writeRequestCount = writeRequest;
+      this.filteredReadRequestsCount = filteredReadRequestsCount;
+    }
+
+    @Override public String getHostName() {
+      return hostName;
+    }
+
+    @Override public long getReadRequestsCount() {
+      return readRequestCount;
+    }
+
+    @Override public long getWriteRequestsCount() {
+      return writeRequestCount;
+    }
+
+    @Override public long getFilteredReadRequestsCount() {
+      return filteredReadRequestsCount;
+    }
+  }
+
+  /**
+   * {@link UserMetrics} backed by a map of per-client metrics; the user-level counters are
+   * computed on each call by summing over that map.
+   */
+  private static class UserMetricsImpl implements UserMetrics {
+    private final byte[] name;
+    private final Map<String, ClientMetrics> clientMetricsMap;
+
+    UserMetricsImpl(byte[] name, Map<String, ClientMetrics> clientMetricsMap) {
+      this.name = Preconditions.checkNotNull(name);
+      this.clientMetricsMap = Preconditions.checkNotNull(clientMetricsMap);
+    }
+
+    @Override public byte[] getUserName() {
+      return name;
+    }
+
+    @Override public long getReadRequestCount() {
+      // mapToLong keeps the sum on primitives instead of boxing every counter.
+      return clientMetricsMap.values().stream()
+        .mapToLong(ClientMetrics::getReadRequestsCount).sum();
+    }
+
+    @Override public long getWriteRequestCount() {
+      return clientMetricsMap.values().stream()
+        .mapToLong(ClientMetrics::getWriteRequestsCount).sum();
+    }
+
+    @Override public Map<String, ClientMetrics> getClientMetrics() {
+      return this.clientMetricsMap;
+    }
+
+    @Override public long getFilteredReadRequests() {
+      return clientMetricsMap.values().stream()
+        .mapToLong(ClientMetrics::getFilteredReadRequestsCount).sum();
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = Strings
+        .appendKeyValue(new StringBuilder(), "readRequestCount", this.getReadRequestCount());
+      Strings.appendKeyValue(sb, "writeRequestCount", this.getWriteRequestCount());
+      Strings.appendKeyValue(sb, "filteredReadRequestCount", this.getFilteredReadRequests());
+      return sb.toString();
+    }
+  }
+
+}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
index 0ffb928..ee570f0 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSource.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;
+import java.util.Map;
+
import org.apache.hadoop.hbase.metrics.BaseSource;
import org.apache.yetus.audience.InterfaceAudience;
@@ -59,4 +61,6 @@ public interface MetricsUserAggregateSource extends BaseSource {
MetricsUserSource getOrCreateMetricsUser(String user);
void deregister(MetricsUserSource toRemove);
+
+ Map<String, MetricsUserSource> getUserSources();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
index b20dca6..9617366 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSource.java
@@ -18,11 +18,31 @@
package org.apache.hadoop.hbase.regionserver;
+import java.util.Map;
+
+import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface MetricsUserSource extends Comparable<MetricsUserSource> {
+ //These client metrics will be reported through clusterStatus and hbtop only
+ interface ClientMetrics {
+ void incrementReadRequest();
+
+ void incrementWriteRequest();
+
+ String getHostName();
+
+ long getReadRequestsCount();
+
+ long getWriteRequestsCount();
+
+ void incrementFilteredReadRequests();
+
+ long getFilteredReadRequests();
+ }
+
String getUser();
void register();
@@ -42,4 +62,21 @@ public interface MetricsUserSource extends Comparable<MetricsUserSource> {
void updateReplay(long t);
void updateScanTime(long t);
+
+ void getMetrics(MetricsCollector metricsCollector, boolean all);
+
+ /**
+ * Metrics collected at client level for a user (currently needed for reporting through
+ * clusterStatus and hbtop)
+ * @return metrics per hostname
+ */
+ Map<String, ClientMetrics> getClientMetrics();
+
+ /**
+ * Create an instance of ClientMetrics if not present, otherwise return the existing one
+ *
+ * @param hostName hostname of the client
+ * @return Instance of ClientMetrics
+ */
+ ClientMetrics getOrCreateMetricsClient(String hostName);
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java
index 835c50b..bc1e8cb 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/metrics2/MetricHistogram.java
@@ -47,4 +47,10 @@ public interface MetricHistogram {
*/
void add(long value);
+ /**
+ * Return the total number of values added to the histogram.
+ * @return the total number of values.
+ */
+ long getCount();
+
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
index c447f40..28726c4 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateSourceImpl.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
@@ -28,8 +30,6 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
@InterfaceAudience.Private
public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
implements MetricsUserAggregateSource {
@@ -90,9 +90,9 @@ public class MetricsUserAggregateSourceImpl extends BaseSourceImpl
}
}
- @VisibleForTesting
- public ConcurrentHashMap<String, MetricsUserSource> getUserSources() {
- return userSources;
+ @Override
+ public Map<String, MetricsUserSource> getUserSources() {
+ return Collections.unmodifiableMap(userSources);
}
@Override
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
index 9f714a3..ef0eb7b 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserSourceImpl.java
@@ -18,9 +18,14 @@
package org.apache.hadoop.hbase.regionserver;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.metrics2.MetricHistogram;
+import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DynamicMetricsRegistry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -57,6 +62,48 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
private final MetricsUserAggregateSourceImpl agg;
private final DynamicMetricsRegistry registry;
+ private ConcurrentHashMap<String, ClientMetrics> clientMetricsMap;
+
+ static class ClientMetricsImpl implements ClientMetrics {
+ private final String hostName;
+ final LongAdder readRequestsCount = new LongAdder();
+ final LongAdder writeRequestsCount = new LongAdder();
+ final LongAdder filteredRequestsCount = new LongAdder();
+
+ public ClientMetricsImpl(String hostName) {
+ this.hostName = hostName;
+ }
+
+ @Override public void incrementReadRequest() {
+ readRequestsCount.increment();
+ }
+
+ @Override public void incrementWriteRequest() {
+ writeRequestsCount.increment();
+ }
+
+ @Override public String getHostName() {
+ return hostName;
+ }
+
+ @Override public long getReadRequestsCount() {
+ return readRequestsCount.sum();
+ }
+
+ @Override public long getWriteRequestsCount() {
+ return writeRequestsCount.sum();
+ }
+
+ @Override public void incrementFilteredReadRequests() {
+ filteredRequestsCount.increment();
+
+ }
+
+ @Override public long getFilteredReadRequests() {
+ return filteredRequestsCount.sum();
+ }
+ }
+
public MetricsUserSourceImpl(String user, MetricsUserAggregateSourceImpl agg) {
if (LOG.isDebugEnabled()) {
LOG.debug("Creating new MetricsUserSourceImpl for user " + user);
@@ -77,7 +124,7 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
userIncrementKey = userNamePrefix + MetricsRegionServerSource.INCREMENT_KEY;
userAppendKey = userNamePrefix + MetricsRegionServerSource.APPEND_KEY;
userReplayKey = userNamePrefix + MetricsRegionServerSource.REPLAY_KEY;
-
+ clientMetricsMap = new ConcurrentHashMap<>();
agg.register(this);
}
@@ -204,4 +251,27 @@ public class MetricsUserSourceImpl implements MetricsUserSource {
public void updateScanTime(long t) {
scanTimeHisto.add(t);
}
+
+ @Override public void getMetrics(MetricsCollector metricsCollector, boolean all) {
+ MetricsRecordBuilder mrb = metricsCollector.addRecord(this.userNamePrefix);
+ registry.snapshot(mrb, all);
+ }
+
+ @Override public Map<String, ClientMetrics> getClientMetrics() {
+ return Collections.unmodifiableMap(clientMetricsMap);
+ }
+
+ @Override public ClientMetrics getOrCreateMetricsClient(String client) {
+ ClientMetrics source = clientMetricsMap.get(client);
+ if (source != null) {
+ return source;
+ }
+ source = new ClientMetricsImpl(client);
+ ClientMetrics prev = clientMetricsMap.putIfAbsent(client, source);
+ if (prev != null) {
+ return prev;
+ }
+ return source;
+ }
+
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java
index 75dc300..dc86ebe 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableHistogram.java
@@ -51,6 +51,10 @@ public class MutableHistogram extends MutableMetric implements MetricHistogram {
histogram.update(val);
}
+ @Override public long getCount() {
+ return histogram.getCount();
+ }
+
public long getMax() {
return histogram.getMax();
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java
index 273154f..4a406cc 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/metrics2/lib/MutableRangeHistogram.java
@@ -83,5 +83,9 @@ public abstract class MutableRangeHistogram extends MutableHistogram implements
Interns.info(name + "_" + rangeType + "_" + ranges[ranges.length - 1] + "-inf", desc),
val - cumNum);
}
- }
+ }
+
+ @Override public long getCount() {
+ return histogram.getCount();
+ }
}
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java
index aaef965..c7093dd 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/RecordFilter.java
@@ -143,6 +143,10 @@ public final class RecordFilter {
this.value = Objects.requireNonNull(value);
}
+ public Field getField() {
+ return field;
+ }
+
public boolean execute(Record record) {
FieldValue fieldValue = record.get(field);
if (fieldValue == null) {
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java
index 6e5f66f..df460dd 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/field/Field.java
@@ -57,7 +57,11 @@ public enum Field {
FieldValueType.STRING),
REGION_COUNT("#REGION", "Region Count", false, false, FieldValueType.INTEGER),
USED_HEAP_SIZE("UHEAP", "Used Heap Size", false, false, FieldValueType.SIZE),
- MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE);
+ USER("USER", "user Name", true, true, FieldValueType.STRING),
+ MAX_HEAP_SIZE("MHEAP", "Max Heap Size", false, false, FieldValueType.SIZE),
+ CLIENT_COUNT("#CLIENT", "Client Count", false, false, FieldValueType.INTEGER),
+ USER_COUNT("#USER", "User Count", false, false, FieldValueType.INTEGER),
+ CLIENT("CLIENT", "Client Hostname", true, true, FieldValueType.STRING);
private final String header;
private final String description;
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeStrategy.java
new file mode 100644
index 0000000..fe3edd1
--- /dev/null
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeStrategy.java
@@ -0,0 +1,157 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.ServerMetrics;
+import org.apache.hadoop.hbase.UserMetrics;
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
+import org.apache.hadoop.hbase.hbtop.field.FieldValue;
+import org.apache.hadoop.hbase.hbtop.field.FieldValueType;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Implementation for {@link ModeStrategy} for client Mode.
+ */
+@InterfaceAudience.Private public final class ClientModeStrategy implements ModeStrategy {
+
+ private final List<FieldInfo> fieldInfos = Arrays
+ .asList(new FieldInfo(Field.CLIENT, 0, true),
+ new FieldInfo(Field.USER_COUNT, 5, true),
+ new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
+ new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
+ new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
+ new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
+ private final Map<String, RequestCountPerSecond> requestCountPerSecondMap = new HashMap<>();
+
+ ClientModeStrategy() {
+ }
+
+ @Override public List<FieldInfo> getFieldInfos() {
+ return fieldInfos;
+ }
+
+ @Override public Field getDefaultSortField() {
+ return Field.REQUEST_COUNT_PER_SECOND;
+ }
+
+ @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+ List<RecordFilter> pushDownFilters) {
+ List<Record> records = createRecords(clusterMetrics);
+ return aggregateRecordsAndAddDistinct(
+ ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.CLIENT, Field.USER,
+ Field.USER_COUNT);
+ }
+
+ List<Record> createRecords(ClusterMetrics clusterMetrics) {
+ List<Record> ret = new ArrayList<>();
+ for (ServerMetrics serverMetrics : clusterMetrics.getLiveServerMetrics().values()) {
+ long lastReportTimestamp = serverMetrics.getLastReportTimestamp();
+ serverMetrics.getUserMetrics().values().forEach(um -> um.getClientMetrics().values().forEach(
+ clientMetrics -> ret.add(
+ createRecord(um.getNameAsString(), clientMetrics, lastReportTimestamp,
+ serverMetrics.getServerName().getServerName()))));
+ }
+ return ret;
+ }
+
+ /**
+ * Aggregate the records and count the unique values for the given distinctField
+ *
+ * @param records records to be processed
+ * @param groupBy Field on which group by needs to be done
+ * @param distinctField Field whose unique values need to be counted
+ * @param uniqueCountAssignedTo the target field to which the unique count is assigned
+ * @return aggregated records
+ */
+ List<Record> aggregateRecordsAndAddDistinct(List<Record> records, Field groupBy,
+ Field distinctField, Field uniqueCountAssignedTo) {
+ List<Record> result = new ArrayList<>();
+ records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).values()
+ .forEach(val -> {
+ Set<FieldValue> distinctValues = new HashSet<>();
+ Map<Field, FieldValue> map = new HashMap<>();
+ for (Record record : val) {
+ for (Map.Entry<Field, FieldValue> field : record.entrySet()) {
+ if (distinctField.equals(field.getKey())) {
+ // The field whose distinct count is required is not added to the new record
+ distinctValues.add(record.get(distinctField));
+ } else {
+ if (field.getKey().getFieldValueType() == FieldValueType.STRING) {
+ map.put(field.getKey(), field.getValue());
+ } else {
+ if (map.get(field.getKey()) == null) {
+ map.put(field.getKey(), field.getValue());
+ } else {
+ map.put(field.getKey(), map.get(field.getKey()).plus(field.getValue()));
+ }
+ }
+ }
+ }
+ }
+ // Add unique count field
+ map.put(uniqueCountAssignedTo, uniqueCountAssignedTo.newValue(distinctValues.size()));
+ result.add(Record.ofEntries(map.entrySet().stream()
+ .map(k -> Record.entry(k.getKey(), k.getValue()))));
+ });
+ return result;
+ }
+
+ Record createRecord(String user, UserMetrics.ClientMetrics clientMetrics,
+ long lastReportTimestamp, String server) {
+ Record.Builder builder = Record.builder();
+ String client = clientMetrics.getHostName();
+ builder.put(Field.CLIENT, clientMetrics.getHostName());
+ String mapKey = client + "$" + user + "$" + server;
+ RequestCountPerSecond requestCountPerSecond = requestCountPerSecondMap.get(mapKey);
+ if (requestCountPerSecond == null) {
+ requestCountPerSecond = new RequestCountPerSecond();
+ requestCountPerSecondMap.put(mapKey, requestCountPerSecond);
+ }
+ requestCountPerSecond.refresh(lastReportTimestamp, clientMetrics.getReadRequestsCount(),
+ clientMetrics.getFilteredReadRequestsCount(), clientMetrics.getWriteRequestsCount());
+ builder.put(Field.REQUEST_COUNT_PER_SECOND, requestCountPerSecond.getRequestCountPerSecond());
+ builder.put(Field.READ_REQUEST_COUNT_PER_SECOND,
+ requestCountPerSecond.getReadRequestCountPerSecond());
+ builder.put(Field.WRITE_REQUEST_COUNT_PER_SECOND,
+ requestCountPerSecond.getWriteRequestCountPerSecond());
+ builder.put(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND,
+ requestCountPerSecond.getFilteredReadRequestCountPerSecond());
+ builder.put(Field.USER, user);
+ return builder.build();
+ }
+
+ @Override public DrillDownInfo drillDown(Record selectedRecord) {
+ List<RecordFilter> initialFilters = Collections.singletonList(
+ RecordFilter.newBuilder(Field.CLIENT).doubleEquals(selectedRecord.get(Field.CLIENT)));
+ return new DrillDownInfo(Mode.USER, initialFilters);
+ }
+}
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java
index 1290e69..ffd98df 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/Mode.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Objects;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.yetus.audience.InterfaceAudience;
@@ -35,7 +36,9 @@ public enum Mode {
NAMESPACE("Namespace", "Record per Namespace", new NamespaceModeStrategy()),
TABLE("Table", "Record per Table", new TableModeStrategy()),
REGION("Region", "Record per Region", new RegionModeStrategy()),
- REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy());
+ REGION_SERVER("RegionServer", "Record per RegionServer", new RegionServerModeStrategy()),
+ USER("User", "Record per user", new UserModeStrategy()),
+ CLIENT("Client", "Record per client", new ClientModeStrategy());
private final String header;
private final String description;
@@ -55,8 +58,9 @@ public enum Mode {
return description;
}
- public List<Record> getRecords(ClusterMetrics clusterMetrics) {
- return modeStrategy.getRecords(clusterMetrics);
+ public List<Record> getRecords(ClusterMetrics clusterMetrics,
+ List<RecordFilter> pushDownFilters) {
+ return modeStrategy.getRecords(clusterMetrics, pushDownFilters);
}
public List<FieldInfo> getFieldInfos() {
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java
index 09fa297..021cee2 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategy.java
@@ -21,6 +21,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.List;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.yetus.audience.InterfaceAudience;
@@ -33,6 +34,6 @@ import org.apache.yetus.audience.InterfaceAudience;
interface ModeStrategy {
List<FieldInfo> getFieldInfos();
Field getDefaultSortField();
- List<Record> getRecords(ClusterMetrics clusterMetrics);
+ List<Record> getRecords(ClusterMetrics clusterMetrics, List<RecordFilter> pushDownFilters);
@Nullable DrillDownInfo drillDown(Record selectedRecord);
}
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategyUtils.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategyUtils.java
new file mode 100644
index 0000000..9175820
--- /dev/null
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/ModeStrategyUtils.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+
+public final class ModeStrategyUtils {
+ private ModeStrategyUtils() {
+
+ }
+
+ /**
+ * Filter records as per the supplied filters.
+ * @param records records to be processed
+ * @param filters List of filters
+ * @return filtered records
+ */
+ public static List<Record> applyFilterAndGet(List<Record> records,
+ List<RecordFilter> filters) {
+ if (filters != null && !filters.isEmpty()) {
+ return records.stream().filter(r -> filters.stream().allMatch(f -> f.execute(r)))
+ .collect(Collectors.toList());
+ }
+ return records;
+ }
+
+
+ /**
+ * Group records on the basis of the supplied groupBy field and
+ * aggregate the records of each group using {@link Record#combine(Record)}
+ *
+ * @param records records that need to be processed
+ * @param groupBy Field to be used for group by
+ * @return aggregated records
+ */
+ public static List<Record> aggregateRecords(List<Record> records, Field groupBy) {
+ return records.stream().collect(Collectors.groupingBy(r -> r.get(groupBy))).entrySet().stream()
+ .flatMap(e -> e.getValue().stream().reduce(Record::combine).map(Stream::of)
+ .orElse(Stream.empty())).collect(Collectors.toList());
+ }
+
+}
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java
index 866f57e..f74d8bf 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/NamespaceModeStrategy.java
@@ -20,8 +20,7 @@ package org.apache.hadoop.hbase.hbtop.mode;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
import org.apache.hadoop.hbase.hbtop.RecordFilter;
@@ -64,27 +63,14 @@ public final class NamespaceModeStrategy implements ModeStrategy {
return Field.REQUEST_COUNT_PER_SECOND;
}
- @Override
- public List<Record> getRecords(ClusterMetrics clusterMetrics) {
+ @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+ List<RecordFilter> pushDownFilters) {
// Get records from RegionModeStrategy and add REGION_COUNT field
- List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
- .map(record ->
- Record.ofEntries(fieldInfos.stream()
- .filter(fi -> record.containsKey(fi.getField()))
- .map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
- .map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
- .collect(Collectors.toList());
+ List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
+ regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
// Aggregation by NAMESPACE field
- return records.stream()
- .collect(Collectors.groupingBy(r -> r.get(Field.NAMESPACE).asString()))
- .entrySet().stream()
- .flatMap(
- e -> e.getValue().stream()
- .reduce(Record::combine)
- .map(Stream::of)
- .orElse(Stream.empty()))
- .collect(Collectors.toList());
+ return ModeStrategyUtils.aggregateRecords(records, Field.NAMESPACE);
}
@Override
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java
index e5deda0..0adbc82 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionModeStrategy.java
@@ -24,6 +24,8 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
+
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.RegionMetrics;
@@ -31,6 +33,7 @@ import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.hadoop.hbase.util.Bytes;
@@ -83,8 +86,8 @@ public final class RegionModeStrategy implements ModeStrategy {
return Field.REQUEST_COUNT_PER_SECOND;
}
- @Override
- public List<Record> getRecords(ClusterMetrics clusterMetrics) {
+ @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+ List<RecordFilter> pushDownFilters) {
List<Record> ret = new ArrayList<>();
for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {
long lastReportTimestamp = sm.getLastReportTimestamp();
@@ -174,6 +177,27 @@ public final class RegionModeStrategy implements ModeStrategy {
return builder.build();
}
+ /**
+ * Form a new record list whose records contain only the fields provided through fieldInfos,
+ * and add a count field with value 1 to each record.
+ * The two operations, selecting fields and adding the new field, are done together
+ * in order to save some CPU cycles on rebuilding the record again.
+ *
+ * @param fieldInfos List of FieldInfos required in the record
+ * @param records List of records that need to be processed
+ * @param countField Field which needs to be added with value 1 for each record
+ * @return records after selecting required fields and adding count field
+ */
+ List<Record> selectModeFieldsAndAddCountField(List<FieldInfo> fieldInfos, List<Record> records,
+ Field countField) {
+
+ return records.stream().map(record -> Record.ofEntries(
+ fieldInfos.stream().filter(fi -> record.containsKey(fi.getField()))
+ .map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
+ .map(record -> Record.builder().putAll(record).put(countField, 1).build())
+ .collect(Collectors.toList());
+ }
+
@Nullable
@Override
public DrillDownInfo drillDown(Record selectedRecord) {
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java
index d64f713..44a9a2c 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/RegionServerModeStrategy.java
@@ -23,7 +23,7 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import java.util.stream.Stream;
+
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ServerMetrics;
import org.apache.hadoop.hbase.hbtop.Record;
@@ -70,27 +70,15 @@ public final class RegionServerModeStrategy implements ModeStrategy {
return Field.REQUEST_COUNT_PER_SECOND;
}
- @Override
- public List<Record> getRecords(ClusterMetrics clusterMetrics) {
+ @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+ List<RecordFilter> pushDownFilters) {
// Get records from RegionModeStrategy and add REGION_COUNT field
- List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
- .map(record ->
- Record.ofEntries(fieldInfos.stream()
- .filter(fi -> record.containsKey(fi.getField()))
- .map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
- .map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
- .collect(Collectors.toList());
-
+ List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
+ regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
// Aggregation by LONG_REGION_SERVER field
- Map<String, Record> retMap = records.stream()
- .collect(Collectors.groupingBy(r -> r.get(Field.LONG_REGION_SERVER).asString()))
- .entrySet().stream()
- .flatMap(
- e -> e.getValue().stream()
- .reduce(Record::combine)
- .map(Stream::of)
- .orElse(Stream.empty()))
- .collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r));
+ Map<String, Record> retMap =
+ ModeStrategyUtils.aggregateRecords(records, Field.LONG_REGION_SERVER).stream()
+ .collect(Collectors.toMap(r -> r.get(Field.LONG_REGION_SERVER).asString(), r -> r));
// Add USED_HEAP_SIZE field and MAX_HEAP_SIZE field
for (ServerMetrics sm : clusterMetrics.getLiveServerMetrics().values()) {
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java
index 1da074f..4acc344 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/TableModeStrategy.java
@@ -65,16 +65,11 @@ public final class TableModeStrategy implements ModeStrategy {
return Field.REQUEST_COUNT_PER_SECOND;
}
- @Override
- public List<Record> getRecords(ClusterMetrics clusterMetrics) {
+ @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+ List<RecordFilter> pushDownFilters) {
// Get records from RegionModeStrategy and add REGION_COUNT field
- List<Record> records = regionModeStrategy.getRecords(clusterMetrics).stream()
- .map(record ->
- Record.ofEntries(fieldInfos.stream()
- .filter(fi -> record.containsKey(fi.getField()))
- .map(fi -> Record.entry(fi.getField(), record.get(fi.getField())))))
- .map(record -> Record.builder().putAll(record).put(Field.REGION_COUNT, 1).build())
- .collect(Collectors.toList());
+ List<Record> records = regionModeStrategy.selectModeFieldsAndAddCountField(fieldInfos,
+ regionModeStrategy.getRecords(clusterMetrics, pushDownFilters), Field.REGION_COUNT);
// Aggregation by NAMESPACE field and TABLE field
return records.stream()
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/UserModeStrategy.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/UserModeStrategy.java
new file mode 100644
index 0000000..605376e
--- /dev/null
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/mode/UserModeStrategy.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.hbase.ClusterMetrics;
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Implementation for {@link ModeStrategy} for User Mode.
+ */
+@InterfaceAudience.Private public final class UserModeStrategy implements ModeStrategy {
+
+ private final List<FieldInfo> fieldInfos = Arrays
+ .asList(new FieldInfo(Field.USER, 0, true),
+ new FieldInfo(Field.CLIENT_COUNT, 7, true),
+ new FieldInfo(Field.REQUEST_COUNT_PER_SECOND, 10, true),
+ new FieldInfo(Field.READ_REQUEST_COUNT_PER_SECOND, 10, true),
+ new FieldInfo(Field.WRITE_REQUEST_COUNT_PER_SECOND, 10, true),
+ new FieldInfo(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND, 10, true));
+ private final ClientModeStrategy clientModeStrategy = new ClientModeStrategy();
+
+ UserModeStrategy() {
+ }
+
+ @Override public List<FieldInfo> getFieldInfos() {
+ return fieldInfos;
+ }
+
+ @Override public Field getDefaultSortField() {
+ return Field.REQUEST_COUNT_PER_SECOND;
+ }
+
+ @Override public List<Record> getRecords(ClusterMetrics clusterMetrics,
+ List<RecordFilter> pushDownFilters) {
+ List<Record> records = clientModeStrategy.createRecords(clusterMetrics);
+ return clientModeStrategy.aggregateRecordsAndAddDistinct(
+ ModeStrategyUtils.applyFilterAndGet(records, pushDownFilters), Field.USER, Field.CLIENT,
+ Field.CLIENT_COUNT);
+ }
+
+ @Override public DrillDownInfo drillDown(Record selectedRecord) {
+ // Drill down to client mode, using the selected USER as a filter
+ List<RecordFilter> initialFilters = Collections.singletonList(
+ RecordFilter.newBuilder(Field.USER).doubleEquals(selectedRecord.get(Field.USER)));
+ return new DrillDownInfo(Mode.CLIENT, initialFilters);
+ }
+}
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java
index 42e81e1..3dd7f12 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenModel.java
@@ -17,12 +17,15 @@
*/
package org.apache.hadoop.hbase.hbtop.screen.top;
+import static org.apache.commons.lang3.time.DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT;
+
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
-import org.apache.commons.lang3.time.DateFormatUtils;
+
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.hbtop.Record;
@@ -56,6 +59,7 @@ public class TopScreenModel {
private List<Record> records;
private final List<RecordFilter> filters = new ArrayList<>();
+ private final List<RecordFilter> pushDownFilters = new ArrayList<>();
private final List<String> filterHistories = new ArrayList<>();
private boolean ascendingSort;
@@ -88,6 +92,7 @@ public class TopScreenModel {
if (initialFilters != null) {
filters.addAll(initialFilters);
}
+ decomposePushDownFilter();
}
public void setSortFieldAndFields(Field sortField, List<Field> fields) {
@@ -113,7 +118,7 @@ public class TopScreenModel {
}
private void refreshSummary(ClusterMetrics clusterMetrics) {
- String currentTime = DateFormatUtils.ISO_8601_EXTENDED_TIME_FORMAT
+ String currentTime = ISO_8601_EXTENDED_TIME_FORMAT
.format(System.currentTimeMillis());
String version = clusterMetrics.getHBaseVersion();
String clusterId = clusterMetrics.getClusterId();
@@ -130,7 +135,7 @@ public class TopScreenModel {
}
private void refreshRecords(ClusterMetrics clusterMetrics) {
- List<Record> records = currentMode.getRecords(clusterMetrics);
+ List<Record> records = currentMode.getRecords(clusterMetrics, pushDownFilters);
// Filter and sort
records = records.stream()
@@ -153,13 +158,13 @@ public class TopScreenModel {
if (filter == null) {
return false;
}
-
filters.add(filter);
filterHistories.add(filterString);
return true;
}
public void clearFilters() {
+ pushDownFilters.clear();
filters.clear();
}
@@ -203,4 +208,18 @@ public class TopScreenModel {
public List<String> getFilterHistories() {
return Collections.unmodifiableList(filterHistories);
}
+
+ private void decomposePushDownFilter() {
+ pushDownFilters.clear();
+ for (RecordFilter filter : filters) {
+ if (!fields.contains(filter.getField())) {
+ pushDownFilters.add(filter);
+ }
+ }
+ filters.removeAll(pushDownFilters);
+ }
+
+ public Collection<? extends RecordFilter> getPushDownFilters() {
+ return pushDownFilters;
+ }
}
diff --git a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java
index d435f5c..e4cd386 100644
--- a/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java
+++ b/hbase-hbtop/src/main/java/org/apache/hadoop/hbase/hbtop/screen/top/TopScreenPresenter.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.RecordFilter;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.field.FieldInfo;
import org.apache.hadoop.hbase.hbtop.mode.Mode;
@@ -324,7 +325,8 @@ public class TopScreenPresenter {
}
public ScreenView goToFilterDisplayMode(Screen screen, Terminal terminal, int row) {
- return new FilterDisplayModeScreenView(screen, terminal, row, topScreenModel.getFilters(),
- topScreenView);
+ ArrayList<RecordFilter> filters = new ArrayList<>(topScreenModel.getFilters());
+ filters.addAll(topScreenModel.getPushDownFilters());
+ return new FilterDisplayModeScreenView(screen, terminal, row, filters, topScreenView);
}
}
diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java
index 43a8447..bad2a00 100644
--- a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java
+++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/TestUtils.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.commons.lang3.time.FastDateFormat;
import org.apache.hadoop.hbase.ClusterMetrics;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
@@ -37,6 +38,8 @@ import org.apache.hadoop.hbase.ServerMetricsBuilder;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UserMetrics;
+import org.apache.hadoop.hbase.UserMetricsBuilder;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.hbtop.field.Field;
import org.apache.hadoop.hbase.hbtop.screen.top.Summary;
@@ -54,6 +57,9 @@ public final class TestUtils {
// host1
List<RegionMetrics> regionMetricsList = new ArrayList<>();
+ List<UserMetrics> userMetricsList = new ArrayList<>();
+ userMetricsList.add(createUserMetrics("FOO",1,2, 4));
+ userMetricsList.add(createUserMetrics("BAR",2,3, 3));
regionMetricsList.add(createRegionMetrics(
"table1,,1.00000000000000000000000000000000.",
100, 50, 100,
@@ -73,10 +79,13 @@ public final class TestUtils {
ServerName host1 = ServerName.valueOf("host1.apache.com", 1000, 1);
serverMetricsMap.put(host1, createServerMetrics(host1, 100,
new Size(100, Size.Unit.MEGABYTE), new Size(200, Size.Unit.MEGABYTE), 100,
- regionMetricsList));
+ regionMetricsList, userMetricsList));
// host2
regionMetricsList.clear();
+ userMetricsList.clear();
+ userMetricsList.add(createUserMetrics("FOO",5,7, 3));
+ userMetricsList.add(createUserMetrics("BAR",4,8, 4));
regionMetricsList.add(createRegionMetrics(
"table1,1,4.00000000000000000000000000000003.",
100, 50, 100,
@@ -96,7 +105,7 @@ public final class TestUtils {
ServerName host2 = ServerName.valueOf("host2.apache.com", 1001, 2);
serverMetricsMap.put(host2, createServerMetrics(host2, 200,
new Size(16, Size.Unit.GIGABYTE), new Size(32, Size.Unit.GIGABYTE), 200,
- regionMetricsList));
+ regionMetricsList, userMetricsList));
ServerName host3 = ServerName.valueOf("host3.apache.com", 1002, 3);
return ClusterMetricsBuilder.newBuilder()
@@ -117,6 +126,15 @@ public final class TestUtils {
.build();
}
+ private static UserMetrics createUserMetrics(String user, long readRequestCount,
+ long writeRequestCount, long filteredReadRequestsCount) {
+ return UserMetricsBuilder.newBuilder(Bytes.toBytes(user)).addClientMetris(
+ new UserMetricsBuilder.ClientMetricsImpl("CLIENT_A_" + user, readRequestCount,
+ writeRequestCount, filteredReadRequestsCount)).addClientMetris(
+ new UserMetricsBuilder.ClientMetricsImpl("CLIENT_B_" + user, readRequestCount,
+ writeRequestCount, filteredReadRequestsCount)).build();
+ }
+
private static RegionMetrics createRegionMetrics(String regionName, long readRequestCount,
long filteredReadRequestCount, long writeRequestCount, Size storeFileSize,
Size uncompressedStoreFileSize, int storeFileCount, Size memStoreSize, float locality,
@@ -139,14 +157,15 @@ public final class TestUtils {
private static ServerMetrics createServerMetrics(ServerName serverName, long reportTimestamp,
Size usedHeapSize, Size maxHeapSize, long requestCountPerSecond,
- List<RegionMetrics> regionMetricsList) {
+ List<RegionMetrics> regionMetricsList, List<UserMetrics> userMetricsList) {
return ServerMetricsBuilder.newBuilder(serverName)
.setReportTimestamp(reportTimestamp)
.setUsedHeapSize(usedHeapSize)
.setMaxHeapSize(maxHeapSize)
.setRequestCountPerSecond(requestCountPerSecond)
- .setRegionMetrics(regionMetricsList).build();
+ .setRegionMetrics(regionMetricsList)
+ .setUserMetrics(userMetricsList).build();
}
public static void assertRecordsInRegionMode(List<Record> records) {
@@ -316,10 +335,78 @@ public final class TestUtils {
}
}
+ /**
+  * Asserts that exactly two user-mode records exist, one for "FOO" and one for "BAR", and that
+  * each of them passes {@code assertRecordInUserMode} with all-zero rates.
+  */
+ public static void assertRecordsInUserMode(List<Record> records) {
+   assertThat(records.size(), is(2));
+   for (Record userRecord : records) {
+     String userName = userRecord.get(Field.USER).asString();
+     switch (userName) {
+       // The per-second rates are expected to be zero for both users because there is no
+       // change or new metrics during refresh.
+       case "FOO":
+       case "BAR":
+         assertRecordInUserMode(userRecord, 0L, 0L, 0L);
+         break;
+       default:
+         fail();
+     }
+   }
+ }
+
+ /**
+  * Asserts that exactly four client-mode records exist (clients A and B of users FOO and BAR)
+  * and that each of them passes {@code assertRecordInClientMode} with all-zero rates.
+  */
+ public static void assertRecordsInClientMode(List<Record> records) {
+   assertThat(records.size(), is(4));
+   for (Record clientRecord : records) {
+     String clientName = clientRecord.get(Field.CLIENT).asString();
+     switch (clientName) {
+       // The per-second rates are expected to be zero for every client because there is no
+       // change or new metrics during refresh.
+       case "CLIENT_A_FOO":
+       case "CLIENT_A_BAR":
+       case "CLIENT_B_FOO":
+       case "CLIENT_B_BAR":
+         assertRecordInClientMode(clientRecord, 0L, 0L, 0L);
+         break;
+       default:
+         fail();
+     }
+   }
+ }
+
+ /**
+  * Asserts that a user-mode record has six fields, the given read / write / filtered-read
+  * per-second values, and a client count of two.
+  */
+ private static void assertRecordInUserMode(Record record, long readRequestCountPerSecond,
+     long writeCountRequestPerSecond, long filteredReadRequestsCount) {
+   assertThat(record.size(), is(6));
+
+   long actualReads = record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong();
+   assertThat(actualReads, is(readRequestCountPerSecond));
+
+   long actualWrites = record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong();
+   assertThat(actualWrites, is(writeCountRequestPerSecond));
+
+   long actualFiltered = record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong();
+   assertThat(actualFiltered, is(filteredReadRequestsCount));
+
+   int actualClients = record.get(Field.CLIENT_COUNT).asInt();
+   assertThat(actualClients, is(2));
+ }
+
+ /**
+  * Asserts that a client-mode record has six fields, the given read / write / filtered-read
+  * per-second values, and a user count of one.
+  */
+ private static void assertRecordInClientMode(Record record, long readRequestCountPerSecond,
+     long writeCountRequestPerSecond, long filteredReadRequestsCount) {
+   assertThat(record.size(), is(6));
+
+   long actualReads = record.get(Field.READ_REQUEST_COUNT_PER_SECOND).asLong();
+   assertThat(actualReads, is(readRequestCountPerSecond));
+
+   long actualWrites = record.get(Field.WRITE_REQUEST_COUNT_PER_SECOND).asLong();
+   assertThat(actualWrites, is(writeCountRequestPerSecond));
+
+   long actualFiltered = record.get(Field.FILTERED_READ_REQUEST_COUNT_PER_SECOND).asLong();
+   assertThat(actualFiltered, is(filteredReadRequestsCount));
+
+   int actualUsers = record.get(Field.USER_COUNT).asInt();
+   assertThat(actualUsers, is(1));
+ }
+
private static void assertRecordInTableMode(Record record, long requestCountPerSecond,
- long readRequestCountPerSecond, long filteredReadRequestCountPerSecond,
- long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize,
- int numStoreFiles, Size memStoreSize, int regionCount) {
+ long readRequestCountPerSecond, long filteredReadRequestCountPerSecond,
+ long writeCountRequestPerSecond, Size storeFileSize, Size uncompressedStoreFileSize,
+ int numStoreFiles, Size memStoreSize, int regionCount) {
assertThat(record.size(), is(11));
assertThat(record.get(Field.REQUEST_COUNT_PER_SECOND).asLong(),
is(requestCountPerSecond));
diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeTest.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeTest.java
new file mode 100644
index 0000000..82dbe45
--- /dev/null
+++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ClientModeTest.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.TestUtils;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the shared {@link ModeTestBase} checks against {@link Mode#CLIENT}: the record listing
+ * and the drill-down from a client record into user mode.
+ */
+@Category(SmallTests.class)
+public class ClientModeTest extends ModeTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(ClientModeTest.class);
+
+  @Override
+  protected Mode getMode() {
+    return Mode.CLIENT;
+  }
+
+  @Override
+  protected void assertRecords(List<Record> records) {
+    TestUtils.assertRecordsInClientMode(records);
+  }
+
+  /**
+   * Drilling down from a client record must lead to user mode with exactly one initial filter
+   * that selects the client of the current record.
+   */
+  @Override
+  protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) {
+    assertThat(drillDownInfo.getNextMode(), is(Mode.USER));
+    assertThat(drillDownInfo.getInitialFilters().size(), is(1));
+
+    String clientName = currentRecord.get(Field.CLIENT).asString();
+    String expectedFilter;
+    switch (clientName) {
+      case "CLIENT_A_FOO":
+        expectedFilter = "CLIENT==CLIENT_A_FOO";
+        break;
+      case "CLIENT_B_FOO":
+        expectedFilter = "CLIENT==CLIENT_B_FOO";
+        break;
+      case "CLIENT_A_BAR":
+        expectedFilter = "CLIENT==CLIENT_A_BAR";
+        break;
+      case "CLIENT_B_BAR":
+        expectedFilter = "CLIENT==CLIENT_B_BAR";
+        break;
+      default:
+        // fail() always throws; the return only satisfies definite assignment.
+        fail();
+        return;
+    }
+    assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is(expectedFilter));
+  }
+}
diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java
index 7ad1a3a..2b6db84 100644
--- a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java
+++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/ModeTestBase.java
@@ -27,7 +27,8 @@ public abstract class ModeTestBase {
@Test
public void testGetRecords() {
- List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics());
+ List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics(),
+ null);
assertRecords(records);
}
@@ -36,7 +37,8 @@ public abstract class ModeTestBase {
@Test
public void testDrillDown() {
- List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics());
+ List<Record> records = getMode().getRecords(TestUtils.createDummyClusterMetrics(),
+ null);
for (Record record : records) {
assertDrillDown(record, getMode().drillDown(record));
}
diff --git a/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/UserModeTest.java b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/UserModeTest.java
new file mode 100644
index 0000000..32e111c
--- /dev/null
+++ b/hbase-hbtop/src/test/java/org/apache/hadoop/hbase/hbtop/mode/UserModeTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.hbtop.mode;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.hbtop.Record;
+import org.apache.hadoop.hbase.hbtop.TestUtils;
+import org.apache.hadoop.hbase.hbtop.field.Field;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Runs the shared {@link ModeTestBase} checks against {@link Mode#USER}: the record listing
+ * and the drill-down from a user record into client mode.
+ */
+@Category(SmallTests.class)
+public class UserModeTest extends ModeTestBase {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(UserModeTest.class);
+
+  @Override
+  protected Mode getMode() {
+    return Mode.USER;
+  }
+
+  @Override
+  protected void assertRecords(List<Record> records) {
+    TestUtils.assertRecordsInUserMode(records);
+  }
+
+  /**
+   * Drilling down from a user record must lead to client mode with exactly one initial filter
+   * that selects the user of the current record.
+   */
+  @Override
+  protected void assertDrillDown(Record currentRecord, DrillDownInfo drillDownInfo) {
+    assertThat(drillDownInfo.getNextMode(), is(Mode.CLIENT));
+    assertThat(drillDownInfo.getInitialFilters().size(), is(1));
+
+    String userName = currentRecord.get(Field.USER).asString();
+    String expectedFilter;
+    switch (userName) {
+      case "FOO":
+        expectedFilter = "USER==FOO";
+        break;
+      case "BAR":
+        expectedFilter = "USER==BAR";
+        break;
+      default:
+        // fail() always throws; the return only satisfies definite assignment.
+        fail();
+        return;
+    }
+    assertThat(drillDownInfo.getInitialFilters().get(0).toString(), is(expectedFilter));
+  }
+}
diff --git a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
index 78d2e83..77b9887 100644
--- a/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/ClusterStatus.proto
@@ -157,6 +157,29 @@ message RegionLoad {
optional int32 max_store_file_ref_count = 22 [default = 0];
}
+/** Request metrics of a single user, broken down per client. */
+message UserLoad {
+
+ /** short user name */
+ required string userName = 1;
+
+ /** Metrics for all clients of this user, one entry per client */
+ repeated ClientMetrics clientMetrics = 2;
+}
+
+/** Request counters of a single client, reported as part of a UserLoad. */
+message ClientMetrics {
+ /** client host name */
+ required string hostName = 1;
+
+ /** the current total read requests made from a client */
+ optional uint64 read_requests_count = 2;
+
+ /** the current total write requests made from a client */
+ optional uint64 write_requests_count = 3;
+
+ /** the current total filtered read requests made from a client */
+ optional uint64 filtered_requests_count = 4;
+}
+
/* Server-level protobufs */
message ReplicationLoadSink {
@@ -230,6 +253,11 @@ message ServerLoad {
* The replicationLoadSink for the replication Sink status of this region server.
*/
optional ReplicationLoadSink replLoadSink = 11;
+
+ /**
+ * The metrics for each user on this region server
+ */
+ repeated UserLoad userLoads = 12;
}
message LiveServerInfo {
diff --git a/hbase-protocol/src/main/protobuf/ClusterStatus.proto b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
index 3494908..0234738 100644
--- a/hbase-protocol/src/main/protobuf/ClusterStatus.proto
+++ b/hbase-protocol/src/main/protobuf/ClusterStatus.proto
@@ -153,6 +153,32 @@ message RegionLoad {
optional int32 max_store_file_ref_count = 22 [default = 0];
}
+/** Request metrics of a single user, broken down per client. */
+message UserLoad {
+
+ /** short user name */
+ required string userName = 1;
+
+ /** Metrics for all clients of this user, one entry per client */
+ repeated ClientMetrics clientMetrics = 2;
+
+
+}
+
+/** Request counters of a single client, reported as part of a UserLoad. */
+message ClientMetrics {
+ /** client host name */
+ required string hostName = 1;
+
+ /** the current total read requests made from a client */
+ optional uint64 read_requests_count = 2;
+
+ /** the current total write requests made from a client */
+ optional uint64 write_requests_count = 3;
+
+ /** the current total filtered read requests made from a client */
+ optional uint64 filtered_requests_count = 4;
+
+}
+
/* Server-level protobufs */
message ReplicationLoadSink {
@@ -219,6 +245,11 @@ message ServerLoad {
* The replicationLoadSink for the replication Sink status of this region server.
*/
optional ReplicationLoadSink replLoadSink = 11;
+
+ /**
+ * The metrics for each user on this region server
+ */
+ repeated UserLoad userLoads = 12;
}
message LiveServerInfo {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index a431e5d..d1490c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -843,7 +843,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// TODO: revisit if coprocessors should load in other cases
this.coprocessorHost = new RegionCoprocessorHost(this, rsServices, conf);
this.metricsRegionWrapper = new MetricsRegionWrapperImpl(this);
- this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper);
+ this.metricsRegion = new MetricsRegion(this.metricsRegionWrapper, conf);
} else {
this.metricsRegionWrapper = null;
this.metricsRegion = null;
@@ -6582,6 +6582,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (!outResults.isEmpty()) {
readRequestsCount.increment();
+ if (metricsRegion != null) {
+ metricsRegion.updateReadRequestCount();
+ }
}
if (rsServices != null && rsServices.getMetrics() != null) {
rsServices.getMetrics().updateReadQueryMeter(getRegionInfo().getTable());
@@ -6909,6 +6912,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
protected void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
filteredReadRequestsCount.increment();
+ if (metricsRegion != null) {
+ metricsRegion.updateFilteredRecords();
+ }
if (scannerContext == null || !scannerContext.isTrackingMetrics()) return;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 3104982..1c1f0b0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -56,6 +56,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import javax.management.MalformedObjectNameException;
import javax.servlet.http.HttpServlet;
+
import org.apache.commons.lang3.RandomUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.SystemUtils;
@@ -208,6 +209,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Coprocesso
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.UserLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.Coprocessor.Builder;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.NameStringPair;
@@ -1379,7 +1381,14 @@ public class HRegionServer extends HasThread implements
} else {
serverLoad.setInfoServerPort(-1);
}
-
+ MetricsUserAggregateSource userSource =
+ metricsRegionServer.getMetricsUserAggregate().getSource();
+ if (userSource != null) {
+ Map<String, MetricsUserSource> userMetricMap = userSource.getUserSources();
+ for (Entry<String, MetricsUserSource> entry : userMetricMap.entrySet()) {
+ serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
+ }
+ }
// for the replicationLoad purpose. Only need to get from one executorService
// either source or sink will get the same info
ReplicationSourceService rsources = getReplicationSourceService();
@@ -1717,6 +1726,19 @@ public class HRegionServer extends HasThread implements
return regionLoadBldr.build();
}
+ /**
+  * Converts the metrics tracked for one user into the {@code UserLoad} protobuf message: the
+  * user name plus one {@code ClientMetrics} entry for every client known to {@code userSource}.
+  */
+ private UserLoad createUserLoad(String user, MetricsUserSource userSource) {
+   UserLoad.Builder builder = UserLoad.newBuilder();
+   builder.setUserName(user);
+   userSource.getClientMetrics().values().forEach(client -> {
+     ClusterStatusProtos.ClientMetrics.Builder clientBuilder =
+       ClusterStatusProtos.ClientMetrics.newBuilder();
+     clientBuilder.setHostName(client.getHostName());
+     clientBuilder.setWriteRequestsCount(client.getWriteRequestsCount());
+     clientBuilder.setFilteredRequestsCount(client.getFilteredReadRequests());
+     clientBuilder.setReadRequestsCount(client.getReadRequestsCount());
+     builder.addClientMetrics(clientBuilder.build());
+   });
+   return builder.build();
+ }
+
public RegionLoad createRegionLoad(final String encodedRegionName) throws IOException {
HRegion r = onlineRegions.get(encodedRegionName);
return r != null ? createRegionLoad(r, null, null) : null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java
index 868d9b9..a1dad02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegion.java
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@@ -29,12 +30,14 @@ import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@InterfaceAudience.Private
public class MetricsRegion {
private final MetricsRegionSource source;
+ private final MetricsUserAggregate userAggregate;
private MetricsRegionWrapper regionWrapper;
- public MetricsRegion(final MetricsRegionWrapper wrapper) {
+ public MetricsRegion(final MetricsRegionWrapper wrapper, Configuration conf) {
source = CompatibilitySingletonFactory.getInstance(MetricsRegionServerSourceFactory.class)
.createRegion(wrapper);
this.regionWrapper = wrapper;
+ userAggregate = MetricsUserAggregateFactory.getMetricsUserAggregate(conf);
}
public void close() {
@@ -57,6 +60,9 @@ public class MetricsRegion {
source.updateScanTime(t);
}
+ /** Forwards a filtered read request to the user aggregate. */
+ public void updateFilteredRecords() {
+   this.userAggregate.updateFilteredReadRequests();
+ }
public void updateAppend() {
source.updateAppend();
}
@@ -73,4 +79,7 @@ public class MetricsRegion {
return regionWrapper;
}
+ /** Forwards a read request to the user aggregate. */
+ public void updateReadRequestCount() {
+   this.userAggregate.updateReadRequestCount();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index 808fc58..98f400e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -101,7 +101,7 @@ public class MetricsRegionServer {
}
@VisibleForTesting
- public org.apache.hadoop.hbase.regionserver.MetricsUserAggregate getMetricsUserAggregate() {
+ public MetricsUserAggregate getMetricsUserAggregate() {
return userAggregate;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
index 9b23ccc..41d6d6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregate.java
@@ -23,6 +23,11 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public interface MetricsUserAggregate {
+ /**
+ * @return a singleton instance of MetricsUserAggregateSource, or null for the no-op
+ * implementation
+ */
+ MetricsUserAggregateSource getSource();
+
void updatePut(long t);
void updateDelete(long t);
@@ -36,4 +41,8 @@ public interface MetricsUserAggregate {
void updateReplay(long t);
void updateScanTime(long t);
+
+ /** Records one filtered read request. */
+ void updateFilteredReadRequests();
+
+ /** Records one read request. */
+ void updateReadRequestCount();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
index 888c480..38e440b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateFactory.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.regionserver;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.yetus.audience.InterfaceAudience;
@@ -27,6 +26,7 @@ public class MetricsUserAggregateFactory {
private MetricsUserAggregateFactory() {
}
+
public static final String METRIC_USER_ENABLED_CONF = "hbase.regionserver.user.metrics.enabled";
public static final boolean DEFAULT_METRIC_USER_ENABLED_CONF = true;
@@ -36,6 +36,10 @@ public class MetricsUserAggregateFactory {
} else {
//NoOpMetricUserAggregate
return new MetricsUserAggregate() {
+ // The no-op aggregate has no backing source.
+ @Override public MetricsUserAggregateSource getSource() {
+ return null;
+ }
+
@Override public void updatePut(long t) {
}
@@ -63,6 +67,14 @@ public class MetricsUserAggregateFactory {
@Override public void updateScanTime(long t) {
}
+
+ // Intentionally empty: the no-op aggregate records nothing.
+ @Override public void updateFilteredReadRequests() {
+
+ }
+
+ // Intentionally empty: the no-op aggregate records nothing.
+ @Override public void updateReadRequestCount() {
+
+ }
};
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
index 6c24afc..b457c75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsUserAggregateImpl.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.net.InetAddress;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
@@ -29,8 +30,6 @@ import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.util.LossyCounting;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
@InterfaceAudience.Private
public class MetricsUserAggregateImpl implements MetricsUserAggregate{
@@ -65,8 +64,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
return user.isPresent() ? user.get().getShortName() : null;
}
- @VisibleForTesting
- MetricsUserAggregateSource getSource() {
+ @Override
+ public MetricsUserAggregateSource getSource() {
return source;
}
@@ -74,7 +73,39 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updatePut(long t) {
String user = getActiveUser();
if (user != null) {
- getOrCreateMetricsUser(user).updatePut(t);
+ MetricsUserSource userSource = getOrCreateMetricsUser(user);
+ userSource.updatePut(t);
+ incrementClientWriteMetrics(userSource);
+ }
+
+ }
+
+ /**
+  * @return the host name of the address reported by {@code RpcServer.getRemoteAddress()}, or
+  *   null when no remote address is available
+  */
+ private String getClient() {
+   return RpcServer.getRemoteAddress().map(InetAddress::getHostName).orElse(null);
+ }
+
+ private void incrementClientReadMetrics(MetricsUserSource userSource) {
+ String client = getClient();
+ if (client != null && userSource != null) {
+ userSource.getOrCreateMetricsClient(client).incrementReadRequest();
+ }
+ }
+
+ private void incrementFilteredReadRequests(MetricsUserSource userSource) {
+ String client = getClient();
+ if (client != null && userSource != null) {
+ userSource.getOrCreateMetricsClient(client).incrementFilteredReadRequests();
+ }
+ }
+
+ /**
+ * Increments the write-request counter of the calling client under the given user source.
+ * Does nothing when either the client or the user source is null.
+ */
+ private void incrementClientWriteMetrics(MetricsUserSource userSource) {
+ String client = getClient();
+ if (client != null && userSource != null) {
+ userSource.getOrCreateMetricsClient(client).incrementWriteRequest();
}
}
@@ -82,7 +113,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateDelete(long t) {
String user = getActiveUser();
if (user != null) {
- getOrCreateMetricsUser(user).updateDelete(t);
+ MetricsUserSource userSource = getOrCreateMetricsUser(user);
+ userSource.updateDelete(t);
+ incrementClientWriteMetrics(userSource);
}
}
@@ -90,7 +123,8 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateGet(long t) {
String user = getActiveUser();
if (user != null) {
- getOrCreateMetricsUser(user).updateGet(t);
+ MetricsUserSource userSource = getOrCreateMetricsUser(user);
+ userSource.updateGet(t);
}
}
@@ -98,7 +132,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateIncrement(long t) {
String user = getActiveUser();
if (user != null) {
- getOrCreateMetricsUser(user).updateIncrement(t);
+ MetricsUserSource userSource = getOrCreateMetricsUser(user);
+ userSource.updateIncrement(t);
+ incrementClientWriteMetrics(userSource);
}
}
@@ -106,7 +142,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateAppend(long t) {
String user = getActiveUser();
if (user != null) {
- getOrCreateMetricsUser(user).updateAppend(t);
+ MetricsUserSource userSource = getOrCreateMetricsUser(user);
+ userSource.updateAppend(t);
+ incrementClientWriteMetrics(userSource);
}
}
@@ -114,7 +152,9 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateReplay(long t) {
String user = getActiveUser();
if (user != null) {
- getOrCreateMetricsUser(user).updateReplay(t);
+ MetricsUserSource userSource = getOrCreateMetricsUser(user);
+ userSource.updateReplay(t);
+ incrementClientWriteMetrics(userSource);
}
}
@@ -122,7 +162,24 @@ public class MetricsUserAggregateImpl implements MetricsUserAggregate{
public void updateScanTime(long t) {
String user = getActiveUser();
if (user != null) {
- getOrCreateMetricsUser(user).updateScanTime(t);
+ MetricsUserSource userSource = getOrCreateMetricsUser(user);
+ userSource.updateScanTime(t);
+ }
+ }
+
+ /**
+  * Counts one filtered read request against the calling client of the active user; a no-op
+  * when there is no active user.
+  */
+ @Override
+ public void updateFilteredReadRequests() {
+   final String activeUser = getActiveUser();
+   if (activeUser == null) {
+     return;
+   }
+   incrementFilteredReadRequests(getOrCreateMetricsUser(activeUser));
+ }
+
+ /**
+ * Counts one read request against the calling client of the active user; a no-op when
+ * there is no active user.
+ */
+ @Override public void updateReadRequestCount() {
+ String user = getActiveUser();
+ if (user != null) {
+ MetricsUserSource userSource = getOrCreateMetricsUser(user);
+ incrementClientReadMetrics(userSource);
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java
index e692351..0f4a35b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestClientClusterMetrics.java
@@ -18,28 +18,39 @@
package org.apache.hadoop.hbase;
import java.io.IOException;
+import java.security.PrivilegedAction;
import java.util.EnumSet;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.Waiter.Predicate;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.RegionStatesCount;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.filter.FilterAllFilter;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.security.UserProvider;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.MasterThread;
@@ -238,6 +249,137 @@ public class TestClientClusterMetrics {
Assert.assertEquals(MASTERS - 1, metrics.getBackupMasterNames().size());
}
+ @Test public void testUserMetrics() throws Exception {
+ Configuration conf = UTIL.getConfiguration();
+ User userFoo = User.createUserForTesting(conf, "FOO_USER_METRIC_TEST", new String[0]);
+ User userBar = User.createUserForTesting(conf, "BAR_USER_METRIC_TEST", new String[0]);
+ User userTest = User.createUserForTesting(conf, "TEST_USER_METRIC_TEST", new String[0]);
+ UTIL.createTable(TABLE_NAME, CF);
+ waitForUsersMetrics(0);
+ long writeMetaMetricBeforeNextuser = getMetaMetrics().getWriteRequestCount();
+ userFoo.runAs(new PrivilegedAction<Void>() {
+ @Override public Void run() {
+ try {
+ doPut();
+ } catch (IOException e) {
+ Assert.fail("Exception:" + e.getMessage());
+ }
+ return null;
+ }
+ });
+ waitForUsersMetrics(1);
+ long writeMetaMetricForUserFoo =
+ getMetaMetrics().getWriteRequestCount() - writeMetaMetricBeforeNextuser;
+ long readMetaMetricBeforeNextuser = getMetaMetrics().getReadRequestCount();
+ userBar.runAs(new PrivilegedAction<Void>() {
+ @Override public Void run() {
+ try {
+ doGet();
+ } catch (IOException e) {
+ Assert.fail("Exception:" + e.getMessage());
+ }
+ return null;
+ }
+ });
+ waitForUsersMetrics(2);
+ long readMetaMetricForUserBar =
+ getMetaMetrics().getReadRequestCount() - readMetaMetricBeforeNextuser;
+ long filteredMetaReqeust = getMetaMetrics().getFilteredReadRequestCount();
+ userTest.runAs(new PrivilegedAction<Void>() {
+ @Override public Void run() {
+ try {
+ Table table = createConnection(UTIL.getConfiguration()).getTable(TABLE_NAME);
+ for (Result result : table.getScanner(new Scan().setFilter(new FilterAllFilter()))) {
+ Assert.fail("Should have filtered all rows");
+ }
+ } catch (IOException e) {
+ Assert.fail("Exception:" + e.getMessage());
+ }
+ return null;
+ }
+ });
+ waitForUsersMetrics(3);
+ long filteredMetaReqeustForTestUser =
+ getMetaMetrics().getFilteredReadRequestCount() - filteredMetaReqeust;
+ Map<byte[], UserMetrics> userMap =
+ ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
+ .iterator().next().getUserMetrics();
+ for (byte[] user : userMap.keySet()) {
+ switch (Bytes.toString(user)) {
+ case "FOO_USER_METRIC_TEST":
+ Assert.assertEquals(1,
+ userMap.get(user).getWriteRequestCount() - writeMetaMetricForUserFoo);
+ break;
+ case "BAR_USER_METRIC_TEST":
+ Assert
+ .assertEquals(1, userMap.get(user).getReadRequestCount() - readMetaMetricForUserBar);
+ Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
+ break;
+ case "TEST_USER_METRIC_TEST":
+ Assert.assertEquals(1,
+ userMap.get(user).getFilteredReadRequests() - filteredMetaReqeustForTestUser);
+ Assert.assertEquals(0, userMap.get(user).getWriteRequestCount());
+ break;
+ default:
+ //current user
+ Assert.assertEquals(UserProvider.instantiate(conf).getCurrent().getName(),
+ Bytes.toString(user));
+ //Read/write count because of Meta operations
+ Assert.assertTrue(userMap.get(user).getReadRequestCount() > 1);
+ break;
+ }
+ }
+ UTIL.deleteTable(TABLE_NAME);
+ }
+
+ /**
+  * Looks through all live servers for the region metrics of the first meta region and
+  * returns them; fails the test when no live server reports that region.
+  */
+ private RegionMetrics getMetaMetrics() throws IOException {
+   ClusterMetrics clusterMetrics = ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS));
+   for (ServerMetrics liveServer : clusterMetrics.getLiveServerMetrics().values()) {
+     RegionMetrics candidate = liveServer.getRegionMetrics()
+       .get(RegionInfoBuilder.FIRST_META_REGIONINFO.getRegionName());
+     if (candidate == null) {
+       continue;
+     }
+     return candidate;
+   }
+   Assert.fail("Should have find meta metrics");
+   return null;
+ }
+
+ private void waitForUsersMetrics(int noOfUsers) throws Exception {
+ //Sleep for metrics to get updated on master
+ Thread.sleep(5000);
+ Waiter.waitFor(CLUSTER.getConfiguration(), 10 * 1000, 100, new Predicate<Exception>() {
+ @Override public boolean evaluate() throws Exception {
+ Map<byte[], UserMetrics> metrics =
+ ADMIN.getClusterMetrics(EnumSet.of(Option.LIVE_SERVERS)).getLiveServerMetrics().values()
+ .iterator().next().getUserMetrics();
+ Assert.assertNotNull(metrics);
+ //including current user + noOfUsers
+ return metrics.keySet().size() > noOfUsers;
+ }
+ });
+ }
+
+ /**
+  * Writes a single cell (row "a", qualifier "col1", value "1") to the test table using a
+  * fresh connection. Both the connection and the table are closed before returning.
+  */
+ private void doPut() throws IOException {
+   try (Connection conn = createConnection(UTIL.getConfiguration());
+     Table table = conn.getTable(TABLE_NAME)) {
+     table
+       .put(new Put(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1"), Bytes.toBytes("1")));
+   }
+ }
+
+ /**
+  * Reads a single cell (row "a", qualifier "col1") from the test table using a fresh
+  * connection. Both the connection and the table are closed before returning.
+  */
+ private void doGet() throws IOException {
+   try (Connection conn = createConnection(UTIL.getConfiguration());
+     Table table = conn.getTable(TABLE_NAME)) {
+     table.get(new Get(Bytes.toBytes("a")).addColumn(CF, Bytes.toBytes("col1")));
+   }
+ }
+
+ /**
+  * Opens a new connection for the user that {@link UserProvider} reports as current. The
+  * caller owns the returned connection and is responsible for closing it.
+  */
+ private Connection createConnection(Configuration conf) throws IOException {
+   return ConnectionFactory.createConnection(conf, UserProvider.instantiate(conf).getCurrent());
+ }
+
@Test
public void testOtherStatusInfos() throws Exception {
EnumSet<Option> options =
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java
index 2e40bbc..58cb549 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestRegionsRecoveryChore.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.UserMetrics;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.master.assignment.AssignmentManager;
@@ -356,6 +357,10 @@ public class TestRegionsRecoveryChore {
return regionMetricsMap;
}
+ /**
+  * No user metrics are modelled by this implementation.
+  *
+  * @return a new, empty map on every call
+  */
+ @Override
+ public Map<byte[], UserMetrics> getUserMetrics() {
+   Map<byte[], UserMetrics> noUserMetrics = new HashMap<>();
+   return noUserMetrics;
+ }
+
@Override
public Set<String> getCoprocessorNames() {
return null;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
index 2119b91..b5fbc96 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegion.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompatibilityFactory;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
@@ -38,7 +39,7 @@ public class TestMetricsRegion {
@Test
public void testRegionWrapperMetrics() {
- MetricsRegion mr = new MetricsRegion(new MetricsRegionWrapperStub());
+ MetricsRegion mr = new MetricsRegion(new MetricsRegionWrapperStub(), new Configuration());
MetricsRegionAggregateSource agg = mr.getSource().getAggregateSource();
HELPER.assertGauge(
@@ -72,7 +73,7 @@ public class TestMetricsRegion {
mr.close();
// test region with replica id > 0
- mr = new MetricsRegion(new MetricsRegionWrapperStub(1));
+ mr = new MetricsRegion(new MetricsRegionWrapperStub(1), new Configuration());
agg = mr.getSource().getAggregateSource();
HELPER.assertGauge(
"namespace_TestNS_table_MetricsRegionWrapperStub_region_DEADBEEF001_metric_storeCount",