You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/02/29 19:02:47 UTC
[hbase] branch branch-2 updated: HBASE-22978 : Online slow response
log (#1228)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 6366b73 HBASE-22978 : Online slow response log (#1228)
6366b73 is described below
commit 6366b7313437210e79b47e50b87c6a3cf0e3e4f3
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Sun Mar 1 00:32:35 2020 +0530
HBASE-22978 : Online slow response log (#1228)
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
.../java/org/apache/hadoop/hbase/client/Admin.java | 25 +
.../org/apache/hadoop/hbase/client/AsyncAdmin.java | 21 +
.../hadoop/hbase/client/AsyncHBaseAdmin.java | 12 +
.../org/apache/hadoop/hbase/client/HBaseAdmin.java | 66 +++
.../hadoop/hbase/client/RawAsyncHBaseAdmin.java | 63 +++
.../apache/hadoop/hbase/client/SlowLogParams.java | 89 ++++
.../hadoop/hbase/client/SlowLogQueryFilter.java | 122 +++++
.../apache/hadoop/hbase/client/SlowLogRecord.java | 319 +++++++++++
.../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 113 ++++
.../hbase/shaded/protobuf/RequestConverter.java | 44 ++
.../java/org/apache/hadoop/hbase/HConstants.java | 6 +
hbase-common/src/main/resources/hbase-default.xml | 23 +
.../src/main/protobuf/Admin.proto | 27 +
.../src/main/protobuf/TooSlowLog.proto | 45 ++
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 75 ++-
.../hadoop/hbase/ipc/RpcServerInterface.java | 17 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 18 +
.../hadoop/hbase/regionserver/RSRpcServices.java | 41 ++
.../slowlog/DisruptorExceptionHandler.java | 50 ++
.../regionserver/slowlog/RingBufferEnvelope.java | 57 ++
.../hbase/regionserver/slowlog/RpcLogDetails.java | 71 +++
.../regionserver/slowlog/SlowLogEventHandler.java | 208 ++++++++
.../regionserver/slowlog/SlowLogRecorder.java | 153 ++++++
.../org/apache/hadoop/hbase/client/TestAdmin2.java | 27 +
.../apache/hadoop/hbase/client/TestAdminBase.java | 1 +
.../apache/hadoop/hbase/ipc/AbstractTestIPC.java | 6 +-
.../hadoop/hbase/master/MockRegionServer.java | 16 +
.../regionserver/slowlog/TestSlowLogRecorder.java | 593 +++++++++++++++++++++
hbase-shell/src/main/ruby/hbase/admin.rb | 84 +++
hbase-shell/src/main/ruby/shell.rb | 2 +
.../ruby/shell/commands/clear_slowlog_responses.rb | 47 ++
.../ruby/shell/commands/get_slowlog_responses.rb | 78 +++
hbase-shell/src/test/ruby/hbase/admin_test.rb | 14 +
.../hadoop/hbase/thrift2/client/ThriftAdmin.java | 13 +
src/main/asciidoc/_chapters/hbase-default.adoc | 38 ++
src/main/asciidoc/_chapters/ops_mgt.adoc | 114 ++++
36 files changed, 2677 insertions(+), 21 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index a8029f8..4576d7d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -3134,4 +3134,29 @@ public interface Admin extends Abortable, Closeable {
*/
boolean isSnapshotCleanupEnabled() throws IOException;
+
+ /**
+ * Retrieves online slow RPC logs from the provided list of
+ * RegionServers
+ *
+ * @param serverNames Server names to get slowlog responses from
+ * @param slowLogQueryFilter filter to be used if provided
+ * @return online slowlog response list
+ * @throws IOException if a remote or network exception occurs
+ */
+ List<SlowLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
+ final SlowLogQueryFilter slowLogQueryFilter) throws IOException;
+
+ /**
+ * Clears online slow RPC logs from the provided list of
+ * RegionServers
+ *
+ * @param serverNames Set of Server names to clean slowlog responses from
+ * @return List of booleans representing if online slowlog response buffer is cleaned
+ * from each RegionServer
+ * @throws IOException if a remote or network exception occurs
+ */
+ List<Boolean> clearSlowLogResponses(final Set<ServerName> serverNames)
+ throws IOException;
+
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 1aa7623..9e61a38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -1484,4 +1484,25 @@ public interface AsyncAdmin {
*/
CompletableFuture<Boolean> isSnapshotCleanupEnabled();
+ /**
+ * Retrieves online slow RPC logs from the provided list of
+ * RegionServers
+ *
+ * @param serverNames Server names to get slowlog responses from
+ * @param slowLogQueryFilter filter to be used if provided
+ * @return Online slowlog response list. The return value wrapped by a {@link CompletableFuture}
+ */
+ CompletableFuture<List<SlowLogRecord>> getSlowLogResponses(final Set<ServerName> serverNames,
+ final SlowLogQueryFilter slowLogQueryFilter);
+
+ /**
+ * Clears online slow RPC logs from the provided list of
+ * RegionServers
+ *
+ * @param serverNames Set of Server names to clean slowlog responses from
+ * @return List of booleans representing if online slowlog response buffer is cleaned
+ * from each RegionServer. The return value wrapped by a {@link CompletableFuture}
+ */
+ CompletableFuture<List<Boolean>> clearSlowLogResponses(final Set<ServerName> serverNames);
+
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 3b51279..d5c9d09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.client;
import com.google.protobuf.RpcChannel;
+
import java.util.EnumSet;
import java.util.List;
import java.util.Map;
@@ -836,4 +837,15 @@ class AsyncHBaseAdmin implements AsyncAdmin {
return wrap(rawAdmin.isSnapshotCleanupEnabled());
}
+ @Override
+ public CompletableFuture<List<SlowLogRecord>> getSlowLogResponses(
+ final Set<ServerName> serverNames, final SlowLogQueryFilter slowLogQueryFilter) {
+ return wrap(rawAdmin.getSlowLogResponses(serverNames, slowLogQueryFilter));
+ }
+
+ @Override
+ public CompletableFuture<List<Boolean>> clearSlowLogResponses(Set<ServerName> serverNames) {
+ return wrap(rawAdmin.clearSlowLogResponses(serverNames));
+ }
+
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 0b2be19..7ef5d4d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import com.google.protobuf.RpcController;
+import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
import java.io.InterruptedIOException;
@@ -111,6 +112,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -4358,4 +4360,68 @@ public class HBaseAdmin implements Admin {
}
+ @Override
+ public List<SlowLogRecord> getSlowLogResponses(@Nullable final Set<ServerName> serverNames,
+ final SlowLogQueryFilter slowLogQueryFilter) throws IOException {
+ if (CollectionUtils.isEmpty(serverNames)) {
+ return Collections.emptyList();
+ }
+ return serverNames.stream().map(serverName -> {
+ try {
+ return getSlowLogResponseFromServer(serverName, slowLogQueryFilter);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ ).flatMap(List::stream).collect(Collectors.toList());
+ }
+
+ private List<SlowLogRecord> getSlowLogResponseFromServer(final ServerName serverName,
+ final SlowLogQueryFilter slowLogQueryFilter) throws IOException {
+ return getSlowLogResponsesFromServer(this.connection.getAdmin(serverName), slowLogQueryFilter);
+ }
+
+ private List<SlowLogRecord> getSlowLogResponsesFromServer(AdminService.BlockingInterface admin,
+ SlowLogQueryFilter slowLogQueryFilter) throws IOException {
+ return executeCallable(new RpcRetryingCallable<List<SlowLogRecord>>() {
+ @Override
+ protected List<SlowLogRecord> rpcCall(int callTimeout) throws Exception {
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ AdminProtos.SlowLogResponses slowLogResponses =
+ admin.getSlowLogResponses(controller,
+ RequestConverter.buildSlowLogResponseRequest(slowLogQueryFilter));
+ return ProtobufUtil.toSlowLogPayloads(slowLogResponses);
+ }
+ });
+ }
+
+ @Override
+ public List<Boolean> clearSlowLogResponses(@Nullable final Set<ServerName> serverNames)
+ throws IOException {
+ if (CollectionUtils.isEmpty(serverNames)) {
+ return Collections.emptyList();
+ }
+ return serverNames.stream().map(serverName -> {
+ try {
+ return clearSlowLogsResponses(serverName);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }).collect(Collectors.toList());
+ }
+
+ private Boolean clearSlowLogsResponses(final ServerName serverName) throws IOException {
+ AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
+ return executeCallable(new RpcRetryingCallable<Boolean>() {
+ @Override
+ protected Boolean rpcCall(int callTimeout) throws Exception {
+ HBaseRpcController controller = rpcControllerFactory.newController();
+ AdminProtos.ClearSlowLogResponses clearSlowLogResponses =
+ admin.clearSlowLogsResponses(controller,
+ RequestConverter.buildClearSlowLogResponseRequest());
+ return ProtobufUtil.toClearSlowLogPayload(clearSlowLogResponses);
+ }
+ });
+ }
+
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index ee32e42..ddaf786 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
import com.google.protobuf.Message;
import com.google.protobuf.RpcChannel;
+import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -45,6 +46,7 @@ import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -102,6 +104,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
@@ -3884,4 +3888,63 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
.call();
}
+ @Override
+ public CompletableFuture<List<SlowLogRecord>> getSlowLogResponses(
+ @Nullable final Set<ServerName> serverNames,
+ final SlowLogQueryFilter slowLogQueryFilter) {
+ if (CollectionUtils.isEmpty(serverNames)) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ return CompletableFuture.supplyAsync(() -> serverNames.stream()
+ .map((ServerName serverName) ->
+ getSlowLogResponseFromServer(serverName, slowLogQueryFilter))
+ .map(CompletableFuture::join)
+ .flatMap(List::stream)
+ .collect(Collectors.toList()));
+ }
+
+ private CompletableFuture<List<SlowLogRecord>> getSlowLogResponseFromServer(
+ final ServerName serverName, final SlowLogQueryFilter slowLogQueryFilter) {
+ return this.<List<SlowLogRecord>>newAdminCaller()
+ .action((controller, stub) -> this
+ .adminCall(
+ controller, stub, RequestConverter.buildSlowLogResponseRequest(slowLogQueryFilter),
+ AdminService.Interface::getSlowLogResponses,
+ ProtobufUtil::toSlowLogPayloads))
+ .serverName(serverName).call();
+ }
+
+ @Override
+ public CompletableFuture<List<Boolean>> clearSlowLogResponses(
+ @Nullable Set<ServerName> serverNames) {
+ if (CollectionUtils.isEmpty(serverNames)) {
+ return CompletableFuture.completedFuture(Collections.emptyList());
+ }
+ List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
+ .map(this::clearSlowLogsResponses)
+ .collect(Collectors.toList());
+ return convertToFutureOfList(clearSlowLogResponseList);
+ }
+
+ private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
+ return this.<Boolean>newAdminCaller()
+ .action(((controller, stub) -> this
+ .adminCall(
+ controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
+ AdminService.Interface::clearSlowLogsResponses,
+ ProtobufUtil::toClearSlowLogPayload))
+ ).serverName(serverName).call();
+ }
+
+ private static <T> CompletableFuture<List<T>> convertToFutureOfList(
+ List<CompletableFuture<T>> futures) {
+ CompletableFuture<Void> allDoneFuture =
+ CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+ return allDoneFuture.thenApply(v ->
+ futures.stream()
+ .map(CompletableFuture::join)
+ .collect(Collectors.toList())
+ );
+ }
+
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java
new file mode 100644
index 0000000..86df9fd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * SlowLog params object that contains detailed info as params and region name : to be used
+ * for filter purpose
+ */
+@InterfaceAudience.Private
+public class SlowLogParams {
+
+ private final String regionName;
+ private final String params;
+
+ public SlowLogParams(String regionName, String params) {
+ this.regionName = regionName;
+ this.params = params;
+ }
+
+ public SlowLogParams(String params) {
+ this.regionName = StringUtils.EMPTY;
+ this.params = params;
+ }
+
+ public String getRegionName() {
+ return regionName;
+ }
+
+ public String getParams() {
+ return params;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("regionName", regionName)
+ .append("params", params)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SlowLogParams that = (SlowLogParams) o;
+
+ return new EqualsBuilder()
+ .append(regionName, that.regionName)
+ .append(params, that.params)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(regionName)
+ .append(params)
+ .toHashCode();
+ }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogQueryFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogQueryFilter.java
new file mode 100644
index 0000000..aa56a8a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogQueryFilter.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * SlowLog Query Filter with all filter and limit parameters
+ */
+@InterfaceAudience.Private
+public class SlowLogQueryFilter {
+
+ private String regionName;
+ private String clientAddress;
+ private String tableName;
+ private String userName;
+ private int limit = 10;
+
+ public String getRegionName() {
+ return regionName;
+ }
+
+ public void setRegionName(String regionName) {
+ this.regionName = regionName;
+ }
+
+ public String getClientAddress() {
+ return clientAddress;
+ }
+
+ public void setClientAddress(String clientAddress) {
+ this.clientAddress = clientAddress;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SlowLogQueryFilter that = (SlowLogQueryFilter) o;
+
+ return new EqualsBuilder()
+ .append(limit, that.limit)
+ .append(regionName, that.regionName)
+ .append(clientAddress, that.clientAddress)
+ .append(tableName, that.tableName)
+ .append(userName, that.userName)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(regionName)
+ .append(clientAddress)
+ .append(tableName)
+ .append(userName)
+ .append(limit)
+ .toHashCode();
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("regionName", regionName)
+ .append("clientAddress", clientAddress)
+ .append("tableName", tableName)
+ .append("userName", userName)
+ .append("limit", limit)
+ .toString();
+ }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogRecord.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogRecord.java
new file mode 100644
index 0000000..9593618
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogRecord.java
@@ -0,0 +1,319 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.util.GsonUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.gson.Gson;
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
+
+/**
+ * SlowLog payload for hbase-client, to be used by Admin API get_slow_responses
+ */
+@InterfaceAudience.Private
+final public class SlowLogRecord {
+
+ // used to convert object to pretty printed format
+ // used by toJsonPrettyPrint()
+ private static final Gson GSON = GsonUtil.createGson()
+ .setPrettyPrinting()
+ .registerTypeAdapter(SlowLogRecord.class, (JsonSerializer<SlowLogRecord>)
+ (slowLogPayload, type, jsonSerializationContext) -> {
+ Gson gson = new Gson();
+ JsonObject jsonObj = (JsonObject) gson.toJsonTree(slowLogPayload);
+ if (slowLogPayload.getMultiGetsCount() == 0) {
+ jsonObj.remove("multiGetsCount");
+ }
+ if (slowLogPayload.getMultiMutationsCount() == 0) {
+ jsonObj.remove("multiMutationsCount");
+ }
+ if (slowLogPayload.getMultiServiceCalls() == 0) {
+ jsonObj.remove("multiServiceCalls");
+ }
+ return jsonObj;
+ }).create();
+
+ private long startTime;
+ private int processingTime;
+ private int queueTime;
+ private long responseSize;
+ private String clientAddress;
+ private String serverClass;
+ private String methodName;
+ private String callDetails;
+ private String param;
+ // we don't want to serialize region name, it is just for the filter purpose
+ // hence avoiding deserialization
+ private transient String regionName;
+ private String userName;
+ private int multiGetsCount;
+ private int multiMutationsCount;
+ private int multiServiceCalls;
+
+ public long getStartTime() {
+ return startTime;
+ }
+
+ public int getProcessingTime() {
+ return processingTime;
+ }
+
+ public int getQueueTime() {
+ return queueTime;
+ }
+
+ public long getResponseSize() {
+ return responseSize;
+ }
+
+ public String getClientAddress() {
+ return clientAddress;
+ }
+
+ public String getServerClass() {
+ return serverClass;
+ }
+
+ public String getMethodName() {
+ return methodName;
+ }
+
+ public String getCallDetails() {
+ return callDetails;
+ }
+
+ public String getParam() {
+ return param;
+ }
+
+ public String getRegionName() {
+ return regionName;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public int getMultiGetsCount() {
+ return multiGetsCount;
+ }
+
+ public int getMultiMutationsCount() {
+ return multiMutationsCount;
+ }
+
+ public int getMultiServiceCalls() {
+ return multiServiceCalls;
+ }
+
+ private SlowLogRecord(final long startTime, final int processingTime, final int queueTime,
+ final long responseSize, final String clientAddress, final String serverClass,
+ final String methodName, final String callDetails, final String param,
+ final String regionName, final String userName, final int multiGetsCount,
+ final int multiMutationsCount, final int multiServiceCalls) {
+ this.startTime = startTime;
+ this.processingTime = processingTime;
+ this.queueTime = queueTime;
+ this.responseSize = responseSize;
+ this.clientAddress = clientAddress;
+ this.serverClass = serverClass;
+ this.methodName = methodName;
+ this.callDetails = callDetails;
+ this.param = param;
+ this.regionName = regionName;
+ this.userName = userName;
+ this.multiGetsCount = multiGetsCount;
+ this.multiMutationsCount = multiMutationsCount;
+ this.multiServiceCalls = multiServiceCalls;
+ }
+
+ public static class SlowLogRecordBuilder {
+ private long startTime;
+ private int processingTime;
+ private int queueTime;
+ private long responseSize;
+ private String clientAddress;
+ private String serverClass;
+ private String methodName;
+ private String callDetails;
+ private String param;
+ private String regionName;
+ private String userName;
+ private int multiGetsCount;
+ private int multiMutationsCount;
+ private int multiServiceCalls;
+
+ public SlowLogRecordBuilder setStartTime(long startTime) {
+ this.startTime = startTime;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setProcessingTime(int processingTime) {
+ this.processingTime = processingTime;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setQueueTime(int queueTime) {
+ this.queueTime = queueTime;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setResponseSize(long responseSize) {
+ this.responseSize = responseSize;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setClientAddress(String clientAddress) {
+ this.clientAddress = clientAddress;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setServerClass(String serverClass) {
+ this.serverClass = serverClass;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setMethodName(String methodName) {
+ this.methodName = methodName;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setCallDetails(String callDetails) {
+ this.callDetails = callDetails;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setParam(String param) {
+ this.param = param;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setRegionName(String regionName) {
+ this.regionName = regionName;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setUserName(String userName) {
+ this.userName = userName;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setMultiGetsCount(int multiGetsCount) {
+ this.multiGetsCount = multiGetsCount;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setMultiMutationsCount(int multiMutationsCount) {
+ this.multiMutationsCount = multiMutationsCount;
+ return this;
+ }
+
+ public SlowLogRecordBuilder setMultiServiceCalls(int multiServiceCalls) {
+ this.multiServiceCalls = multiServiceCalls;
+ return this;
+ }
+
+ public SlowLogRecord build() {
+ return new SlowLogRecord(startTime, processingTime, queueTime, responseSize,
+ clientAddress, serverClass, methodName, callDetails, param, regionName,
+ userName, multiGetsCount, multiMutationsCount, multiServiceCalls);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ SlowLogRecord that = (SlowLogRecord) o;
+
+ return new EqualsBuilder()
+ .append(startTime, that.startTime)
+ .append(processingTime, that.processingTime)
+ .append(queueTime, that.queueTime)
+ .append(responseSize, that.responseSize)
+ .append(multiGetsCount, that.multiGetsCount)
+ .append(multiMutationsCount, that.multiMutationsCount)
+ .append(multiServiceCalls, that.multiServiceCalls)
+ .append(clientAddress, that.clientAddress)
+ .append(serverClass, that.serverClass)
+ .append(methodName, that.methodName)
+ .append(callDetails, that.callDetails)
+ .append(param, that.param)
+ .append(regionName, that.regionName)
+ .append(userName, that.userName)
+ .isEquals();
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder(17, 37)
+ .append(startTime)
+ .append(processingTime)
+ .append(queueTime)
+ .append(responseSize)
+ .append(clientAddress)
+ .append(serverClass)
+ .append(methodName)
+ .append(callDetails)
+ .append(param)
+ .append(regionName)
+ .append(userName)
+ .append(multiGetsCount)
+ .append(multiMutationsCount)
+ .append(multiServiceCalls)
+ .toHashCode();
+ }
+
+ public String toJsonPrettyPrint() {
+ return GSON.toJson(this);
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("startTime", startTime)
+ .append("processingTime", processingTime)
+ .append("queueTime", queueTime)
+ .append("responseSize", responseSize)
+ .append("clientAddress", clientAddress)
+ .append("serverClass", serverClass)
+ .append("methodName", methodName)
+ .append("callDetails", callDetails)
+ .append("param", param)
+ .append("regionName", regionName)
+ .append("userName", userName)
+ .append("multiGetsCount", multiGetsCount)
+ .append("multiMutationsCount", multiMutationsCount)
+ .append("multiServiceCalls", multiServiceCalls)
+ .toString();
+ }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 46cef7b..dc4091b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.RegionStatesCount;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SlowLogParams;
+import org.apache.hadoop.hbase.client.SlowLogRecord;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -128,7 +130,10 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
@@ -145,12 +150,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegio
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Column;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.DeleteType;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
@@ -182,6 +191,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -2222,6 +2232,57 @@ public final class ProtobufUtil {
}
/**
+ * Return SlowLogParams to maintain recent online slowlog responses
+ *
+ * @param message Message object {@link Message}
+ * @return SlowLogParams with regionName(for filter queries) and params
+ */
+ public static SlowLogParams getSlowLogParams(Message message) {
+ if (message == null) {
+ return null;
+ }
+ if (message instanceof ScanRequest) {
+ ScanRequest scanRequest = (ScanRequest) message;
+ String regionName = getStringForByteString(scanRequest.getRegion().getValue());
+ String params = TextFormat.shortDebugString(message);
+ return new SlowLogParams(regionName, params);
+ } else if (message instanceof MutationProto) {
+ MutationProto mutationProto = (MutationProto) message;
+ String params = "type= " + mutationProto.getMutateType().toString();
+ return new SlowLogParams(params);
+ } else if (message instanceof GetRequest) {
+ GetRequest getRequest = (GetRequest) message;
+ String regionName = getStringForByteString(getRequest.getRegion().getValue());
+ String params = "region= " + regionName + ", row= "
+ + getStringForByteString(getRequest.getGet().getRow());
+ return new SlowLogParams(regionName, params);
+ } else if (message instanceof MultiRequest) {
+ MultiRequest multiRequest = (MultiRequest) message;
+ int actionsCount = multiRequest.getRegionActionList()
+ .stream()
+ .mapToInt(ClientProtos.RegionAction::getActionCount)
+ .sum();
+ RegionAction actions = multiRequest.getRegionActionList().get(0);
+ String regionName = getStringForByteString(actions.getRegion().getValue());
+ String params = "region= " + regionName + ", for " + actionsCount + " action(s)";
+ return new SlowLogParams(regionName, params);
+ } else if (message instanceof MutateRequest) {
+ MutateRequest mutateRequest = (MutateRequest) message;
+ String regionName = getStringForByteString(mutateRequest.getRegion().getValue());
+ String params = "region= " + regionName;
+ return new SlowLogParams(regionName, params);
+ } else if (message instanceof CoprocessorServiceRequest) {
+ CoprocessorServiceRequest coprocessorServiceRequest = (CoprocessorServiceRequest) message;
+ String params = "coprocessorService= "
+ + coprocessorServiceRequest.getCall().getServiceName()
+ + ":" + coprocessorServiceRequest.getCall().getMethodName();
+ return new SlowLogParams(params);
+ }
+ String params = message.getClass().toString();
+ return new SlowLogParams(params);
+ }
+
+ /**
* Print out some subset of a MutationProto rather than all of it and its data
* @param proto Protobuf to print out
* @return Short String of mutation proto
@@ -3414,4 +3475,56 @@ public final class ProtobufUtil {
.build();
}
+ /**
+ * Convert Protobuf class
+ * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload}
+ * To client SlowLog Payload class {@link SlowLogRecord}
+ *
+ * @param slowLogPayload SlowLog Payload protobuf instance
+ * @return SlowLog Payload for client usecase
+ */
+ private static SlowLogRecord getSlowLogRecord(
+ final TooSlowLog.SlowLogPayload slowLogPayload) {
+ SlowLogRecord clientSlowLogRecord = new SlowLogRecord.SlowLogRecordBuilder()
+ .setCallDetails(slowLogPayload.getCallDetails())
+ .setClientAddress(slowLogPayload.getClientAddress())
+ .setMethodName(slowLogPayload.getMethodName())
+ .setMultiGetsCount(slowLogPayload.getMultiGets())
+ .setMultiMutationsCount(slowLogPayload.getMultiMutations())
+ .setMultiServiceCalls(slowLogPayload.getMultiServiceCalls())
+ .setParam(slowLogPayload.getParam())
+ .setProcessingTime(slowLogPayload.getProcessingTime())
+ .setQueueTime(slowLogPayload.getQueueTime())
+ .setRegionName(slowLogPayload.getRegionName())
+ .setResponseSize(slowLogPayload.getResponseSize())
+ .setServerClass(slowLogPayload.getServerClass())
+ .setStartTime(slowLogPayload.getStartTime())
+ .setUserName(slowLogPayload.getUserName())
+ .build();
+ return clientSlowLogRecord;
+ }
+
+ /**
+ * Convert AdminProtos#SlowLogResponses to list of {@link SlowLogRecord}
+ *
+ * @param slowLogResponses slowlog response protobuf instance
+ * @return list of SlowLog payloads for client usecase
+ */
+ public static List<SlowLogRecord> toSlowLogPayloads(
+ final AdminProtos.SlowLogResponses slowLogResponses) {
+ List<SlowLogRecord> slowLogRecords = slowLogResponses.getSlowLogPayloadsList()
+ .stream().map(ProtobufUtil::getSlowLogRecord).collect(Collectors.toList());
+ return slowLogRecords;
+ }
+
+ /**
+ * Convert {@link ClearSlowLogResponses} to boolean
+ *
+ * @param clearSlowLogResponses Clear slowlog response protobuf instance
+ * @return boolean representing clear slowlog response
+ */
+ public static boolean toClearSlowLogPayload(final ClearSlowLogResponses clearSlowLogResponses) {
+ return clearSlowLogResponses.getIsCleaned();
+ }
+
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 24f8009..afdd653 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -27,6 +27,7 @@ import java.util.Set;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.CellScannable;
import org.apache.hadoop.hbase.ClusterMetrics.Option;
import org.apache.hadoop.hbase.ClusterMetricsBuilder;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Row;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SlowLogQueryFilter;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
@@ -67,6 +69,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -76,6 +79,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
@@ -1940,4 +1944,44 @@ public final class RequestConverter {
return IsSnapshotCleanupEnabledRequest.newBuilder().build();
}
+ /**
+ * Create a protocol buffer {@link SlowLogResponseRequest}
+ *
+ * @param slowLogQueryFilter filter to use if provided
+ * @return a protocol buffer SlowLogResponseRequest
+ */
+ public static SlowLogResponseRequest buildSlowLogResponseRequest(
+ final SlowLogQueryFilter slowLogQueryFilter) {
+ SlowLogResponseRequest.Builder builder = SlowLogResponseRequest.newBuilder();
+ if (slowLogQueryFilter == null) {
+ return builder.build();
+ }
+ final String clientAddress = slowLogQueryFilter.getClientAddress();
+ if (StringUtils.isNotEmpty(clientAddress)) {
+ builder.setClientAddress(clientAddress);
+ }
+ final String regionName = slowLogQueryFilter.getRegionName();
+ if (StringUtils.isNotEmpty(regionName)) {
+ builder.setRegionName(regionName);
+ }
+ final String tableName = slowLogQueryFilter.getTableName();
+ if (StringUtils.isNotEmpty(tableName)) {
+ builder.setTableName(tableName);
+ }
+ final String userName = slowLogQueryFilter.getUserName();
+ if (StringUtils.isNotEmpty(userName)) {
+ builder.setUserName(userName);
+ }
+ return builder.setLimit(slowLogQueryFilter.getLimit()).build();
+ }
+
+ /**
+ * Create a protocol buffer {@link ClearSlowLogResponseRequest}
+ *
+ * @return a protocol buffer ClearSlowLogResponseRequest
+ */
+ public static ClearSlowLogResponseRequest buildClearSlowLogResponseRequest() {
+ return ClearSlowLogResponseRequest.newBuilder().build();
+ }
+
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index d6b9667..6f1bb6a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1566,6 +1566,12 @@ public final class HConstants {
"hbase.master.executor.logreplayops.threads";
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
+ public static final int DEFAULT_SLOW_LOG_RING_BUFFER_SIZE = 256;
+
+ public static final String SLOW_LOG_BUFFER_ENABLED_KEY =
+ "hbase.regionserver.slowlog.buffer.enabled";
+ public static final boolean DEFAULT_ONLINE_LOG_PROVIDER_ENABLED = false;
+
private HConstants() {
// Can't be instantiated with this ctor.
}
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 8fda74d..a7d6898 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1926,4 +1926,27 @@ possible configurations would overwhelm and obscure the important.
enable this feature.
</description>
</property>
+ <property>
+ <name>hbase.regionserver.slowlog.ringbuffer.size</name>
+ <value>256</value>
+ <description>
+ Default size of ringbuffer to be maintained by each RegionServer in order
+ to store online slowlog responses. This is an in-memory ring buffer of
+ requests that were judged to be too slow in addition to the responseTooSlow
+ logging. The in-memory representation would be complete.
+ For more details, please look into Doc Section:
+ Get Slow Response Log from shell
+ </description>
+ </property>
+ <property>
+ <name>hbase.regionserver.slowlog.buffer.enabled</name>
+ <value>false</value>
+ <description>
+ Indicates whether RegionServers have ring buffer running for storing
+ Online Slow logs in FIFO manner with limited entries. The size of
+ the ring buffer is indicated by config: hbase.regionserver.slowlog.ringbuffer.size
+ The default value is false, turn this on and get latest slowlog
+ responses with complete data.
+ </description>
+ </property>
</configuration>
diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
index 85b9113..34c9806 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
@@ -29,6 +29,7 @@ import "ClusterStatus.proto";
import "HBase.proto";
import "WAL.proto";
import "Quota.proto";
+import "TooSlowLog.proto";
message GetRegionInfoRequest {
required RegionSpecifier region = 1;
@@ -280,6 +281,26 @@ message ExecuteProceduresRequest {
message ExecuteProceduresResponse {
}
+message SlowLogResponseRequest {
+ optional string region_name = 1;
+ optional string table_name = 2;
+ optional string client_address = 3;
+ optional string user_name = 4;
+ optional uint32 limit = 5 [default = 10];
+}
+
+message SlowLogResponses {
+ repeated SlowLogPayload slow_log_payloads = 1;
+}
+
+message ClearSlowLogResponseRequest {
+
+}
+
+message ClearSlowLogResponses {
+ required bool is_cleaned = 1;
+}
+
service AdminService {
rpc GetRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
@@ -344,4 +365,10 @@ service AdminService {
rpc ExecuteProcedures(ExecuteProceduresRequest)
returns(ExecuteProceduresResponse);
+
+ rpc GetSlowLogResponses(SlowLogResponseRequest)
+ returns(SlowLogResponses);
+
+ rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest)
+ returns(ClearSlowLogResponses);
}
diff --git a/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto
new file mode 100644
index 0000000..26dabde
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+// This file contains protocol buffers that are used for Online TooSlowLogs
+// To be used as Ring Buffer payload
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "TooSlowLog";
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+message SlowLogPayload {
+ required int64 start_time = 1;
+ required int32 processing_time = 2;
+ required int32 queue_time = 3;
+ required int64 response_size = 4;
+ required string client_address = 5;
+ required string server_class = 6;
+ required string method_name = 7;
+ required string call_details = 8;
+ optional string param = 9;
+ required string user_name = 10;
+ optional string region_name = 11;
+ optional int32 multi_gets = 12 [default = 0];
+ optional int32 multi_mutations = 13 [default = 0];
+ optional int32 multi_service_calls = 14 [default = 0];
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index f0409fd..97b4990 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -33,6 +33,8 @@ import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
@@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
import org.apache.hadoop.hbase.security.User;
@@ -85,6 +89,10 @@ public abstract class RpcServer implements RpcServerInterface,
protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
= new CallQueueTooBigException();
+ private static final String MULTI_GETS = "multi.gets";
+ private static final String MULTI_MUTATIONS = "multi.mutations";
+ private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
+
private final boolean authorize;
protected boolean isSecurityEnabled;
@@ -215,6 +223,12 @@ public abstract class RpcServer implements RpcServerInterface,
*/
private RSRpcServices rsRpcServices;
+
+ /**
+ * Use to add online slowlog responses
+ */
+ private SlowLogRecorder slowLogRecorder;
+
@FunctionalInterface
protected interface CallCleanup {
void run();
@@ -403,13 +417,21 @@ public abstract class RpcServer implements RpcServerInterface,
boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
if (tooSlow || tooLarge) {
+ final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
// when tagging, we let TooLarge trump TooSmall to keep output simple
// note that large responses will often also be slow.
logResponse(param,
- md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
- (tooLarge ? "TooLarge" : "TooSlow"),
- status.getClient(), startTime, processingTime, qTime,
- responseSize);
+ md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
+ tooLarge, tooSlow,
+ status.getClient(), startTime, processingTime, qTime,
+ responseSize, userName);
+ if (tooSlow && this.slowLogRecorder != null) {
+ // send logs to ring buffer owned by slowLogRecorder
+ final String className = server == null ? StringUtils.EMPTY :
+ server.getClass().getSimpleName();
+ this.slowLogRecorder.addSlowLogPayload(
+ new RpcLogDetails(call, status.getClient(), responseSize, className));
+ }
}
return new Pair<>(result, controller.cellScanner());
} catch (Throwable e) {
@@ -440,17 +462,21 @@ public abstract class RpcServer implements RpcServerInterface,
* @param param The parameters received in the call.
* @param methodName The name of the method invoked
* @param call The string representation of the call
- * @param tag The tag that will be used to indicate this event in the log.
- * @param clientAddress The address of the client who made this call.
- * @param startTime The time that the call was initiated, in ms.
- * @param processingTime The duration that the call took to run, in ms.
- * @param qTime The duration that the call spent on the queue
- * prior to being initiated, in ms.
- * @param responseSize The size in bytes of the response buffer.
+ * @param tooLarge To indicate if the event is tooLarge
+ * @param tooSlow To indicate if the event is tooSlow
+ * @param clientAddress The address of the client who made this call.
+ * @param startTime The time that the call was initiated, in ms.
+ * @param processingTime The duration that the call took to run, in ms.
+ * @param qTime The duration that the call spent on the queue
+ * prior to being initiated, in ms.
+ * @param responseSize The size in bytes of the response buffer.
+ * @param userName UserName of the current RPC Call
*/
- void logResponse(Message param, String methodName, String call, String tag,
- String clientAddress, long startTime, int processingTime, int qTime,
- long responseSize) throws IOException {
+ void logResponse(Message param, String methodName, String call, boolean tooLarge,
+ boolean tooSlow, String clientAddress, long startTime, int processingTime, int qTime,
+ long responseSize, String userName) {
+ final String className = server == null ? StringUtils.EMPTY :
+ server.getClass().getSimpleName();
// base information that is reported regardless of type of call
Map<String, Object> responseInfo = new HashMap<>();
responseInfo.put("starttimems", startTime);
@@ -458,7 +484,7 @@ public abstract class RpcServer implements RpcServerInterface,
responseInfo.put("queuetimems", qTime);
responseInfo.put("responsesize", responseSize);
responseInfo.put("client", clientAddress);
- responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
+ responseInfo.put("class", className);
responseInfo.put("method", methodName);
responseInfo.put("call", call);
// The params could be really big, make sure they don't kill us at WARN
@@ -496,13 +522,16 @@ public abstract class RpcServer implements RpcServerInterface,
}
}
}
- responseInfo.put("multi.gets", numGets);
- responseInfo.put("multi.mutations", numMutations);
- responseInfo.put("multi.servicecalls", numServiceCalls);
+ responseInfo.put(MULTI_GETS, numGets);
+ responseInfo.put(MULTI_MUTATIONS, numMutations);
+ responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
}
+ final String tag = (tooLarge && tooSlow) ? "TooLarge & TooSlow"
+ : (tooSlow ? "TooSlow" : "TooLarge");
LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
}
+
/**
* Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
* if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
@@ -758,4 +787,14 @@ public abstract class RpcServer implements RpcServerInterface,
public void setRsRpcServices(RSRpcServices rsRpcServices) {
this.rsRpcServices = rsRpcServices;
}
+
+ @Override
+ public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
+ this.slowLogRecorder = slowLogRecorder;
+ }
+
+ @Override
+ public SlowLogRecorder getSlowLogRecorder() {
+ return slowLogRecorder;
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index 0f875d8..3155679 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -22,13 +22,14 @@ package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import java.net.InetSocketAddress;
-import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@@ -96,4 +97,16 @@ public interface RpcServerInterface {
ByteBuffAllocator getByteBuffAllocator();
void setRsRpcServices(RSRpcServices rsRpcServices);
+
+ /**
+ * Set Online SlowLog Provider
+ *
+ * @param slowLogRecorder instance of {@link SlowLogRecorder}
+ */
+ void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
+
+ /**
+ * @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer
+ */
+ SlowLogRecorder getSlowLogRecorder();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index df81413..75122c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -528,6 +529,11 @@ public class HRegionServer extends HasThread implements
private final NettyEventLoopGroupConfig eventLoopGroupConfig;
/**
+ * Provide online slow log responses from ringbuffer
+ */
+ private SlowLogRecorder slowLogRecorder;
+
+ /**
* True if this RegionServer is coming up in a cluster where there is no Master;
* means it needs to just come up and make do without a Master to talk to: e.g. in test or
* HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only
@@ -586,6 +592,9 @@ public class HRegionServer extends HasThread implements
this.abortRequested = false;
this.stopped = false;
+ if (!(this instanceof HMaster)) {
+ this.slowLogRecorder = new SlowLogRecorder(this.conf);
+ }
rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
String hostName =
@@ -1494,6 +1503,15 @@ public class HRegionServer extends HasThread implements
}
/**
+ * get Online SlowLog Provider to add slow logs to ringbuffer
+ *
+ * @return Online SlowLog Provider
+ */
+ public SlowLogRecorder getSlowLogRecorder() {
+ return this.slowLogRecorder;
+ }
+
+ /*
* Run init. Sets up wal and starts up all server threads.
*
* @param c Extra configuration.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b90c6fc..ed4ead0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -34,6 +34,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -104,6 +105,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@@ -125,6 +127,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
@@ -167,6 +170,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -196,6 +201,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWA
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
@@ -243,6 +250,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
@@ -1245,6 +1253,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
rpcServer.setRsRpcServices(this);
+ if (!(rs instanceof HMaster)) {
+ rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder());
+ }
scannerLeaseTimeoutPeriod = conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
@@ -3730,6 +3741,36 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
}
}
+ @Override
+ @QosPriority(priority = HConstants.ADMIN_QOS)
+ public SlowLogResponses getSlowLogResponses(final RpcController controller,
+ final SlowLogResponseRequest request) {
+ final SlowLogRecorder slowLogRecorder =
+ this.regionServer.getSlowLogRecorder();
+ final List<SlowLogPayload> slowLogPayloads;
+ slowLogPayloads = slowLogRecorder != null
+ ? slowLogRecorder.getSlowLogPayloads(request)
+ : Collections.emptyList();
+ SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
+ .addAllSlowLogPayloads(slowLogPayloads)
+ .build();
+ return slowLogResponses;
+ }
+
+ @Override
+ @QosPriority(priority = HConstants.ADMIN_QOS)
+ public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
+ final ClearSlowLogResponseRequest request) {
+ final SlowLogRecorder slowLogRecorder =
+ this.regionServer.getSlowLogRecorder();
+ boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder)
+ .map(SlowLogRecorder::clearSlowLogPayloads).orElse(false);
+ ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder()
+ .setIsCleaned(slowLogsCleaned)
+ .build();
+ return clearSlowLogResponses;
+ }
+
@VisibleForTesting
public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
new file mode 100644
index 0000000..53a2ef1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import com.lmax.disruptor.ExceptionHandler;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exception Handler for Online Slow Log Ring Buffer
+ */
+@InterfaceAudience.Private
+class DisruptorExceptionHandler implements ExceptionHandler<RingBufferEnvelope> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(DisruptorExceptionHandler.class);
+
+ @Override
+ public void handleEventException(Throwable e, long sequence, RingBufferEnvelope event) {
+ LOG.error("Sequence={}, event={}", sequence, event, e);
+ }
+
+ @Override
+ public void handleOnStartException(Throwable e) {
+ LOG.error("Disruptor onStartException: ", e);
+ }
+
+ @Override
+ public void handleOnShutdownException(Throwable e) {
+ LOG.error("Disruptor onShutdownException: ", e);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java
new file mode 100644
index 0000000..d308670
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.yetus.audience.InterfaceAudience;
+
+
+/**
+ * An envelope to carry payload in the slow log ring buffer that serves as online buffer
+ * to provide latest TooSlowLog
+ */
+@InterfaceAudience.Private
+final class RingBufferEnvelope {
+
+ private RpcLogDetails rpcLogDetails;
+
+ /**
+ * Load the Envelope with {@link RpcCall}
+ *
+ * @param rpcLogDetails all details of rpc call that would be useful for ring buffer
+ * consumers
+ */
+ public void load(RpcLogDetails rpcLogDetails) {
+ this.rpcLogDetails = rpcLogDetails;
+ }
+
+ /**
+ * Retrieve current rpcCall details {@link RpcLogDetails} available on Envelope and
+ * free up the Envelope
+ *
+ * @return Retrieve rpc log details
+ */
+ public RpcLogDetails getPayload() {
+ final RpcLogDetails rpcLogDetails = this.rpcLogDetails;
+ this.rpcLogDetails = null;
+ return rpcLogDetails;
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
new file mode 100644
index 0000000..e7ab7d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * RpcCall details that would be passed on to ring buffer of slow log responses
+ */
+@InterfaceAudience.Private
+public class RpcLogDetails {
+
+ private RpcCall rpcCall;
+ private String clientAddress;
+ private long responseSize;
+ private String className;
+
+ public RpcLogDetails(RpcCall rpcCall, String clientAddress, long responseSize,
+ String className) {
+ this.rpcCall = rpcCall;
+ this.clientAddress = clientAddress;
+ this.responseSize = responseSize;
+ this.className = className;
+ }
+
+ public RpcCall getRpcCall() {
+ return rpcCall;
+ }
+
+ public String getClientAddress() {
+ return clientAddress;
+ }
+
+ public long getResponseSize() {
+ return responseSize;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("rpcCall", rpcCall)
+ .append("clientAddress", clientAddress)
+ .append("responseSize", responseSize)
+ .append("className", className)
+ .toString();
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogEventHandler.java
new file mode 100644
index 0000000..24e8460
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogEventHandler.java
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.client.SlowLogParams;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
+
+/**
+ * Event Handler run by disruptor ringbuffer consumer
+ */
+@InterfaceAudience.Private
+class SlowLogEventHandler implements EventHandler<RingBufferEnvelope> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SlowLogEventHandler.class);
+
+ private final Queue<SlowLogPayload> queue;
+
+ SlowLogEventHandler(int eventCount) {
+ EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount);
+ queue = Queues.synchronizedQueue(evictingQueue);
+ }
+
+ /**
+ * Called when a publisher has published an event to the {@link RingBuffer}
+ *
+ * @param event published to the {@link RingBuffer}
+ * @param sequence of the event being processed
+ * @param endOfBatch flag to indicate if this is the last event in a batch from
+ * the {@link RingBuffer}
+ * @throws Exception if the EventHandler would like the exception handled further up the chain
+ */
+ @Override
+ public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch)
+ throws Exception {
+ final RpcLogDetails rpcCallDetails = event.getPayload();
+ final RpcCall rpcCall = rpcCallDetails.getRpcCall();
+ final String clientAddress = rpcCallDetails.getClientAddress();
+ final long responseSize = rpcCallDetails.getResponseSize();
+ final String className = rpcCallDetails.getClassName();
+ Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
+ Message param = rpcCall.getParam();
+ long receiveTime = rpcCall.getReceiveTime();
+ long startTime = rpcCall.getStartTime();
+ long endTime = System.currentTimeMillis();
+ int processingTime = (int) (endTime - startTime);
+ int qTime = (int) (startTime - receiveTime);
+ final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
+ int numGets = 0;
+ int numMutations = 0;
+ int numServiceCalls = 0;
+ if (param instanceof ClientProtos.MultiRequest) {
+ ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
+ for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
+ for (ClientProtos.Action action : regionAction.getActionList()) {
+ if (action.hasMutation()) {
+ numMutations++;
+ }
+ if (action.hasGet()) {
+ numGets++;
+ }
+ if (action.hasServiceCall()) {
+ numServiceCalls++;
+ }
+ }
+ }
+ }
+ final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
+ final String methodDescriptorName =
+ methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
+ SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder()
+ .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
+ .setClientAddress(clientAddress)
+ .setMethodName(methodDescriptorName)
+ .setMultiGets(numGets)
+ .setMultiMutations(numMutations)
+ .setMultiServiceCalls(numServiceCalls)
+ .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
+ .setProcessingTime(processingTime)
+ .setQueueTime(qTime)
+ .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
+ .setResponseSize(responseSize)
+ .setServerClass(className)
+ .setStartTime(startTime)
+ .setUserName(userName)
+ .build();
+ queue.add(slowLogPayload);
+ }
+
+ /**
+ * Cleans up slow log payloads
+ *
+ * @return true if slow log payloads are cleaned up, false otherwise
+ */
+ boolean clearSlowLogs() {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received request to clean up online slowlog buffer..");
+ }
+ queue.clear();
+ return true;
+ }
+
+ /**
+ * Retrieve list of slow log payloads
+ *
+ * @param request slow log request parameters
+ * @return list of slow log payloads
+ */
+ List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
+ List<SlowLogPayload> slowLogPayloadList =
+ Arrays.stream(queue.toArray(new SlowLogPayload[0])).collect(Collectors.toList());
+
+ // latest slow logs first, operator is interested in latest records from in-memory buffer
+ Collections.reverse(slowLogPayloadList);
+
+ if (isFilterProvided(request)) {
+ slowLogPayloadList = filterSlowLogs(request, slowLogPayloadList);
+ }
+ int limit = request.getLimit() >= slowLogPayloadList.size() ? slowLogPayloadList.size()
+ : request.getLimit();
+ return slowLogPayloadList.subList(0, limit);
+ }
+
+ private boolean isFilterProvided(AdminProtos.SlowLogResponseRequest request) {
+ if (StringUtils.isNotEmpty(request.getUserName())) {
+ return true;
+ }
+ if (StringUtils.isNotEmpty(request.getTableName())) {
+ return true;
+ }
+ if (StringUtils.isNotEmpty(request.getClientAddress())) {
+ return true;
+ }
+ return StringUtils.isNotEmpty(request.getRegionName());
+ }
+
+ private List<SlowLogPayload> filterSlowLogs(AdminProtos.SlowLogResponseRequest request,
+ List<SlowLogPayload> slowLogPayloadList) {
+ List<SlowLogPayload> filteredSlowLogPayloads = new ArrayList<>();
+ for (SlowLogPayload slowLogPayload : slowLogPayloadList) {
+ if (StringUtils.isNotEmpty(request.getRegionName())) {
+ if (slowLogPayload.getRegionName().equals(request.getRegionName())) {
+ filteredSlowLogPayloads.add(slowLogPayload);
+ continue;
+ }
+ }
+ if (StringUtils.isNotEmpty(request.getTableName())) {
+ if (slowLogPayload.getRegionName().startsWith(request.getTableName())) {
+ filteredSlowLogPayloads.add(slowLogPayload);
+ continue;
+ }
+ }
+ if (StringUtils.isNotEmpty(request.getClientAddress())) {
+ if (slowLogPayload.getClientAddress().equals(request.getClientAddress())) {
+ filteredSlowLogPayloads.add(slowLogPayload);
+ continue;
+ }
+ }
+ if (StringUtils.isNotEmpty(request.getUserName())) {
+ if (slowLogPayload.getUserName().equals(request.getUserName())) {
+ filteredSlowLogPayloads.add(slowLogPayload);
+ }
+ }
+ }
+ return filteredSlowLogPayloads;
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
new file mode 100644
index 0000000..d750642
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
+
+/**
+ * Online SlowLog Provider Service that keeps slow RPC logs in the ring buffer.
+ * The service uses LMAX Disruptor to save slow records which are then consumed by
+ * a queue and based on the ring buffer size, the available records are then fetched
+ * from the queue in thread-safe manner.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SlowLogRecorder {
+
+ private final Disruptor<RingBufferEnvelope> disruptor;
+ private final SlowLogEventHandler slowLogEventHandler;
+ private final int eventCount;
+ private final boolean isOnlineSlowLogProviderEnabled;
+
+ private static final String SLOW_LOG_RING_BUFFER_SIZE =
+ "hbase.regionserver.slowlog.ringbuffer.size";
+
+ /**
+ * Initialize disruptor with configurable ringbuffer size
+ */
+ public SlowLogRecorder(Configuration conf) {
+ isOnlineSlowLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
+ HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
+
+ if (!isOnlineSlowLogProviderEnabled) {
+ this.disruptor = null;
+ this.slowLogEventHandler = null;
+ this.eventCount = 0;
+ return;
+ }
+
+ this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
+ HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
+
+ // This is the 'writer' -- a single threaded executor. This single thread consumes what is
+ // put on the ringbuffer.
+ final String hostingThreadName = Thread.currentThread().getName();
+
+ // disruptor initialization with BlockingWaitStrategy
+ this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
+ getEventCount(),
+ Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
+ ProducerType.MULTI,
+ new BlockingWaitStrategy());
+ this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
+
+ // initialize ringbuffer event handler
+ this.slowLogEventHandler = new SlowLogEventHandler(this.eventCount);
+ this.disruptor.handleEventsWith(new SlowLogEventHandler[]{this.slowLogEventHandler});
+ this.disruptor.start();
+ }
+
+ // must be power of 2 for disruptor ringbuffer
+ private int getEventCount() {
+ Preconditions.checkArgument(eventCount >= 0,
+ SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
+ int floor = Integer.highestOneBit(eventCount);
+ if (floor == eventCount) {
+ return floor;
+ }
+ // max capacity is 1 << 30
+ if (floor >= 1 << 29) {
+ return 1 << 30;
+ }
+ return floor << 1;
+ }
+
+ /**
+ * Retrieve online slow logs from ringbuffer
+ *
+ * @param request slow log request parameters
+ * @return online slow logs from ringbuffer
+ */
+ public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
+ return isOnlineSlowLogProviderEnabled ? this.slowLogEventHandler.getSlowLogPayloads(request)
+ : Collections.emptyList();
+ }
+
+ /**
+ * clears slow log payloads from ringbuffer
+ *
+ * @return true if slow log payloads are cleaned up or
+ * hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
+ * clean up slow logs
+ */
+ public boolean clearSlowLogPayloads() {
+ if (!isOnlineSlowLogProviderEnabled) {
+ return true;
+ }
+ return this.slowLogEventHandler.clearSlowLogs();
+ }
+
+ /**
+ * Add slow log rpcCall details to ringbuffer
+ *
+ * @param rpcLogDetails all details of rpc call that would be useful for ring buffer
+ * consumers
+ */
+ public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
+ if (!isOnlineSlowLogProviderEnabled) {
+ return;
+ }
+ RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
+ long seqId = ringBuffer.next();
+ try {
+ ringBuffer.get(seqId).load(rpcLogDetails);
+ } finally {
+ ringBuffer.publish(seqId);
+ }
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index 5beef53..89329c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -24,8 +24,10 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
@@ -868,4 +870,29 @@ public class TestAdmin2 extends TestAdminBase {
assertEquals(initialState, ADMIN.isSnapshotCleanupEnabled());
}
+ @Test
+ public void testSlowLogResponses() throws Exception {
+ // get all live server names
+ Collection<ServerName> serverNames = ADMIN.getRegionServers();
+ List<ServerName> serverNameList = new ArrayList<>(serverNames);
+
+ // clean up slowlog responses maintained in memory by RegionServers
+ List<Boolean> areSlowLogsCleared = ADMIN.clearSlowLogResponses(new HashSet<>(serverNameList));
+
+ int countFailedClearSlowResponse = 0;
+ for (Boolean isSlowLogCleared : areSlowLogsCleared) {
+ if (!isSlowLogCleared) {
+ ++countFailedClearSlowResponse;
+ }
+ }
+ Assert.assertEquals(countFailedClearSlowResponse, 0);
+
+ SlowLogQueryFilter slowLogQueryFilter = new SlowLogQueryFilter();
+ List<SlowLogRecord> slowLogRecords = ADMIN.getSlowLogResponses(new HashSet<>(serverNames),
+ slowLogQueryFilter);
+
+ // after cleanup of slowlog responses, total count of slowlog payloads should be 0
+ Assert.assertEquals(slowLogRecords.size(), 0);
+ }
+
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java
index 3737101..c379775 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java
@@ -46,6 +46,7 @@ public class TestAdminBase {
TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 30);
TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 30);
+ TEST_UTIL.getConfiguration().setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
TEST_UTIL.startMiniCluster(3);
ADMIN = TEST_UTIL.getAdmin();
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 2797df3..f555f4b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -34,6 +34,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -51,9 +52,11 @@ import org.junit.Assume;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -524,4 +527,5 @@ public abstract class AbstractTestIPC {
rpcServer.stop();
}
}
+
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 604797a..dbd1d9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -109,6 +111,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWA
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
@@ -678,6 +682,18 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
}
@Override
+ public SlowLogResponses getSlowLogResponses(RpcController controller,
+ SlowLogResponseRequest request) throws ServiceException {
+ return null;
+ }
+
+ @Override
+ public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
+ ClearSlowLogResponseRequest request) throws ServiceException {
+ return null;
+ }
+
+ @Override
public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
RpcController controller, GetSpaceQuotaSnapshotsRequest request)
throws ServiceException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
new file mode 100644
index 0000000..240230e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
@@ -0,0 +1,593 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcCallback;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
+
+/**
+ * Tests for Online SlowLog Provider Service
+ */
+@Category({MasterTests.class, MediumTests.class})
+public class TestSlowLogRecorder {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSlowLogRecorder.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
+
+ private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
+
+ private SlowLogRecorder slowLogRecorder;
+
+ private static int i = 0;
+
+ private static Configuration applySlowLogRecorderConf(int eventSize) {
+
+ Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
+ conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
+ conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
+ return conf;
+
+ }
+
+ /**
+ * confirm that for a ringbuffer of slow logs, payload on given index of buffer
+ * has expected elements
+ *
+ * @param i index of ringbuffer logs
+ * @param j data value that was put on index i
+ * @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder}
+ */
+ private void confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
+
+ Assert.assertEquals(slowLogPayloads.get(i).getClientAddress(), "client_" + j);
+ Assert.assertEquals(slowLogPayloads.get(i).getUserName(), "userName_" + j);
+ Assert.assertEquals(slowLogPayloads.get(i).getServerClass(), "class_" + j);
+ }
+
+ @Test
+ public void testOnlieSlowLogConsumption() throws Exception {
+
+ Configuration conf = applySlowLogRecorderConf(8);
+ slowLogRecorder = new SlowLogRecorder(conf);
+ AdminProtos.SlowLogResponseRequest request =
+ AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
+
+ Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+ int i = 0;
+
+ // add 5 records initially
+ for (; i < 5; i++) {
+ RpcLogDetails rpcLogDetails =
+ getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+ () -> slowLogRecorder.getSlowLogPayloads(request).size() == 5));
+ List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ confirmPayloadParams(0, 5, slowLogPayloads);
+ confirmPayloadParams(1, 4, slowLogPayloads);
+ confirmPayloadParams(2, 3, slowLogPayloads);
+ confirmPayloadParams(3, 2, slowLogPayloads);
+ confirmPayloadParams(4, 1, slowLogPayloads);
+
+ // add 2 more records
+ for (; i < 7; i++) {
+ RpcLogDetails rpcLogDetails =
+ getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+ () -> slowLogRecorder.getSlowLogPayloads(request).size() == 7));
+
+ slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+
+ Assert.assertEquals(slowLogPayloads.size(), 7);
+ confirmPayloadParams(0, 7, slowLogPayloads);
+ confirmPayloadParams(5, 2, slowLogPayloads);
+ confirmPayloadParams(6, 1, slowLogPayloads);
+
+ // add 3 more records
+ for (; i < 10; i++) {
+ RpcLogDetails rpcLogDetails =
+ getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+ () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
+
+ slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ // confirm ringbuffer is full
+ Assert.assertEquals(slowLogPayloads.size(), 8);
+ confirmPayloadParams(7, 3, slowLogPayloads);
+ confirmPayloadParams(0, 10, slowLogPayloads);
+ confirmPayloadParams(1, 9, slowLogPayloads);
+
+ // add 4 more records
+ for (; i < 14; i++) {
+ RpcLogDetails rpcLogDetails =
+ getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+ () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
+
+ slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ // confirm ringbuffer is full
+ Assert.assertEquals(slowLogPayloads.size(), 8);
+ confirmPayloadParams(0, 14, slowLogPayloads);
+ confirmPayloadParams(1, 13, slowLogPayloads);
+ confirmPayloadParams(2, 12, slowLogPayloads);
+ confirmPayloadParams(3, 11, slowLogPayloads);
+
+ boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
+ Assert.assertTrue(isRingBufferCleaned);
+
+ LOG.debug("cleared the ringbuffer of Online Slow Log records");
+
+ slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ // confirm ringbuffer is empty
+ Assert.assertEquals(slowLogPayloads.size(), 0);
+
+ }
+
+ @Test
+ public void testOnlineSlowLogWithHighRecords() throws Exception {
+
+ Configuration conf = applySlowLogRecorderConf(14);
+ slowLogRecorder = new SlowLogRecorder(conf);
+ AdminProtos.SlowLogResponseRequest request =
+ AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
+
+ Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+ for (int i = 0; i < 14 * 11; i++) {
+ RpcLogDetails rpcLogDetails =
+ getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+ LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+ () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
+
+ List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ Assert.assertEquals(slowLogPayloads.size(), 14);
+
+ // confirm strict order of slow log payloads
+ confirmPayloadParams(0, 154, slowLogPayloads);
+ confirmPayloadParams(1, 153, slowLogPayloads);
+ confirmPayloadParams(2, 152, slowLogPayloads);
+ confirmPayloadParams(3, 151, slowLogPayloads);
+ confirmPayloadParams(4, 150, slowLogPayloads);
+ confirmPayloadParams(5, 149, slowLogPayloads);
+ confirmPayloadParams(6, 148, slowLogPayloads);
+ confirmPayloadParams(7, 147, slowLogPayloads);
+ confirmPayloadParams(8, 146, slowLogPayloads);
+ confirmPayloadParams(9, 145, slowLogPayloads);
+ confirmPayloadParams(10, 144, slowLogPayloads);
+ confirmPayloadParams(11, 143, slowLogPayloads);
+ confirmPayloadParams(12, 142, slowLogPayloads);
+ confirmPayloadParams(13, 141, slowLogPayloads);
+
+ boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
+ Assert.assertTrue(isRingBufferCleaned);
+ LOG.debug("cleared the ringbuffer of Online Slow Log records");
+ slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+
+ // confirm ringbuffer is empty
+ Assert.assertEquals(slowLogPayloads.size(), 0);
+
+ }
+
+ @Test
+ public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
+
+ Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
+ conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
+
+ slowLogRecorder = new SlowLogRecorder(conf);
+ AdminProtos.SlowLogResponseRequest request =
+ AdminProtos.SlowLogResponseRequest.newBuilder().build();
+ Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+ for (int i = 0; i < 300; i++) {
+ RpcLogDetails rpcLogDetails =
+ getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ Assert.assertEquals(slowLogPayloads.size(), 0);
+
+ }
+
+ @Test
+ public void testOnlineSlowLogWithDisableConfig() throws Exception {
+
+ Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
+ conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
+
+ slowLogRecorder = new SlowLogRecorder(conf);
+ AdminProtos.SlowLogResponseRequest request =
+ AdminProtos.SlowLogResponseRequest.newBuilder().build();
+ Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+ for (int i = 0; i < 300; i++) {
+ RpcLogDetails rpcLogDetails =
+ getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+ List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ Assert.assertEquals(slowLogPayloads.size(), 0);
+ conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
+
+ }
+
+ @Test
+ public void testSlowLogFilters() throws Exception {
+
+ Configuration conf = applySlowLogRecorderConf(30);
+ slowLogRecorder = new SlowLogRecorder(conf);
+ AdminProtos.SlowLogResponseRequest request =
+ AdminProtos.SlowLogResponseRequest.newBuilder()
+ .setLimit(15)
+ .setUserName("userName_87")
+ .build();
+
+ Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+
+ LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+ for (int i = 0; i < 100; i++) {
+ RpcLogDetails rpcLogDetails =
+ getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+ LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+ () -> slowLogRecorder.getSlowLogPayloads(request).size() == 1));
+
+ AdminProtos.SlowLogResponseRequest requestClient =
+ AdminProtos.SlowLogResponseRequest.newBuilder()
+ .setLimit(15)
+ .setClientAddress("client_85")
+ .build();
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+ () -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1));
+
+ AdminProtos.SlowLogResponseRequest requestSlowLog =
+ AdminProtos.SlowLogResponseRequest.newBuilder()
+ .setLimit(15)
+ .build();
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+ () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
+
+ }
+
+ @Test
+ public void testConcurrentSlowLogEvents() throws Exception {
+
+ Configuration conf = applySlowLogRecorderConf(50000);
+ slowLogRecorder = new SlowLogRecorder(conf);
+ AdminProtos.SlowLogResponseRequest request =
+ AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
+ Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+ for (int j = 0; j < 1000; j++) {
+
+ CompletableFuture.runAsync(() -> {
+ for (int i = 0; i < 3500; i++) {
+ RpcLogDetails rpcLogDetails =
+ getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+ });
+
+ }
+
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+ slowLogRecorder.clearSlowLogPayloads();
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
+ 4000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000));
+ }
+
+ private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
+ return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className);
+ }
+
+ private RpcCall getRpcCall(String userName) {
+ RpcCall rpcCall = new RpcCall() {
+ @Override
+ public BlockingService getService() {
+ return null;
+ }
+
+ @Override
+ public Descriptors.MethodDescriptor getMethod() {
+ return null;
+ }
+
+ @Override
+ public Message getParam() {
+ return getMessage();
+ }
+
+ @Override
+ public CellScanner getCellScanner() {
+ return null;
+ }
+
+ @Override
+ public long getReceiveTime() {
+ return 0;
+ }
+
+ @Override
+ public long getStartTime() {
+ return 0;
+ }
+
+ @Override
+ public void setStartTime(long startTime) {
+
+ }
+
+ @Override
+ public int getTimeout() {
+ return 0;
+ }
+
+ @Override
+ public int getPriority() {
+ return 0;
+ }
+
+ @Override
+ public long getDeadline() {
+ return 0;
+ }
+
+ @Override
+ public long getSize() {
+ return 0;
+ }
+
+ @Override
+ public RPCProtos.RequestHeader getHeader() {
+ return null;
+ }
+
+ @Override
+ public int getRemotePort() {
+ return 0;
+ }
+
+ @Override
+ public void setResponse(Message param, CellScanner cells,
+ Throwable errorThrowable, String error) {
+ }
+
+ @Override
+ public void sendResponseIfReady() throws IOException {
+ }
+
+ @Override
+ public void cleanup() {
+ }
+
+ @Override
+ public String toShortString() {
+ return null;
+ }
+
+ @Override
+ public long disconnectSince() {
+ return 0;
+ }
+
+ @Override
+ public boolean isClientCellBlockSupported() {
+ return false;
+ }
+
+ @Override
+ public Optional<User> getRequestUser() {
+ return getUser(userName);
+ }
+
+ @Override
+ public InetAddress getRemoteAddress() {
+ return null;
+ }
+
+ @Override
+ public HBaseProtos.VersionInfo getClientVersionInfo() {
+ return null;
+ }
+
+ @Override
+ public void setCallBack(RpcCallback callback) {
+ }
+
+ @Override
+ public boolean isRetryImmediatelySupported() {
+ return false;
+ }
+
+ @Override
+ public long getResponseCellSize() {
+ return 0;
+ }
+
+ @Override
+ public void incrementResponseCellSize(long cellSize) {
+ }
+
+ @Override
+ public long getResponseBlockSize() {
+ return 0;
+ }
+
+ @Override
+ public void incrementResponseBlockSize(long blockSize) {
+ }
+
+ @Override
+ public long getResponseExceptionSize() {
+ return 0;
+ }
+
+ @Override
+ public void incrementResponseExceptionSize(long exceptionSize) {
+ }
+ };
+ return rpcCall;
+ }
+
+ private Message getMessage() {
+
+ i = (i + 1) % 3;
+
+ Message message = null;
+
+ switch (i) {
+
+ case 0: {
+ message = ClientProtos.ScanRequest.newBuilder()
+ .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
+ .setValue(ByteString.copyFromUtf8("region1"))
+ .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
+ .build())
+ .build();
+ break;
+ }
+ case 1: {
+ message = ClientProtos.MutateRequest.newBuilder()
+ .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
+ .setValue(ByteString.copyFromUtf8("region2"))
+ .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
+ .setMutation(ClientProtos.MutationProto.newBuilder()
+ .setRow(ByteString.copyFromUtf8("row123"))
+ .build())
+ .build();
+ break;
+ }
+ case 2: {
+ message = ClientProtos.GetRequest.newBuilder()
+ .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
+ .setValue(ByteString.copyFromUtf8("region2"))
+ .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
+ .setGet(ClientProtos.Get.newBuilder()
+ .setRow(ByteString.copyFromUtf8("row123"))
+ .build())
+ .build();
+ break;
+ }
+
+ }
+
+ return message;
+
+ }
+
+ private Optional<User> getUser(String userName) {
+
+ return Optional.of(new User() {
+
+
+ @Override
+ public String getShortName() {
+ return userName;
+ }
+
+
+ @Override
+ public <T> T runAs(PrivilegedAction<T> action) {
+ return null;
+ }
+
+
+ @Override
+ public <T> T runAs(PrivilegedExceptionAction<T> action) throws
+ IOException, InterruptedException {
+ return null;
+ }
+
+ });
+
+ }
+
+}
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index f60838e..c3b4a8e 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -1434,6 +1434,79 @@ module Hbase
end
#----------------------------------------------------------------------------------------------
+ # Retrieve SlowLog Responses from RegionServers
+ def get_slowlog_responses(server_names, args)
+ unless server_names.is_a?(Array) || server_names.is_a?(String)
+ raise(ArgumentError,
+ "#{server_names.class} of #{server_names.inspect} is not of Array/String type")
+ end
+ if server_names == '*'
+ server_names = getServerNames([], true)
+ else
+ server_names_list = to_server_names(server_names)
+ server_names = getServerNames(server_names_list, false)
+ end
+ filter_params = get_filter_params(args)
+ slow_log_responses = @admin.getSlowLogResponses(java.util.HashSet.new(server_names),
+ filter_params)
+ slow_log_responses_arr = []
+ for slow_log_response in slow_log_responses
+ slow_log_responses_arr << slow_log_response.toJsonPrettyPrint
+ end
+ puts 'Retrieved SlowLog Responses from RegionServers'
+ puts slow_log_responses_arr
+ end
+
+ def get_filter_params(args)
+ filter_params = org.apache.hadoop.hbase.client.SlowLogQueryFilter.new
+ if args.key? 'REGION_NAME'
+ region_name = args['REGION_NAME']
+ filter_params.setRegionName(region_name)
+ end
+ if args.key? 'TABLE_NAME'
+ table_name = args['TABLE_NAME']
+ filter_params.setTableName(table_name)
+ end
+ if args.key? 'CLIENT_IP'
+ client_ip = args['CLIENT_IP']
+ filter_params.setClientAddress(client_ip)
+ end
+ if args.key? 'USER'
+ user = args['USER']
+ filter_params.setUserName(user)
+ end
+ if args.key? 'LIMIT'
+ limit = args['LIMIT']
+ filter_params.setLimit(limit)
+ end
+ filter_params
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Clears SlowLog Responses from RegionServers
+ def clear_slowlog_responses(server_names)
+ unless server_names.nil? || server_names.is_a?(Array) || server_names.is_a?(String)
+ raise(ArgumentError,
+ "#{server_names.class} of #{server_names.inspect} is not of correct type")
+ end
+ if server_names.nil?
+ server_names = getServerNames([], true)
+ else
+ server_names_list = to_server_names(server_names)
+ server_names = getServerNames(server_names_list, false)
+ end
+ clear_log_responses = @admin.clearSlowLogResponses(java.util.HashSet.new(server_names))
+ clear_log_success_count = 0
+ clear_log_responses.each do |response|
+ if response
+ clear_log_success_count += 1
+ end
+ end
+ puts 'Cleared Slowlog responses from ' \
+ "#{clear_log_success_count}/#{clear_log_responses.size} RegionServers"
+ end
+
+ #----------------------------------------------------------------------------------------------
# Decommission a list of region servers, optionally offload corresponding regions
def decommission_regionservers(host_or_servers, should_offload)
# Fail if host_or_servers is neither a string nor an array
@@ -1507,6 +1580,17 @@ module Hbase
def stop_regionserver(hostport)
@admin.stopRegionServer(hostport)
end
+
+ #----------------------------------------------------------------------------------------------
+ # Get list of server names
+ def to_server_names(server_names)
+ if server_names.is_a?(Array)
+ server_names
+ else
+ java.util.Arrays.asList(server_names)
+ end
+ end
+
end
# rubocop:enable Metrics/ClassLength
end
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index f09149f..6b441d6 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -333,10 +333,12 @@ Shell.load_command_group(
normalizer_switch
normalizer_enabled
is_in_maintenance_mode
+ clear_slowlog_responses
close_region
compact
compaction_switch
flush
+ get_slowlog_responses
major_compact
move
split
diff --git a/hbase-shell/src/main/ruby/shell/commands/clear_slowlog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/clear_slowlog_responses.rb
new file mode 100644
index 0000000..ea96de3
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/clear_slowlog_responses.rb
@@ -0,0 +1,47 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# Clear slowlog responses maintained in memory by RegionServers
+
+module Shell
+ module Commands
+ # Clear slowlog responses
+ class ClearSlowlogResponses < Command
+ def help
+ <<-EOF
+Clears SlowLog Responses maintained by each or specific RegionServers.
+Specify array of server names for specific RS. A server name is
+the host, port plus startcode of a RegionServer.
+e.g.: host187.example.com,60020,1289493121758 (find servername in
+master ui or when you do detailed status in shell)
+
+Examples:
+
+ hbase> clear_slowlog_responses => clears slowlog responses from all RS
+ hbase> clear_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => clears slowlog responses from SERVER_NAME1,
+ SERVER_NAME2
+
+
+ EOF
+ end
+
+ def command(server_names = nil)
+ admin.clear_slowlog_responses(server_names)
+ end
+ end
+ end
+end
diff --git a/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb
new file mode 100644
index 0000000..55759ca
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb
@@ -0,0 +1,78 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# Retrieve latest slowlog responses maintained in memory by RegionServers
+
+module Shell
+ module Commands
+ # Retrieve latest slowlog responses
+ class GetSlowlogResponses < Command
+ def help
+ <<-EOF
+Retrieve latest SlowLog Responses maintained by each or specific RegionServers.
+Specify '*' to include all RS otherwise array of server names for specific
+RS. A server name is the host, port plus startcode of a RegionServer.
+e.g.: host187.example.com,60020,1289493121758 (find servername in
+master ui or when you do detailed status in shell)
+
+Provide optional filter parameters as Hash.
+Default Limit of each server for providing no of slow log records is 10. User can specify
+more limit by 'LIMIT' param in case more than 10 records should be retrieved.
+
+Examples:
+
+ hbase> get_slowlog_responses '*' => get slowlog responses from all RS
+ hbase> get_slowlog_responses '*', {'LIMIT' => 50} => get slowlog responses from all RS
+ with 50 records limit (default limit: 10)
+ hbase> get_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => get slowlog responses from SERVER_NAME1,
+ SERVER_NAME2
+ hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1'}
+ => get slowlog responses only related to meta
+ region
+ hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1'} => get slowlog responses only related to t1 table
+ hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100}
+ => get slowlog responses with given client
+ IP address and get 100 records limit
+ (default limit: 10)
+ hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1', 'TABLE_NAME' => 't1'}
+ => get slowlog responses with given region name
+ or table name
+ hbase> get_slowlog_responses '*', {'USER' => 'user_name', 'CLIENT_IP' => '192.162.1.40:60225'}
+ => get slowlog responses that match either
+ provided client IP address or user name
+
+Sometimes output can be long pretty printed json for user to scroll in
+a single screen and hence user might prefer
+redirecting output of get_slowlog_responses to a file.
+
+Example:
+
+echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1
+
+ EOF
+ end
+
+ def command(server_names, args = {})
+ unless args.is_a? Hash
+ raise 'Filter parameters are not Hash'
+ end
+
+ admin.get_slowlog_responses(server_names, args)
+ end
+ end
+ end
+end
diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb
index 7d32645..3264579 100644
--- a/hbase-shell/src/test/ruby/hbase/admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb
@@ -201,6 +201,20 @@ module Hbase
#-------------------------------------------------------------------------------
+ define_test 'get slowlog responses should work' do
+ output = capture_stdout { command(:get_slowlog_responses, '*', {}) }
+ assert(output.include?('Retrieved SlowLog Responses from RegionServers'))
+ end
+
+ #-------------------------------------------------------------------------------
+
+ define_test 'clear slowlog responses should work' do
+ output = capture_stdout { command(:clear_slowlog_responses, nil) }
+ assert(output.include?('Cleared Slowlog responses from 1/1 RegionServers'))
+ end
+
+ #-------------------------------------------------------------------------------
+
define_test "create should fail with non-string/non-hash column args" do
assert_raise(ArgumentError) do
command(:create, @create_test_name, 123)
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 600c35d..5b56e62 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.client.CompactType;
import org.apache.hadoop.hbase.client.CompactionState;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.SlowLogQueryFilter;
+import org.apache.hadoop.hbase.client.SlowLogRecord;
import org.apache.hadoop.hbase.client.SnapshotDescription;
import org.apache.hadoop.hbase.client.SnapshotType;
import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -1393,6 +1395,17 @@ public class ThriftAdmin implements Admin {
}
@Override
+ public List<SlowLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
+ final SlowLogQueryFilter slowLogQueryFilter) {
+ throw new NotImplementedException("getSlowLogResponses not supported in ThriftAdmin");
+ }
+
+ @Override
+ public List<Boolean> clearSlowLogResponses(final Set<ServerName> serverNames) {
+ throw new NotImplementedException("clearSlowLogsResponses not supported in ThriftAdmin");
+ }
+
+ @Override
public Future<Void> splitRegionAsync(byte[] regionName) throws IOException {
return splitRegionAsync(regionName, null);
}
diff --git a/src/main/asciidoc/_chapters/hbase-default.adoc b/src/main/asciidoc/_chapters/hbase-default.adoc
index ebed2f2..b0cd4bf 100644
--- a/src/main/asciidoc/_chapters/hbase-default.adoc
+++ b/src/main/asciidoc/_chapters/hbase-default.adoc
@@ -2072,6 +2072,44 @@ A comma-separated list of
`-1`
+[[hbase.regionserver.slowlog.ringbuffer.size]]
+*`hbase.regionserver.slowlog.ringbuffer.size`*::
++
+.Description
+
+ Default size of ringbuffer to be maintained by each RegionServer in order
+ to store online slowlog responses. This is an in-memory ring buffer of
+ requests that were judged to be too slow in addition to the responseTooSlow
+ logging. The in-memory representation would be complete.
+ For more details, please look into Doc Section:
+ <<slow_log_responses, slow_log_responses>>
+
+
++
+.Default
+`256`
+
+
+
+[[hbase.regionserver.slowlog.buffer.enabled]]
+*`hbase.regionserver.slowlog.buffer.enabled`*::
++
+.Description
+
+ Indicates whether RegionServers have ring buffer running for storing
+ Online Slow logs in FIFO manner with limited entries. The size of
+ the ring buffer is indicated by config: hbase.regionserver.slowlog.ringbuffer.size
+ The default value is false, turn this on and get latest slowlog
+ responses with complete data.
+ For more details, please look into Doc Section:
+ <<slow_log_responses, slow_log_responses>>
+
+
++
+.Default
+`false`
+
+
[[hbase.region.replica.replication.enabled]]
*`hbase.region.replica.replication.enabled`*::
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index d8afcd5..f8f9c2a 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -1813,6 +1813,120 @@ In the case that the call is not a client operation, that detailed fingerprint i
This particular example, for example, would indicate that the likely cause of slowness is simply a very large (on the order of 100MB) multiput, as we can tell by the "vlen," or value length, fields of each put in the multiPut.
+[[slow_log_responses]]
+==== Get Slow Response Log from shell
+When an individual RPC exceeds a configurable time bound we log a complaint
+by way of the logging subsystem
+
+e.g.
+
+----
+2019-10-02 10:10:22,195 WARN [,queue=15,port=60020] ipc.RpcServer - (responseTooSlow):
+{"call":"Scan(org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ScanRequest)",
+"starttimems":1567203007549,
+"responsesize":6819737,
+"method":"Scan",
+"param":"region { type: REGION_NAME value: \"t1,\\000\\000\\215\\f)o\\\\\\024\\302\\220\\000\\000\\000\\000\\000\\001\\000\\000\\000\\000\\000\\006\\000\\000\\000\\000\\000\\005\\000\\000<TRUNCATED>",
+"processingtimems":28646,
+"client":"10.253.196.215:41116",
+"queuetimems":22453,
+"class":"HRegionServer"}
+----
+
+Unfortunately often the request parameters are truncated as per above Example.
+The truncation is unfortunate because it eliminates much of the utility of
+the warnings. For example, the region name, the start and end keys, and the
+filter hierarchy are all important clues for debugging performance problems
+caused by moderate to low selectivity queries or queries made at a high rate.
+
+HBASE-22978 introduces maintaining an in-memory ring buffer of requests that were judged to
+be too slow in addition to the responseTooSlow logging. The in-memory representation can be
+complete. There is some chance a high rate of requests will cause information on other
+interesting requests to be overwritten before it can be read. This is an acceptable trade off.
+
+In order to enable the in-memory ring buffer at RegionServers, we need to enable
+config:
+----
+hbase.regionserver.slowlog.buffer.enabled
+----
+
+One more config determines the size of the ring buffer:
+----
+hbase.regionserver.slowlog.ringbuffer.size
+----
+
+Check the config section for the detailed description.
+
+This config would be disabled by default. Turn it on and these shell commands
+would provide expected results from the ring-buffers.
+
+
+shell commands to retrieve slowlog responses from RegionServers:
+
+----
+Retrieve latest SlowLog Responses maintained by each or specific RegionServers.
+Specify '*' to include all RS otherwise array of server names for specific
+RS. A server name is the host, port plus startcode of a RegionServer.
+e.g.: host187.example.com,60020,1289493121758 (find servername in
+master ui or when you do detailed status in shell)
+
+Provide optional filter parameters as Hash.
+Default Limit of each server for providing no of slow log records is 10. User can specify
+more limit by 'LIMIT' param in case more than 10 records should be retrieved.
+
+Examples:
+
+ hbase> get_slowlog_responses '*' => get slowlog responses from all RS
+ hbase> get_slowlog_responses '*', {'LIMIT' => 50} => get slowlog responses from all RS
+ with 50 records limit (default limit: 10)
+ hbase> get_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => get slowlog responses from SERVER_NAME1,
+ SERVER_NAME2
+ hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1'}
+ => get slowlog responses only related to meta
+ region
+ hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1'} => get slowlog responses only related to t1 table
+ hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100}
+ => get slowlog responses with given client
+ IP address and get 100 records limit
+ (default limit: 10)
+ hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1', 'TABLE_NAME' => 't1'}
+ => get slowlog responses with given region name
+ or table name
+ hbase> get_slowlog_responses '*', {'USER' => 'user_name', 'CLIENT_IP' => '192.162.1.40:60225'}
+ => get slowlog responses that match either
+ provided client IP address or user name
+
+
+----
+
+Sometimes output can be long pretty printed json for user to scroll in
+a single screen and hence user might prefer
+redirecting output of get_slowlog_responses to a file.
+
+Example:
+----
+echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1
+----
+
+
+shell command to clear slowlog responses from RegionServer:
+
+----
+Clears SlowLog Responses maintained by each or specific RegionServers.
+Specify array of server names for specific RS. A server name is
+the host, port plus startcode of a RegionServer.
+e.g.: host187.example.com,60020,1289493121758 (find servername in
+master ui or when you do detailed status in shell)
+
+Examples:
+
+ hbase> clear_slowlog_responses => clears slowlog responses from all RS
+ hbase> clear_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2'] => clears slowlog responses from SERVER_NAME1,
+ SERVER_NAME2
+
+
+----
+
=== Block Cache Monitoring
Starting with HBase 0.98, the HBase Web UI includes the ability to monitor and report on the performance of the block cache.