You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/02/29 19:02:47 UTC

[hbase] branch branch-2 updated: HBASE-22978 : Online slow response log (#1228)

This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 6366b73  HBASE-22978 : Online slow response log (#1228)
6366b73 is described below

commit 6366b7313437210e79b47e50b87c6a3cf0e3e4f3
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Sun Mar 1 00:32:35 2020 +0530

    HBASE-22978 : Online slow response log (#1228)
    
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
---
 .../java/org/apache/hadoop/hbase/client/Admin.java |  25 +
 .../org/apache/hadoop/hbase/client/AsyncAdmin.java |  21 +
 .../hadoop/hbase/client/AsyncHBaseAdmin.java       |  12 +
 .../org/apache/hadoop/hbase/client/HBaseAdmin.java |  66 +++
 .../hadoop/hbase/client/RawAsyncHBaseAdmin.java    |  63 +++
 .../apache/hadoop/hbase/client/SlowLogParams.java  |  89 ++++
 .../hadoop/hbase/client/SlowLogQueryFilter.java    | 122 +++++
 .../apache/hadoop/hbase/client/SlowLogRecord.java  | 319 +++++++++++
 .../hadoop/hbase/shaded/protobuf/ProtobufUtil.java | 113 ++++
 .../hbase/shaded/protobuf/RequestConverter.java    |  44 ++
 .../java/org/apache/hadoop/hbase/HConstants.java   |   6 +
 hbase-common/src/main/resources/hbase-default.xml  |  23 +
 .../src/main/protobuf/Admin.proto                  |  27 +
 .../src/main/protobuf/TooSlowLog.proto             |  45 ++
 .../org/apache/hadoop/hbase/ipc/RpcServer.java     |  75 ++-
 .../hadoop/hbase/ipc/RpcServerInterface.java       |  17 +-
 .../hadoop/hbase/regionserver/HRegionServer.java   |  18 +
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  41 ++
 .../slowlog/DisruptorExceptionHandler.java         |  50 ++
 .../regionserver/slowlog/RingBufferEnvelope.java   |  57 ++
 .../hbase/regionserver/slowlog/RpcLogDetails.java  |  71 +++
 .../regionserver/slowlog/SlowLogEventHandler.java  | 208 ++++++++
 .../regionserver/slowlog/SlowLogRecorder.java      | 153 ++++++
 .../org/apache/hadoop/hbase/client/TestAdmin2.java |  27 +
 .../apache/hadoop/hbase/client/TestAdminBase.java  |   1 +
 .../apache/hadoop/hbase/ipc/AbstractTestIPC.java   |   6 +-
 .../hadoop/hbase/master/MockRegionServer.java      |  16 +
 .../regionserver/slowlog/TestSlowLogRecorder.java  | 593 +++++++++++++++++++++
 hbase-shell/src/main/ruby/hbase/admin.rb           |  84 +++
 hbase-shell/src/main/ruby/shell.rb                 |   2 +
 .../ruby/shell/commands/clear_slowlog_responses.rb |  47 ++
 .../ruby/shell/commands/get_slowlog_responses.rb   |  78 +++
 hbase-shell/src/test/ruby/hbase/admin_test.rb      |  14 +
 .../hadoop/hbase/thrift2/client/ThriftAdmin.java   |  13 +
 src/main/asciidoc/_chapters/hbase-default.adoc     |  38 ++
 src/main/asciidoc/_chapters/ops_mgt.adoc           | 114 ++++
 36 files changed, 2677 insertions(+), 21 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
index a8029f8..4576d7d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Admin.java
@@ -3134,4 +3134,29 @@ public interface Admin extends Abortable, Closeable {
    */
   boolean isSnapshotCleanupEnabled() throws IOException;
 
+
+  /**
+   * Retrieves online slow RPC logs from the provided set of
+   * RegionServers.
+   *
+   * @param serverNames Server names to get slowlog responses from
+   * @param slowLogQueryFilter filter to be used if provided
+   * @return online slowlog response list
+   * @throws IOException if a remote or network exception occurs
+   */
+  List<SlowLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
+      final SlowLogQueryFilter slowLogQueryFilter) throws IOException;
+
+  /**
+   * Clears online slow RPC logs from the provided set of
+   * RegionServers.
+   *
+   * @param serverNames Set of Server names to clear slowlog responses from
+   * @return List of booleans indicating whether the online slowlog response buffer was cleared
+   *   on each RegionServer
+   * @throws IOException if a remote or network exception occurs
+   */
+  List<Boolean> clearSlowLogResponses(final Set<ServerName> serverNames)
+      throws IOException;
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
index 1aa7623..9e61a38 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdmin.java
@@ -1484,4 +1484,25 @@ public interface AsyncAdmin {
    */
   CompletableFuture<Boolean> isSnapshotCleanupEnabled();
 
+  /**
+   * Retrieves online slow RPC logs from the provided set of
+   * RegionServers.
+   *
+   * @param serverNames Server names to get slowlog responses from
+   * @param slowLogQueryFilter filter to be used if provided
+   * @return Online slowlog response list. The return value is wrapped by a {@link CompletableFuture}
+   */
+  CompletableFuture<List<SlowLogRecord>> getSlowLogResponses(final Set<ServerName> serverNames,
+    final SlowLogQueryFilter slowLogQueryFilter);
+
+  /**
+   * Clears online slow RPC logs from the provided set of
+   * RegionServers.
+   *
+   * @param serverNames Set of Server names to clear slowlog responses from
+   * @return List of booleans indicating whether the online slowlog response buffer was cleared
+   *   on each RegionServer. The return value is wrapped by a {@link CompletableFuture}
+   */
+  CompletableFuture<List<Boolean>> clearSlowLogResponses(final Set<ServerName> serverNames);
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
index 3b51279..d5c9d09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncHBaseAdmin.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.client;
 
 import com.google.protobuf.RpcChannel;
+
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Map;
@@ -836,4 +837,15 @@ class AsyncHBaseAdmin implements AsyncAdmin {
     return wrap(rawAdmin.isSnapshotCleanupEnabled());
   }
 
+  @Override
+  public CompletableFuture<List<SlowLogRecord>> getSlowLogResponses(
+      final Set<ServerName> serverNames, final SlowLogQueryFilter slowLogQueryFilter) {
+    return wrap(rawAdmin.getSlowLogResponses(serverNames, slowLogQueryFilter));
+  }
+
+  @Override
+  public CompletableFuture<List<Boolean>> clearSlowLogResponses(Set<ServerName> serverNames) {
+    return wrap(rawAdmin.clearSlowLogResponses(serverNames));
+  }
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
index 0b2be19..7ef5d4d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HBaseAdmin.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcController;
+import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.Closeable;
 import java.io.IOException;
 import java.io.InterruptedIOException;
@@ -111,6 +112,7 @@ import org.slf4j.LoggerFactory;
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
 
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -4358,4 +4360,68 @@ public class HBaseAdmin implements Admin {
 
   }
 
+  @Override
+  public List<SlowLogRecord> getSlowLogResponses(@Nullable final Set<ServerName> serverNames,
+      final SlowLogQueryFilter slowLogQueryFilter) throws IOException {
+    if (CollectionUtils.isEmpty(serverNames)) {
+      return Collections.emptyList();
+    }
+    // A plain loop (rather than a stream lambda) lets an IOException from any RegionServer
+    // propagate unchanged, as declared, instead of being wrapped in a RuntimeException.
+    // Fully qualified so that this method does not rely on an ArrayList import.
+    List<SlowLogRecord> slowLogRecords = new java.util.ArrayList<>();
+    for (ServerName serverName : serverNames) {
+      slowLogRecords.addAll(getSlowLogResponseFromServer(serverName, slowLogQueryFilter));
+    }
+    return slowLogRecords;
+  }
+
+  private List<SlowLogRecord> getSlowLogResponseFromServer(final ServerName serverName,
+      final SlowLogQueryFilter slowLogQueryFilter) throws IOException {
+    return getSlowLogResponsesFromServer(this.connection.getAdmin(serverName), slowLogQueryFilter);
+  }
+
+  private List<SlowLogRecord> getSlowLogResponsesFromServer(AdminService.BlockingInterface admin,
+      SlowLogQueryFilter slowLogQueryFilter) throws IOException {
+    return executeCallable(new RpcRetryingCallable<List<SlowLogRecord>>() {
+      @Override
+      protected List<SlowLogRecord> rpcCall(int callTimeout) throws Exception {
+        HBaseRpcController controller = rpcControllerFactory.newController();
+        AdminProtos.SlowLogResponses slowLogResponses =
+          admin.getSlowLogResponses(controller,
+            RequestConverter.buildSlowLogResponseRequest(slowLogQueryFilter));
+        return ProtobufUtil.toSlowLogPayloads(slowLogResponses);
+      }
+    });
+  }
+
+  @Override
+  public List<Boolean> clearSlowLogResponses(@Nullable final Set<ServerName> serverNames)
+      throws IOException {
+    if (CollectionUtils.isEmpty(serverNames)) {
+      return Collections.emptyList();
+    }
+    // Loop instead of a stream lambda so an IOException from any RegionServer propagates
+    // as declared rather than wrapped in a RuntimeException (fully qualified: no new import).
+    List<Boolean> cleared = new java.util.ArrayList<>(serverNames.size());
+    for (ServerName serverName : serverNames) {
+      cleared.add(clearSlowLogsResponses(serverName));
+    }
+    return cleared;
+  }
+
+  private Boolean clearSlowLogsResponses(final ServerName serverName) throws IOException {
+    AdminService.BlockingInterface admin = this.connection.getAdmin(serverName);
+    return executeCallable(new RpcRetryingCallable<Boolean>() {
+      @Override
+      protected Boolean rpcCall(int callTimeout) throws Exception {
+        HBaseRpcController controller = rpcControllerFactory.newController();
+        AdminProtos.ClearSlowLogResponses clearSlowLogResponses =
+          admin.clearSlowLogsResponses(controller,
+            RequestConverter.buildClearSlowLogResponseRequest());
+        return ProtobufUtil.toClearSlowLogPayload(clearSlowLogResponses);
+      }
+    });
+  }
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
index ee32e42..ddaf786 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncHBaseAdmin.java
@@ -23,6 +23,7 @@ import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
 import static org.apache.hadoop.hbase.util.FutureUtils.unwrapCompletionException;
 import com.google.protobuf.Message;
 import com.google.protobuf.RpcChannel;
+import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -45,6 +46,7 @@ import java.util.function.Supplier;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.AsyncMetaTableAccessor;
 import org.apache.hadoop.hbase.CacheEvictionStats;
@@ -102,6 +104,8 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
 import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
 import org.apache.hbase.thirdparty.io.netty.util.Timeout;
 import org.apache.hbase.thirdparty.io.netty.util.TimerTask;
+import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AccessControlProtos;
@@ -3884,4 +3888,63 @@ class RawAsyncHBaseAdmin implements AsyncAdmin {
         .call();
   }
 
+  @Override
+  public CompletableFuture<List<SlowLogRecord>> getSlowLogResponses(
+    @Nullable final Set<ServerName> serverNames,
+    final SlowLogQueryFilter slowLogQueryFilter) {
+    if (CollectionUtils.isEmpty(serverNames)) {
+      return CompletableFuture.completedFuture(Collections.emptyList());
+    }
+    // Issue all RPCs up front and merge once every server has answered (no blocking join).
+    List<CompletableFuture<List<SlowLogRecord>>> futures = serverNames.stream()
+      .map(serverName -> getSlowLogResponseFromServer(serverName, slowLogQueryFilter))
+      .collect(Collectors.toList());
+    return convertToFutureOfList(futures).thenApply(perServer ->
+      perServer.stream().flatMap(List::stream).collect(Collectors.toList()));
+  }
+
+  private CompletableFuture<List<SlowLogRecord>> getSlowLogResponseFromServer(
+    final ServerName serverName, final SlowLogQueryFilter slowLogQueryFilter) {
+    return this.<List<SlowLogRecord>>newAdminCaller()
+      .action((controller, stub) -> this
+        .adminCall(
+          controller, stub, RequestConverter.buildSlowLogResponseRequest(slowLogQueryFilter),
+          AdminService.Interface::getSlowLogResponses,
+          ProtobufUtil::toSlowLogPayloads))
+      .serverName(serverName).call();
+  }
+
+  @Override
+  public CompletableFuture<List<Boolean>> clearSlowLogResponses(
+      @Nullable Set<ServerName> serverNames) {
+    if (CollectionUtils.isEmpty(serverNames)) {
+      return CompletableFuture.completedFuture(Collections.emptyList());
+    }
+    List<CompletableFuture<Boolean>> clearSlowLogResponseList = serverNames.stream()
+      .map(this::clearSlowLogsResponses)
+      .collect(Collectors.toList());
+    return convertToFutureOfList(clearSlowLogResponseList);
+  }
+
+  private CompletableFuture<Boolean> clearSlowLogsResponses(final ServerName serverName) {
+    return this.<Boolean>newAdminCaller()
+      .action(((controller, stub) -> this
+        .adminCall(
+          controller, stub, RequestConverter.buildClearSlowLogResponseRequest(),
+          AdminService.Interface::clearSlowLogsResponses,
+          ProtobufUtil::toClearSlowLogPayload))
+      ).serverName(serverName).call();
+  }
+
+  private static <T> CompletableFuture<List<T>> convertToFutureOfList(
+    List<CompletableFuture<T>> futures) {
+    CompletableFuture<Void> allDoneFuture =
+      CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
+    return allDoneFuture.thenApply(v ->
+      futures.stream()
+        .map(CompletableFuture::join)
+        .collect(Collectors.toList())
+    );
+  }
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java
new file mode 100644
index 0000000..86df9fd
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogParams.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * SlowLog params object that holds the detailed call info (params) together with the
+ * region name; the region name is to be used for filtering purposes
+ */
+@InterfaceAudience.Private
+public class SlowLogParams {
+
+  private final String regionName;
+  private final String params;
+
+  public SlowLogParams(String regionName, String params) {
+    this.regionName = regionName;
+    this.params = params;
+  }
+
+  public SlowLogParams(String params) {
+    this.regionName = StringUtils.EMPTY;
+    this.params = params;
+  }
+
+  public String getRegionName() {
+    return regionName;
+  }
+
+  public String getParams() {
+    return params;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("regionName", regionName)
+      .append("params", params)
+      .toString();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SlowLogParams that = (SlowLogParams) o;
+
+    return new EqualsBuilder()
+      .append(regionName, that.regionName)
+      .append(params, that.params)
+      .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+      .append(regionName)
+      .append(params)
+      .toHashCode();
+  }
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogQueryFilter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogQueryFilter.java
new file mode 100644
index 0000000..aa56a8a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogQueryFilter.java
@@ -0,0 +1,122 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * SlowLog Query Filter with all filter and limit parameters
+ */
+@InterfaceAudience.Private
+public class SlowLogQueryFilter {
+
+  private String regionName;
+  private String clientAddress;
+  private String tableName;
+  private String userName;
+  private int limit = 10;
+
+  public String getRegionName() {
+    return regionName;
+  }
+
+  public void setRegionName(String regionName) {
+    this.regionName = regionName;
+  }
+
+  public String getClientAddress() {
+    return clientAddress;
+  }
+
+  public void setClientAddress(String clientAddress) {
+    this.clientAddress = clientAddress;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public void setUserName(String userName) {
+    this.userName = userName;
+  }
+
+  public int getLimit() {
+    return limit;
+  }
+
+  public void setLimit(int limit) {
+    this.limit = limit;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SlowLogQueryFilter that = (SlowLogQueryFilter) o;
+
+    return new EqualsBuilder()
+      .append(limit, that.limit)
+      .append(regionName, that.regionName)
+      .append(clientAddress, that.clientAddress)
+      .append(tableName, that.tableName)
+      .append(userName, that.userName)
+      .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+      .append(regionName)
+      .append(clientAddress)
+      .append(tableName)
+      .append(userName)
+      .append(limit)
+      .toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("regionName", regionName)
+      .append("clientAddress", clientAddress)
+      .append("tableName", tableName)
+      .append("userName", userName)
+      .append("limit", limit)
+      .toString();
+  }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogRecord.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogRecord.java
new file mode 100644
index 0000000..9593618
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SlowLogRecord.java
@@ -0,0 +1,319 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.util.GsonUtil;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.gson.Gson;
+import org.apache.hbase.thirdparty.com.google.gson.JsonObject;
+import org.apache.hbase.thirdparty.com.google.gson.JsonSerializer;
+
+/**
+ * SlowLog payload for hbase-client, to be used by Admin API getSlowLogResponses
+ */
+@InterfaceAudience.Private
+final public class SlowLogRecord {
+
+  // used to convert object to pretty printed format
+  // used by toJsonPrettyPrint()
+  private static final Gson GSON = GsonUtil.createGson()
+    .setPrettyPrinting()
+    .registerTypeAdapter(SlowLogRecord.class, (JsonSerializer<SlowLogRecord>)
+      (slowLogPayload, type, jsonSerializationContext) -> {
+        Gson gson = new Gson();
+        JsonObject jsonObj = (JsonObject) gson.toJsonTree(slowLogPayload);
+        if (slowLogPayload.getMultiGetsCount() == 0) {
+          jsonObj.remove("multiGetsCount");
+        }
+        if (slowLogPayload.getMultiMutationsCount() == 0) {
+          jsonObj.remove("multiMutationsCount");
+        }
+        if (slowLogPayload.getMultiServiceCalls() == 0) {
+          jsonObj.remove("multiServiceCalls");
+        }
+        return jsonObj;
+      }).create();
+
+  private long startTime;
+  private int processingTime;
+  private int queueTime;
+  private long responseSize;
+  private String clientAddress;
+  private String serverClass;
+  private String methodName;
+  private String callDetails;
+  private String param;
+  // we don't want to serialize region name, it is just for the filter purpose
+  // hence it is marked transient to keep it out of serialization
+  private transient String regionName;
+  private String userName;
+  private int multiGetsCount;
+  private int multiMutationsCount;
+  private int multiServiceCalls;
+
+  public long getStartTime() {
+    return startTime;
+  }
+
+  public int getProcessingTime() {
+    return processingTime;
+  }
+
+  public int getQueueTime() {
+    return queueTime;
+  }
+
+  public long getResponseSize() {
+    return responseSize;
+  }
+
+  public String getClientAddress() {
+    return clientAddress;
+  }
+
+  public String getServerClass() {
+    return serverClass;
+  }
+
+  public String getMethodName() {
+    return methodName;
+  }
+
+  public String getCallDetails() {
+    return callDetails;
+  }
+
+  public String getParam() {
+    return param;
+  }
+
+  public String getRegionName() {
+    return regionName;
+  }
+
+  public String getUserName() {
+    return userName;
+  }
+
+  public int getMultiGetsCount() {
+    return multiGetsCount;
+  }
+
+  public int getMultiMutationsCount() {
+    return multiMutationsCount;
+  }
+
+  public int getMultiServiceCalls() {
+    return multiServiceCalls;
+  }
+
+  private SlowLogRecord(final long startTime, final int processingTime, final int queueTime,
+      final long responseSize, final String clientAddress, final String serverClass,
+      final String methodName, final String callDetails, final String param,
+      final String regionName, final String userName, final int multiGetsCount,
+      final int multiMutationsCount, final int multiServiceCalls) {
+    this.startTime = startTime;
+    this.processingTime = processingTime;
+    this.queueTime = queueTime;
+    this.responseSize = responseSize;
+    this.clientAddress = clientAddress;
+    this.serverClass = serverClass;
+    this.methodName = methodName;
+    this.callDetails = callDetails;
+    this.param = param;
+    this.regionName = regionName;
+    this.userName = userName;
+    this.multiGetsCount = multiGetsCount;
+    this.multiMutationsCount = multiMutationsCount;
+    this.multiServiceCalls = multiServiceCalls;
+  }
+
+  public static class SlowLogRecordBuilder {
+    private long startTime;
+    private int processingTime;
+    private int queueTime;
+    private long responseSize;
+    private String clientAddress;
+    private String serverClass;
+    private String methodName;
+    private String callDetails;
+    private String param;
+    private String regionName;
+    private String userName;
+    private int multiGetsCount;
+    private int multiMutationsCount;
+    private int multiServiceCalls;
+
+    public SlowLogRecordBuilder setStartTime(long startTime) {
+      this.startTime = startTime;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setProcessingTime(int processingTime) {
+      this.processingTime = processingTime;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setQueueTime(int queueTime) {
+      this.queueTime = queueTime;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setResponseSize(long responseSize) {
+      this.responseSize = responseSize;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setClientAddress(String clientAddress) {
+      this.clientAddress = clientAddress;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setServerClass(String serverClass) {
+      this.serverClass = serverClass;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setMethodName(String methodName) {
+      this.methodName = methodName;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setCallDetails(String callDetails) {
+      this.callDetails = callDetails;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setParam(String param) {
+      this.param = param;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setRegionName(String regionName) {
+      this.regionName = regionName;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setUserName(String userName) {
+      this.userName = userName;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setMultiGetsCount(int multiGetsCount) {
+      this.multiGetsCount = multiGetsCount;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setMultiMutationsCount(int multiMutationsCount) {
+      this.multiMutationsCount = multiMutationsCount;
+      return this;
+    }
+
+    public SlowLogRecordBuilder setMultiServiceCalls(int multiServiceCalls) {
+      this.multiServiceCalls = multiServiceCalls;
+      return this;
+    }
+
+    public SlowLogRecord build() {
+      return new SlowLogRecord(startTime, processingTime, queueTime, responseSize,
+        clientAddress, serverClass, methodName, callDetails, param, regionName,
+        userName, multiGetsCount, multiMutationsCount, multiServiceCalls);
+    }
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    SlowLogRecord that = (SlowLogRecord) o;
+
+    return new EqualsBuilder()
+      .append(startTime, that.startTime)
+      .append(processingTime, that.processingTime)
+      .append(queueTime, that.queueTime)
+      .append(responseSize, that.responseSize)
+      .append(multiGetsCount, that.multiGetsCount)
+      .append(multiMutationsCount, that.multiMutationsCount)
+      .append(multiServiceCalls, that.multiServiceCalls)
+      .append(clientAddress, that.clientAddress)
+      .append(serverClass, that.serverClass)
+      .append(methodName, that.methodName)
+      .append(callDetails, that.callDetails)
+      .append(param, that.param)
+      .append(regionName, that.regionName)
+      .append(userName, that.userName)
+      .isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder(17, 37)
+      .append(startTime)
+      .append(processingTime)
+      .append(queueTime)
+      .append(responseSize)
+      .append(clientAddress)
+      .append(serverClass)
+      .append(methodName)
+      .append(callDetails)
+      .append(param)
+      .append(regionName)
+      .append(userName)
+      .append(multiGetsCount)
+      .append(multiMutationsCount)
+      .append(multiServiceCalls)
+      .toHashCode();
+  }
+
+  public String toJsonPrettyPrint() {
+    return GSON.toJson(this);
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this)
+      .append("startTime", startTime)
+      .append("processingTime", processingTime)
+      .append("queueTime", queueTime)
+      .append("responseSize", responseSize)
+      .append("clientAddress", clientAddress)
+      .append("serverClass", serverClass)
+      .append("methodName", methodName)
+      .append("callDetails", callDetails)
+      .append("param", param)
+      .append("regionName", regionName)
+      .append("userName", userName)
+      .append("multiGetsCount", multiGetsCount)
+      .append("multiMutationsCount", multiMutationsCount)
+      .append("multiServiceCalls", multiServiceCalls)
+      .toString();
+  }
+
+}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
index 46cef7b..dc4091b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/ProtobufUtil.java
@@ -84,6 +84,8 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.RegionStatesCount;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SlowLogParams;
+import org.apache.hadoop.hbase.client.SlowLogRecord;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.SnapshotType;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -128,7 +130,10 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
@@ -145,12 +150,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegio
 import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.Column;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.ColumnValue.QualifierValue;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.DeleteType;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.RegionAction;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
@@ -182,6 +191,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
@@ -2222,6 +2232,57 @@ public final class ProtobufUtil {
   }
 
   /**
+   * Build the {@link SlowLogParams} kept for a slow RPC from its request message.
+   * <p>
+   * Scan requests keep their short debug string; the other recognized request types keep
+   * only a compact summary (region, row, mutation type, action count or coprocessor call).
+   * Any other message type is summarized by its class.
+   *
+   * @param message Message object {@link Message}, may be null
+   * @return SlowLogParams with regionName(for filter queries) and params, or null when
+   *   message is null
+   */
+  public static SlowLogParams getSlowLogParams(Message message) {
+    if (message == null) {
+      return null;
+    }
+    if (message instanceof ScanRequest) {
+      ScanRequest scanRequest = (ScanRequest) message;
+      String regionName = getStringForByteString(scanRequest.getRegion().getValue());
+      String params = TextFormat.shortDebugString(message);
+      return new SlowLogParams(regionName, params);
+    } else if (message instanceof MutationProto) {
+      MutationProto mutationProto = (MutationProto) message;
+      String params = "type= " + mutationProto.getMutateType().toString();
+      return new SlowLogParams(params);
+    } else if (message instanceof GetRequest) {
+      GetRequest getRequest = (GetRequest) message;
+      String regionName = getStringForByteString(getRequest.getRegion().getValue());
+      String params = "region= " + regionName + ", row= "
+        + getStringForByteString(getRequest.getGet().getRow());
+      return new SlowLogParams(regionName, params);
+    } else if (message instanceof MultiRequest) {
+      MultiRequest multiRequest = (MultiRequest) message;
+      List<RegionAction> regionActions = multiRequest.getRegionActionList();
+      int actionsCount = regionActions.stream()
+        .mapToInt(RegionAction::getActionCount)
+        .sum();
+      // The region action list may be empty; get(0) would then throw
+      // IndexOutOfBoundsException, so fall back to an empty region name instead.
+      // Only the first region is reported even when several regions are addressed.
+      String regionName = regionActions.isEmpty() ? ""
+        : getStringForByteString(regionActions.get(0).getRegion().getValue());
+      String params = "region= " + regionName + ", for " + actionsCount + " action(s)";
+      return new SlowLogParams(regionName, params);
+    } else if (message instanceof MutateRequest) {
+      MutateRequest mutateRequest = (MutateRequest) message;
+      String regionName = getStringForByteString(mutateRequest.getRegion().getValue());
+      String params = "region= " + regionName;
+      return new SlowLogParams(regionName, params);
+    } else if (message instanceof CoprocessorServiceRequest) {
+      CoprocessorServiceRequest coprocessorServiceRequest = (CoprocessorServiceRequest) message;
+      String params = "coprocessorService= "
+        + coprocessorServiceRequest.getCall().getServiceName()
+        + ":" + coprocessorServiceRequest.getCall().getMethodName();
+      return new SlowLogParams(params);
+    }
+    // Unrecognized request type: record only the message class.
+    String params = message.getClass().toString();
+    return new SlowLogParams(params);
+  }
+
+  /**
    * Print out some subset of a MutationProto rather than all of it and its data
    * @param proto Protobuf to print out
    * @return Short String of mutation proto
@@ -3414,4 +3475,56 @@ public final class ProtobufUtil {
       .build();
   }
 
+  /**
+   * Translate a protobuf
+   * {@link org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload}
+   * into the client-side {@link SlowLogRecord}, copying each field across unchanged.
+   *
+   * @param slowLogPayload SlowLog Payload protobuf instance
+   * @return client-side record carrying the same values as the protobuf payload
+   */
+  private static SlowLogRecord getSlowLogRecord(
+      final TooSlowLog.SlowLogPayload slowLogPayload) {
+    // Setters follow the field order of the SlowLogPayload message.
+    return new SlowLogRecord.SlowLogRecordBuilder()
+      .setStartTime(slowLogPayload.getStartTime())
+      .setProcessingTime(slowLogPayload.getProcessingTime())
+      .setQueueTime(slowLogPayload.getQueueTime())
+      .setResponseSize(slowLogPayload.getResponseSize())
+      .setClientAddress(slowLogPayload.getClientAddress())
+      .setServerClass(slowLogPayload.getServerClass())
+      .setMethodName(slowLogPayload.getMethodName())
+      .setCallDetails(slowLogPayload.getCallDetails())
+      .setParam(slowLogPayload.getParam())
+      .setUserName(slowLogPayload.getUserName())
+      .setRegionName(slowLogPayload.getRegionName())
+      .setMultiGetsCount(slowLogPayload.getMultiGets())
+      .setMultiMutationsCount(slowLogPayload.getMultiMutations())
+      .setMultiServiceCalls(slowLogPayload.getMultiServiceCalls())
+      .build();
+  }
+
+  /**
+   * Convert every payload carried by an AdminProtos#SlowLogResponses message into a
+   * {@link SlowLogRecord}, preserving the order of the payload list.
+   *
+   * @param slowLogResponses slowlog response protobuf instance
+   * @return list of client-side slow log records, one per payload
+   */
+  public static List<SlowLogRecord> toSlowLogPayloads(
+      final AdminProtos.SlowLogResponses slowLogResponses) {
+    return slowLogResponses.getSlowLogPayloadsList()
+      .stream()
+      .map(ProtobufUtil::getSlowLogRecord)
+      .collect(Collectors.toList());
+  }
+
+  /**
+   * Extract the {@code is_cleaned} flag from a {@link ClearSlowLogResponses} message.
+   *
+   * @param clearSlowLogResponses Clear slowlog response protobuf instance
+   * @return value of the response's isCleaned field
+   */
+  public static boolean toClearSlowLogPayload(final ClearSlowLogResponses clearSlowLogResponses) {
+    return clearSlowLogResponses.getIsCleaned();
+  }
+
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 24f8009..afdd653 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hbase.CellScannable;
 import org.apache.hadoop.hbase.ClusterMetrics.Option;
 import org.apache.hadoop.hbase.ClusterMetricsBuilder;
@@ -50,6 +51,7 @@ import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.SlowLogQueryFilter;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.client.TableState;
 import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
@@ -67,6 +69,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
@@ -76,6 +79,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerIn
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest.RegionOpenInfo;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest.RegionUpdateInfo;
@@ -1940,4 +1944,44 @@ public final class RequestConverter {
     return IsSnapshotCleanupEnabledRequest.newBuilder().build();
   }
 
+  /**
+   * Build the protocol buffer {@link SlowLogResponseRequest} for a slow log query.
+   * <p>
+   * Without a filter an empty request is returned. With a filter, each non-empty string
+   * criterion is copied into the request and the filter's limit is always set.
+   *
+   * @param slowLogQueryFilter filter to use if provided, may be null
+   * @return a protocol buffer SlowLogResponseRequest
+   */
+  public static SlowLogResponseRequest buildSlowLogResponseRequest(
+      final SlowLogQueryFilter slowLogQueryFilter) {
+    if (slowLogQueryFilter == null) {
+      // No filter: send the request with every field left unset.
+      return SlowLogResponseRequest.newBuilder().build();
+    }
+    final SlowLogResponseRequest.Builder request = SlowLogResponseRequest.newBuilder();
+    final String client = slowLogQueryFilter.getClientAddress();
+    final String region = slowLogQueryFilter.getRegionName();
+    final String table = slowLogQueryFilter.getTableName();
+    final String user = slowLogQueryFilter.getUserName();
+    // Null or empty criteria are left unset on the request.
+    if (!StringUtils.isEmpty(client)) {
+      request.setClientAddress(client);
+    }
+    if (!StringUtils.isEmpty(region)) {
+      request.setRegionName(region);
+    }
+    if (!StringUtils.isEmpty(table)) {
+      request.setTableName(table);
+    }
+    if (!StringUtils.isEmpty(user)) {
+      request.setUserName(user);
+    }
+    request.setLimit(slowLogQueryFilter.getLimit());
+    return request.build();
+  }
+
+  /**
+   * Create a protocol buffer {@link ClearSlowLogResponseRequest}. The request is built
+   * straight from a fresh builder with no fields set.
+   *
+   * @return a protocol buffer ClearSlowLogResponseRequest
+   */
+  public static ClearSlowLogResponseRequest buildClearSlowLogResponseRequest() {
+    return ClearSlowLogResponseRequest.newBuilder().build();
+  }
+
 }
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index d6b9667..6f1bb6a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1566,6 +1566,12 @@ public final class HConstants {
       "hbase.master.executor.logreplayops.threads";
   public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
 
+  /**
+   * Default size of the online slowlog ring buffer. Same value as the
+   * hbase.regionserver.slowlog.ringbuffer.size entry shipped in hbase-default.xml.
+   */
+  public static final int DEFAULT_SLOW_LOG_RING_BUFFER_SIZE = 256;
+
+  /** Config key that switches the RegionServer online slowlog ring buffer on or off. */
+  public static final String SLOW_LOG_BUFFER_ENABLED_KEY =
+    "hbase.regionserver.slowlog.buffer.enabled";
+  /**
+   * NOTE(review): presumably the default paired with {@link #SLOW_LOG_BUFFER_ENABLED_KEY}
+   * (hbase-default.xml also ships false for that key) -- the reader of this constant is
+   * not visible here, confirm.
+   */
+  public static final boolean DEFAULT_ONLINE_LOG_PROVIDER_ENABLED = false;
+
   private HConstants() {
     // Can't be instantiated with this ctor.
   }
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 8fda74d..a7d6898 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1926,4 +1926,27 @@ possible configurations would overwhelm and obscure the important.
       enable this feature.
     </description>
   </property>
+  <property>
+    <name>hbase.regionserver.slowlog.ringbuffer.size</name>
+    <value>256</value>
+    <description>
+      Default size of ringbuffer to be maintained by each RegionServer in order
+      to store online slowlog responses. This is an in-memory ring buffer of
+      requests that were judged to be too slow in addition to the responseTooSlow
+      logging. The in-memory representation would be complete.
+      For more details, please look into Doc Section:
+      Get Slow Response Log from shell
+    </description>
+  </property>
+  <property>
+    <name>hbase.regionserver.slowlog.buffer.enabled</name>
+    <value>false</value>
+    <description>
+      Indicates whether RegionServers run a ring buffer that stores
+      online slow logs in FIFO manner with a limited number of entries. The size of
+      the ring buffer is set by config: hbase.regionserver.slowlog.ringbuffer.size.
+      The default value is false. Turn this on to get the latest slowlog
+      responses with complete data.
+    </description>
+  </property>
 </configuration>
diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
index 85b9113..34c9806 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
@@ -29,6 +29,7 @@ import "ClusterStatus.proto";
 import "HBase.proto";
 import "WAL.proto";
 import "Quota.proto";
+import "TooSlowLog.proto";
 
 message GetRegionInfoRequest {
   required RegionSpecifier region = 1;
@@ -280,6 +281,26 @@ message ExecuteProceduresRequest {
 message ExecuteProceduresResponse {
 }
 
+// Request of the GetSlowLogResponses rpc. Every field is optional.
+// NOTE(review): the string fields look like filters on the returned payloads and limit
+// like a cap on their number; the server-side handling is not visible here -- confirm.
+message SlowLogResponseRequest {
+  optional string region_name = 1;
+  optional string table_name = 2;
+  optional string client_address = 3;
+  optional string user_name = 4;
+  // 10 when the sender leaves it unset
+  optional uint32 limit = 5 [default = 10];
+}
+
+// Response of the GetSlowLogResponses rpc: zero or more SlowLogPayload entries
+// (message defined in TooSlowLog.proto).
+message SlowLogResponses {
+  repeated SlowLogPayload slow_log_payloads = 1;
+}
+
+// Request of the ClearSlowLogsResponses rpc. Carries no fields.
+message ClearSlowLogResponseRequest {
+
+}
+
+// Response of the ClearSlowLogsResponses rpc.
+message ClearSlowLogResponses {
+  // always present (required); reports the outcome of the clear request
+  required bool is_cleaned = 1;
+}
+
 service AdminService {
   rpc GetRegionInfo(GetRegionInfoRequest)
     returns(GetRegionInfoResponse);
@@ -344,4 +365,10 @@ service AdminService {
 
   rpc ExecuteProcedures(ExecuteProceduresRequest)
     returns(ExecuteProceduresResponse);
+
+  rpc GetSlowLogResponses(SlowLogResponseRequest)
+    returns(SlowLogResponses);
+
+  rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest)
+    returns(ClearSlowLogResponses);
 }
diff --git a/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto
new file mode 100644
index 0000000..26dabde
--- /dev/null
+++ b/hbase-protocol-shaded/src/main/protobuf/TooSlowLog.proto
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto2";
+
+// This file contains protocol buffers that are used for Online TooSlowLogs
+// To be used as Ring Buffer payload
+package hbase.pb;
+
+option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
+option java_outer_classname = "TooSlowLog";
+option java_generate_equals_and_hash = true;
+option optimize_for = SPEED;
+
+// One recorded slow RPC call, used as the ring buffer payload.
+// NOTE(review): the time fields look like milliseconds and response_size like bytes,
+// mirroring the parameters documented on RpcServer#logResponse -- confirm against the
+// code that fills this message.
+message SlowLogPayload {
+  required int64 start_time = 1;
+  required int32 processing_time = 2;
+  required int32 queue_time = 3;
+  required int64 response_size = 4;
+  required string client_address = 5;
+  required string server_class = 6;
+  required string method_name = 7;
+  required string call_details = 8;
+  optional string param = 9;
+  required string user_name = 10;
+  optional string region_name = 11;
+  // counters for multi requests; each is 0 when unset
+  optional int32 multi_gets = 12 [default = 0];
+  optional int32 multi_mutations = 13 [default = 0];
+  optional int32 multi_service_calls = 14 [default = 0];
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index f0409fd..97b4990 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -33,6 +33,8 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.atomic.LongAdder;
+
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CallQueueTooBigException;
 import org.apache.hadoop.hbase.CellScanner;
@@ -44,6 +46,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
 import org.apache.hadoop.hbase.security.SaslUtil;
 import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
 import org.apache.hadoop.hbase.security.User;
@@ -85,6 +89,10 @@ public abstract class RpcServer implements RpcServerInterface,
   protected static final CallQueueTooBigException CALL_QUEUE_TOO_BIG_EXCEPTION
       = new CallQueueTooBigException();
 
+  // Keys of the multi-request counters in the JSON that logResponse writes to the log.
+  private static final String MULTI_GETS = "multi.gets";
+  private static final String MULTI_MUTATIONS = "multi.mutations";
+  // NOTE(review): the inline literal this constant replaces was "multi.servicecalls"; the
+  // added underscore changes the key seen by anything parsing these log lines -- confirm
+  // the rename is intended.
+  private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
+
   private final boolean authorize;
   protected boolean isSecurityEnabled;
 
@@ -215,6 +223,12 @@ public abstract class RpcServer implements RpcServerInterface,
    */
   private RSRpcServices rsRpcServices;
 
+
+  /**
+   * Use to add online slowlog responses
+   */
+  private SlowLogRecorder slowLogRecorder;
+
   @FunctionalInterface
   protected interface CallCleanup {
     void run();
@@ -403,13 +417,21 @@ public abstract class RpcServer implements RpcServerInterface,
       boolean tooSlow = (processingTime > warnResponseTime && warnResponseTime > -1);
       boolean tooLarge = (responseSize > warnResponseSize && warnResponseSize > -1);
       if (tooSlow || tooLarge) {
+        final String userName = call.getRequestUserName().orElse(StringUtils.EMPTY);
         // when tagging, we let TooLarge trump TooSmall to keep output simple
         // note that large responses will often also be slow.
         logResponse(param,
-            md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
-            (tooLarge ? "TooLarge" : "TooSlow"),
-            status.getClient(), startTime, processingTime, qTime,
-            responseSize);
+          md.getName(), md.getName() + "(" + param.getClass().getName() + ")",
+          tooLarge, tooSlow,
+          status.getClient(), startTime, processingTime, qTime,
+          responseSize, userName);
+        if (tooSlow && this.slowLogRecorder != null) {
+          // send logs to ring buffer owned by slowLogRecorder
+          final String className = server == null ? StringUtils.EMPTY :
+            server.getClass().getSimpleName();
+          this.slowLogRecorder.addSlowLogPayload(
+            new RpcLogDetails(call, status.getClient(), responseSize, className));
+        }
       }
       return new Pair<>(result, controller.cellScanner());
     } catch (Throwable e) {
@@ -440,17 +462,21 @@ public abstract class RpcServer implements RpcServerInterface,
    * @param param The parameters received in the call.
    * @param methodName The name of the method invoked
    * @param call The string representation of the call
-   * @param tag  The tag that will be used to indicate this event in the log.
-   * @param clientAddress   The address of the client who made this call.
-   * @param startTime       The time that the call was initiated, in ms.
-   * @param processingTime  The duration that the call took to run, in ms.
-   * @param qTime           The duration that the call spent on the queue
-   *                        prior to being initiated, in ms.
-   * @param responseSize    The size in bytes of the response buffer.
+   * @param tooLarge To indicate if the event is tooLarge
+   * @param tooSlow To indicate if the event is tooSlow
+   * @param clientAddress The address of the client who made this call.
+   * @param startTime The time that the call was initiated, in ms.
+   * @param processingTime The duration that the call took to run, in ms.
+   * @param qTime The duration that the call spent on the queue
+   *   prior to being initiated, in ms.
+   * @param responseSize The size in bytes of the response buffer.
+   * @param userName UserName of the current RPC Call
    */
-  void logResponse(Message param, String methodName, String call, String tag,
-      String clientAddress, long startTime, int processingTime, int qTime,
-      long responseSize) throws IOException {
+  void logResponse(Message param, String methodName, String call, boolean tooLarge,
+      boolean tooSlow, String clientAddress, long startTime, int processingTime, int qTime,
+      long responseSize, String userName) {
+    final String className = server == null ? StringUtils.EMPTY :
+      server.getClass().getSimpleName();
     // base information that is reported regardless of type of call
     Map<String, Object> responseInfo = new HashMap<>();
     responseInfo.put("starttimems", startTime);
@@ -458,7 +484,7 @@ public abstract class RpcServer implements RpcServerInterface,
     responseInfo.put("queuetimems", qTime);
     responseInfo.put("responsesize", responseSize);
     responseInfo.put("client", clientAddress);
-    responseInfo.put("class", server == null? "": server.getClass().getSimpleName());
+    responseInfo.put("class", className);
     responseInfo.put("method", methodName);
     responseInfo.put("call", call);
     // The params could be really big, make sure they don't kill us at WARN
@@ -496,13 +522,16 @@ public abstract class RpcServer implements RpcServerInterface,
           }
         }
       }
-      responseInfo.put("multi.gets", numGets);
-      responseInfo.put("multi.mutations", numMutations);
-      responseInfo.put("multi.servicecalls", numServiceCalls);
+      responseInfo.put(MULTI_GETS, numGets);
+      responseInfo.put(MULTI_MUTATIONS, numMutations);
+      responseInfo.put(MULTI_SERVICE_CALLS, numServiceCalls);
     }
+    final String tag = (tooLarge && tooSlow) ? "TooLarge & TooSlow"
+      : (tooSlow ? "TooSlow" : "TooLarge");
     LOG.warn("(response" + tag + "): " + GSON.toJson(responseInfo));
   }
 
+
   /**
    * Truncate to number of chars decided by conf hbase.ipc.trace.log.max.length
    * if TRACE is on else to 150 chars Refer to Jira HBASE-20826 and HBASE-20942
@@ -758,4 +787,14 @@ public abstract class RpcServer implements RpcServerInterface,
   public void setRsRpcServices(RSRpcServices rsRpcServices) {
     this.rsRpcServices = rsRpcServices;
   }
+
+  // Stores the recorder that call() hands slow RPC details to. While it is left null,
+  // call() skips the ring buffer and only writes the warning log.
+  @Override
+  public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
+    this.slowLogRecorder = slowLogRecorder;
+  }
+
+  // Returns the recorder last passed to setSlowLogRecorder, or null if none was set.
+  @Override
+  public SlowLogRecorder getSlowLogRecorder() {
+    return slowLogRecorder;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index 0f875d8..3155679 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -22,13 +22,14 @@ package org.apache.hadoop.hbase.ipc;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.apache.hadoop.hbase.io.ByteBuffAllocator;
-import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.security.authorize.PolicyProvider;
+import org.apache.yetus.audience.InterfaceAudience;
 
 import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
@@ -96,4 +97,16 @@ public interface RpcServerInterface {
   ByteBuffAllocator getByteBuffAllocator();
 
   void setRsRpcServices(RSRpcServices rsRpcServices);
+
+  /**
+   * Set the online slow log recorder that this RPC server should use.
+   *
+   * @param slowLogRecorder instance of {@link SlowLogRecorder}
+   */
+  void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
+
+  /**
+   * Get the online slow log recorder held by this RPC server.
+   *
+   * @return instance of {@link SlowLogRecorder} maintained by RpcServer
+   */
+  SlowLogRecorder getSlowLogRecorder();
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index df81413..75122c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
 import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
 import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
 import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -528,6 +529,11 @@ public class HRegionServer extends HasThread implements
   private final NettyEventLoopGroupConfig eventLoopGroupConfig;
 
   /**
+   * Provide online slow log responses from ringbuffer
+   */
+  private SlowLogRecorder slowLogRecorder;
+
+  /**
    * True if this RegionServer is coming up in a cluster where there is no Master;
    * means it needs to just come up and make do without a Master to talk to: e.g. in test or
    * HRegionServer is doing other than its usual duties: e.g. as an hollowed-out host whose only
@@ -586,6 +592,9 @@ public class HRegionServer extends HasThread implements
       this.abortRequested = false;
       this.stopped = false;
 
+      if (!(this instanceof HMaster)) {
+        this.slowLogRecorder = new SlowLogRecorder(this.conf);
+      }
       rpcServices = createRpcServices();
       useThisHostnameInstead = getUseThisHostnameInstead(conf);
       String hostName =
@@ -1494,6 +1503,15 @@ public class HRegionServer extends HasThread implements
   }
 
   /**
+   * get Online SlowLog Provider to add slow logs to ringbuffer
+   * <p>
+   * NOTE(review): the constructor only creates the recorder when this instance is not an
+   * HMaster, so callers should expect null on a master -- confirm no other assignment exists.
+   *
+   * @return Online SlowLog Provider
+   */
+  public SlowLogRecorder getSlowLogRecorder() {
+    return this.slowLogRecorder;
+  }
+
+  /**
    * Run init. Sets up wal and starts up all server threads.
    *
    * @param c Extra configuration.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b90c6fc..ed4ead0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -104,6 +105,7 @@ import org.apache.hadoop.hbase.ipc.RpcServerInterface;
 import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.master.MasterRpcServices;
 import org.apache.hadoop.hbase.net.Address;
 import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@@ -125,6 +127,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
 import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
@@ -167,6 +170,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -196,6 +201,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWA
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
@@ -243,6 +250,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuo
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.FlushDescriptor;
@@ -1245,6 +1253,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG);
     rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
     rpcServer.setRsRpcServices(this);
+    if (!(rs instanceof HMaster)) {
+      rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder());
+    }
     scannerLeaseTimeoutPeriod = conf.getInt(
       HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
       HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD);
@@ -3730,6 +3741,36 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     }
   }
 
+  /**
+   * Returns the slow RPC records held by this server's recorder that match the request.
+   * Responds with an empty payload list when the server has no recorder.
+   */
+  @Override
+  @QosPriority(priority = HConstants.ADMIN_QOS)
+  public SlowLogResponses getSlowLogResponses(final RpcController controller,
+    final SlowLogResponseRequest request) {
+    final SlowLogRecorder recorder = this.regionServer.getSlowLogRecorder();
+    List<SlowLogPayload> payloads = Collections.emptyList();
+    if (recorder != null) {
+      payloads = recorder.getSlowLogPayloads(request);
+    }
+    return SlowLogResponses.newBuilder().addAllSlowLogPayloads(payloads).build();
+  }
+
+  @Override
+  @QosPriority(priority = HConstants.ADMIN_QOS)
+  public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
+    final ClearSlowLogResponseRequest request) {
+    final SlowLogRecorder slowLogRecorder =
+      this.regionServer.getSlowLogRecorder();
+    boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder)
+      .map(SlowLogRecorder::clearSlowLogPayloads).orElse(false);
+    ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder()
+      .setIsCleaned(slowLogsCleaned)
+      .build();
+    return clearSlowLogResponses;
+  }
+
   @VisibleForTesting
   public RpcScheduler getRpcScheduler() {
     return rpcServer.getScheduler();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
new file mode 100644
index 0000000..53a2ef1
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
@@ -0,0 +1,50 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import com.lmax.disruptor.ExceptionHandler;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exception Handler for the disruptor behind the Online Slow Log Ring Buffer.
+ * Exceptions raised on start, while handling an event or on shutdown are written to
+ * the error log by the callbacks of this class.
+ */
+@InterfaceAudience.Private
+class DisruptorExceptionHandler implements ExceptionHandler<RingBufferEnvelope> {
+  private static final Logger LOG = LoggerFactory.getLogger(DisruptorExceptionHandler.class);
+
+  @Override
+  public void handleOnStartException(Throwable e) {
+    LOG.error("Disruptor onStartException: ", e);
+  }
+
+  @Override
+  public void handleEventException(Throwable e, long sequence, RingBufferEnvelope event) {
+    LOG.error("Sequence={}, event={}", sequence, event, e);
+  }
+
+  @Override
+  public void handleOnShutdownException(Throwable e) {
+    LOG.error("Disruptor onShutdownException: ", e);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java
new file mode 100644
index 0000000..d308670
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.yetus.audience.InterfaceAudience;
+
+
+/**
+ * Envelope placed in the slow log ring buffer: it holds a single {@link RpcLogDetails}
+ * payload until it is taken out, so that the online buffer can provide latest TooSlowLog
+ */
+@InterfaceAudience.Private
+final class RingBufferEnvelope {
+
+  private RpcLogDetails rpcLogDetails;
+
+  /**
+   * Load the Envelope with the {@link RpcLogDetails} that describe an {@link RpcCall}
+   *
+   * @param rpcLogDetails all details of rpc call that would be useful for ring buffer
+   *   consumers
+   */
+  public void load(RpcLogDetails rpcLogDetails) {
+    this.rpcLogDetails = rpcLogDetails;
+  }
+
+  /**
+   * Hand over the {@link RpcLogDetails} currently held by the Envelope and reset the
+   * reference, which leaves the Envelope empty
+   *
+   * @return rpc log details that were loaded, null if nothing is loaded
+   */
+  public RpcLogDetails getPayload() {
+    final RpcLogDetails payload = this.rpcLogDetails;
+    this.rpcLogDetails = null;
+    return payload;
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
new file mode 100644
index 0000000..e7ab7d4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
@@ -0,0 +1,71 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * RpcCall details that would be passed on to ring buffer of slow log responses.
+ * Holds the call together with the client address, the response size and the name of
+ * the server class; the fields are final, so the values never change after construction.
+ */
+@InterfaceAudience.Private
+public class RpcLogDetails {
+
+  private final RpcCall rpcCall;
+  private final String clientAddress;
+  private final long responseSize;
+  private final String className;
+
+  /**
+   * Stores the given call and the values recorded with it, exactly as they are passed in.
+   */
+  public RpcLogDetails(RpcCall rpcCall, String clientAddress, long responseSize,
+      String className) {
+    this.rpcCall = rpcCall;
+    this.clientAddress = clientAddress;
+    this.responseSize = responseSize;
+    this.className = className;
+  }
+
+  public RpcCall getRpcCall() {
+    return rpcCall;
+  }
+
+  public String getClientAddress() {
+    return clientAddress;
+  }
+
+  public long getResponseSize() {
+    return responseSize;
+  }
+
+  public String getClassName() {
+    return className;
+  }
+
+  @Override
+  public String toString() {
+    return new ToStringBuilder(this).append("rpcCall", rpcCall)
+      .append("clientAddress", clientAddress).append("responseSize", responseSize)
+      .append("className", className).toString();
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogEventHandler.java
new file mode 100644
index 0000000..24e8460
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogEventHandler.java
@@ -0,0 +1,208 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.client.SlowLogParams;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
+
+/**
+ * Event Handler run by disruptor ringbuffer consumer
+ */
+@InterfaceAudience.Private
+class SlowLogEventHandler implements EventHandler<RingBufferEnvelope> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SlowLogEventHandler.class);
+
+  private final Queue<SlowLogPayload> queue;
+
+  SlowLogEventHandler(int eventCount) {
+    // evicting queue created for eventCount payloads, wrapped for synchronized access
+    this.queue = Queues.synchronizedQueue(EvictingQueue.<SlowLogPayload>create(eventCount));
+  }
+
+  /**
+   * Called when a publisher has published an event to the {@link RingBuffer}
+   *
+   * @param event published to the {@link RingBuffer}
+   * @param sequence of the event being processed
+   * @param endOfBatch flag to indicate if this is the last event in a batch from
+   *   the {@link RingBuffer}
+   * @throws Exception if the EventHandler would like the exception handled further up the chain
+   */
+  @Override
+  public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch)
+      throws Exception {
+    final RpcLogDetails rpcCallDetails = event.getPayload();
+    final RpcCall rpcCall = rpcCallDetails.getRpcCall();
+    final String clientAddress = rpcCallDetails.getClientAddress();
+    final long responseSize = rpcCallDetails.getResponseSize();
+    final String className = rpcCallDetails.getClassName();
+    Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
+    Message param = rpcCall.getParam();
+    long receiveTime = rpcCall.getReceiveTime();
+    long startTime = rpcCall.getStartTime();
+    long endTime = System.currentTimeMillis();
+    int processingTime = (int) (endTime - startTime);
+    int qTime = (int) (startTime - receiveTime);
+    final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
+    int numGets = 0;
+    int numMutations = 0;
+    int numServiceCalls = 0;
+    if (param instanceof ClientProtos.MultiRequest) {
+      ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
+      for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
+        for (ClientProtos.Action action : regionAction.getActionList()) {
+          if (action.hasMutation()) {
+            numMutations++;
+          }
+          if (action.hasGet()) {
+            numGets++;
+          }
+          if (action.hasServiceCall()) {
+            numServiceCalls++;
+          }
+        }
+      }
+    }
+    final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
+    final String methodDescriptorName =
+      methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
+    SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder()
+      .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
+      .setClientAddress(clientAddress)
+      .setMethodName(methodDescriptorName)
+      .setMultiGets(numGets)
+      .setMultiMutations(numMutations)
+      .setMultiServiceCalls(numServiceCalls)
+      .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
+      .setProcessingTime(processingTime)
+      .setQueueTime(qTime)
+      .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
+      .setResponseSize(responseSize)
+      .setServerClass(className)
+      .setStartTime(startTime)
+      .setUserName(userName)
+      .build();
+    queue.add(slowLogPayload);
+  }
+
+  /**
+   * Drops every slow log payload currently held in the queue.
+   * The request itself is logged at debug level
+   *
+   * @return always true: the method has no path that reports a failed clean up
+   */
+  boolean clearSlowLogs() {
+    // slf4j checks the level itself, a constant message needs no isDebugEnabled() guard
+    LOG.debug("Received request to clean up online slowlog buffer..");
+    queue.clear();
+    return true;
+  }
+
+  /**
+   * Retrieve list of slow log payloads
+   *
+   * @param request slow log request parameters
+   * @return list of slow log payloads
+   */
+  List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
+    List<SlowLogPayload> slowLogPayloadList =
+      Arrays.stream(queue.toArray(new SlowLogPayload[0])).collect(Collectors.toList());
+
+    // latest slow logs first, operator is interested in latest records from in-memory buffer
+    Collections.reverse(slowLogPayloadList);
+
+    if (isFilterProvided(request)) {
+      slowLogPayloadList = filterSlowLogs(request, slowLogPayloadList);
+    }
+    int limit = request.getLimit() >= slowLogPayloadList.size() ? slowLogPayloadList.size()
+      : request.getLimit();
+    return slowLogPayloadList.subList(0, limit);
+  }
+
+  /**
+   * Tells whether the request carries any filter criterion
+   *
+   * @param request slow log request parameters
+   * @return true if user name, table name, client address or region name is non-empty
+   */
+  private boolean isFilterProvided(AdminProtos.SlowLogResponseRequest request) {
+    return StringUtils.isNotEmpty(request.getUserName())
+      || StringUtils.isNotEmpty(request.getTableName())
+      || StringUtils.isNotEmpty(request.getClientAddress())
+      || StringUtils.isNotEmpty(request.getRegionName());
+  }
+
+  private List<SlowLogPayload> filterSlowLogs(AdminProtos.SlowLogResponseRequest request,
+      List<SlowLogPayload> slowLogPayloadList) {
+    List<SlowLogPayload> filteredSlowLogPayloads = new ArrayList<>();
+    for (SlowLogPayload slowLogPayload : slowLogPayloadList) {
+      if (StringUtils.isNotEmpty(request.getRegionName())) {
+        if (slowLogPayload.getRegionName().equals(request.getRegionName())) {
+          filteredSlowLogPayloads.add(slowLogPayload);
+          continue;
+        }
+      }
+      if (StringUtils.isNotEmpty(request.getTableName())) {
+        if (slowLogPayload.getRegionName().startsWith(request.getTableName())) {
+          filteredSlowLogPayloads.add(slowLogPayload);
+          continue;
+        }
+      }
+      if (StringUtils.isNotEmpty(request.getClientAddress())) {
+        if (slowLogPayload.getClientAddress().equals(request.getClientAddress())) {
+          filteredSlowLogPayloads.add(slowLogPayload);
+          continue;
+        }
+      }
+      if (StringUtils.isNotEmpty(request.getUserName())) {
+        if (slowLogPayload.getUserName().equals(request.getUserName())) {
+          filteredSlowLogPayloads.add(slowLogPayload);
+        }
+      }
+    }
+    return filteredSlowLogPayloads;
+  }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
new file mode 100644
index 0000000..d750642
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
@@ -0,0 +1,153 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import com.lmax.disruptor.BlockingWaitStrategy;
+import com.lmax.disruptor.RingBuffer;
+import com.lmax.disruptor.dsl.Disruptor;
+import com.lmax.disruptor.dsl.ProducerType;
+
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
+
+/**
+ * Online SlowLog Provider Service that keeps slow RPC logs in the ring buffer.
+ * The service uses LMAX Disruptor to publish slow records; its event handler consumes
+ * them into a queue created with the configured ring buffer size, and the available
+ * records are then fetched from that queue in thread-safe manner.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class SlowLogRecorder {
+
+  private final Disruptor<RingBufferEnvelope> disruptor;
+  private final SlowLogEventHandler slowLogEventHandler;
+  private final int eventCount;
+  private final boolean isOnlineSlowLogProviderEnabled;
+
+  private static final String SLOW_LOG_RING_BUFFER_SIZE =
+    "hbase.regionserver.slowlog.ringbuffer.size";
+
+  /**
+   * Initialize disruptor with configurable ringbuffer size, unless the buffer is disabled
+   */
+  public SlowLogRecorder(Configuration conf) {
+    isOnlineSlowLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
+      HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
+    // when disabled nothing is created; the public methods check the flag before any use
+    if (!isOnlineSlowLogProviderEnabled) {
+      this.disruptor = null;
+      this.slowLogEventHandler = null;
+      this.eventCount = 0;
+      return;
+    }
+
+    this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
+      HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
+
+    // The thread factory handed to the disruptor gets the name of the thread constructing
+    // this recorder, followed by .slowlog.append, as its name prefix.
+    final String hostingThreadName = Thread.currentThread().getName();
+
+    // disruptor initialization with BlockingWaitStrategy, sized by getEventCount()
+    this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
+      getEventCount(),
+      Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
+      ProducerType.MULTI,
+      new BlockingWaitStrategy());
+    this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
+
+    // event handler is created with the configured size, not the one from getEventCount()
+    this.slowLogEventHandler = new SlowLogEventHandler(this.eventCount);
+    this.disruptor.handleEventsWith(new SlowLogEventHandler[]{this.slowLogEventHandler});
+    this.disruptor.start();
+  }
+
+  /**
+   * Rounds the configured size up to the next power of 2 as required by the disruptor
+   * ringbuffer, capped at 1 << 30. A configured size below 1 is rejected.
+   */
+  private int getEventCount() {
+    // 0 would otherwise be returned as 0 below, which is not a usable ringbuffer size
+    Preconditions.checkArgument(eventCount > 0, SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
+    int floor = Integer.highestOneBit(eventCount);
+    if (floor == eventCount) {
+      return floor;
+    }
+    // max capacity is 1 << 30
+    return floor >= 1 << 29 ? 1 << 30 : floor << 1;
+  }
+
+  /**
+   * Retrieve the online slow logs selected by the request
+   *
+   * @param request slow log request parameters
+   * @return online slow logs for the request, empty list if the online buffer is disabled
+   */
+  public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
+    return !isOnlineSlowLogProviderEnabled ? Collections.emptyList()
+      : this.slowLogEventHandler.getSlowLogPayloads(request);
+  }
+
+  /**
+   * Clears the slow log payloads held by the event handler
+   *
+   * @return true if slow log payloads are cleaned up or
+   *   hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
+   *   clean up slow logs
+   */
+  public boolean clearSlowLogPayloads() {
+    // with the buffer disabled there is nothing to clean up, which counts as success;
+    // short-circuiting then also keeps the unset event handler from being dereferenced
+    final boolean isDisabled = !isOnlineSlowLogProviderEnabled;
+    return isDisabled || this.slowLogEventHandler.clearSlowLogs();
+  }
+
+  /**
+   * Publish slow log rpcCall details on the ringbuffer, no-op if the online buffer is disabled
+   *
+   * @param rpcLogDetails all details of rpc call that would be useful for ring buffer
+   *   consumers
+   */
+  public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
+    if (isOnlineSlowLogProviderEnabled) {
+      final RingBuffer<RingBufferEnvelope> buffer = this.disruptor.getRingBuffer();
+      // a claimed sequence is always published, also if loading the envelope fails
+      final long sequence = buffer.next();
+      try {
+        buffer.get(sequence).load(rpcLogDetails);
+      } finally {
+        buffer.publish(sequence);
+      }
+    }
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
index 5beef53..89329c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdmin2.java
@@ -24,8 +24,10 @@ import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -868,4 +870,29 @@ public class TestAdmin2 extends TestAdminBase {
     assertEquals(initialState, ADMIN.isSnapshotCleanupEnabled());
   }
 
+  /** Clears slow logs on all live RegionServers, expects success and no record left. */
+  @Test
+  public void testSlowLogResponses() throws Exception {
+    // get all live server names
+    Collection<ServerName> serverNames = ADMIN.getRegionServers();
+
+    // clean up slowlog responses maintained in memory by RegionServers
+    List<Boolean> areSlowLogsCleared = ADMIN.clearSlowLogResponses(new HashSet<>(serverNames));
+
+    int countFailedClearSlowResponse = 0;
+    for (Boolean isSlowLogCleared : areSlowLogsCleared) {
+      if (!isSlowLogCleared) {
+        ++countFailedClearSlowResponse;
+      }
+    }
+    Assert.assertEquals(0, countFailedClearSlowResponse);
+
+    SlowLogQueryFilter slowLogQueryFilter = new SlowLogQueryFilter();
+    List<SlowLogRecord> slowLogRecords = ADMIN.getSlowLogResponses(new HashSet<>(serverNames),
+      slowLogQueryFilter);
+
+    // after cleanup of slowlog responses, total count of slowlog payloads should be 0
+    Assert.assertEquals(0, slowLogRecords.size());
+  }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java
index 3737101..c379775 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAdminBase.java
@@ -46,6 +46,7 @@ public class TestAdminBase {
     TEST_UTIL.getConfiguration().setBoolean("hbase.master.enabletable.roundrobin", true);
     TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT, 30);
     TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 30);
+    TEST_UTIL.getConfiguration().setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
     TEST_UTIL.startMiniCluster(3);
     ADMIN = TEST_UTIL.getAdmin();
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 2797df3..f555f4b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -34,6 +34,7 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.List;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -51,9 +52,11 @@ import org.junit.Assume;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoResponseProto;
@@ -524,4 +527,5 @@ public abstract class AbstractTestIPC {
       rpcServer.stop();
     }
   }
+
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 604797a..dbd1d9d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -83,6 +83,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompac
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
@@ -109,6 +111,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWA
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
@@ -678,6 +682,18 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   }
 
   @Override
+  public SlowLogResponses getSlowLogResponses(RpcController controller,
+      SlowLogResponseRequest request) throws ServiceException {
+    return null; // mock stub: ignores the request and always returns null
+  }
+
+  @Override
+  public ClearSlowLogResponses clearSlowLogsResponses(RpcController controller,
+      ClearSlowLogResponseRequest request) throws ServiceException {
+    return null; // mock stub: ignores the request and always returns null
+  }
+
+  @Override
   public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
       RpcController controller, GetSpaceQuotaSnapshotsRequest request)
       throws ServiceException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
new file mode 100644
index 0000000..240230e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
@@ -0,0 +1,593 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcCallback;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingService;
+import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
+
+/**
+ * Tests for Online SlowLog Provider Service
+ */
+@Category({MasterTests.class, MediumTests.class})
+public class TestSlowLogRecorder {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestSlowLogRecorder.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
+
+  private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
+
+  private SlowLogRecorder slowLogRecorder;
+
+  private static int i = 0;
+
+  /** Enables the slow log ring buffer on the test configuration and sets its size. */
+  private static Configuration applySlowLogRecorderConf(int eventSize) {
+    final Configuration configuration = HBASE_TESTING_UTILITY.getConfiguration();
+    // size the ring buffer to eventSize and switch the slow log buffer on
+    configuration.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
+    configuration.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
+    return configuration;
+  }
+
+  /**
+   * Asserts that the slow log payload at a given position of the retrieved list carries
+   * the client address, user name and server class generated for a given record number.
+   *
+   * @param i index into {@code slowLogPayloads} of the payload to check
+   * @param j record number expected there, i.e. the suffix after "client_" and friends
+   * @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder}
+   */
+  private void confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
+    // JUnit expects (expected, actual); this order keeps failure messages the right way round
+    Assert.assertEquals("client_" + j, slowLogPayloads.get(i).getClientAddress());
+    Assert.assertEquals("userName_" + j, slowLogPayloads.get(i).getUserName());
+    Assert.assertEquals("class_" + j, slowLogPayloads.get(i).getServerClass());
+  }
+
+  @Test
+  public void testOnlieSlowLogConsumption() throws Exception {
+    // Stage records into a ring buffer sized 8; reads must come back newest first.
+    Configuration conf = applySlowLogRecorderConf(8);
+    slowLogRecorder = new SlowLogRecorder(conf);
+    AdminProtos.SlowLogResponseRequest request =
+      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
+
+    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request).size());
+    LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+    int i = 0;
+
+    // add 5 records initially
+    for (; i < 5; i++) {
+      RpcLogDetails rpcLogDetails =
+        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+    }
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 5));
+    List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+    confirmPayloadParams(0, 5, slowLogPayloads);
+    confirmPayloadParams(1, 4, slowLogPayloads);
+    confirmPayloadParams(2, 3, slowLogPayloads);
+    confirmPayloadParams(3, 2, slowLogPayloads);
+    confirmPayloadParams(4, 1, slowLogPayloads);
+
+    // add 2 more records
+    for (; i < 7; i++) {
+      RpcLogDetails rpcLogDetails =
+        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+    }
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 7));
+
+    slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+
+    Assert.assertEquals(7, slowLogPayloads.size());
+    confirmPayloadParams(0, 7, slowLogPayloads);
+    confirmPayloadParams(5, 2, slowLogPayloads);
+    confirmPayloadParams(6, 1, slowLogPayloads);
+
+    // add 3 more records
+    for (; i < 10; i++) {
+      RpcLogDetails rpcLogDetails =
+        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+    }
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
+
+    slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+    // confirm ringbuffer is full
+    Assert.assertEquals(8, slowLogPayloads.size());
+    confirmPayloadParams(7, 3, slowLogPayloads);
+    confirmPayloadParams(0, 10, slowLogPayloads);
+    confirmPayloadParams(1, 9, slowLogPayloads);
+
+    // add 4 more records
+    for (; i < 14; i++) {
+      RpcLogDetails rpcLogDetails =
+        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+    }
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
+
+    slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+    // confirm ringbuffer is full
+    Assert.assertEquals(8, slowLogPayloads.size());
+    confirmPayloadParams(0, 14, slowLogPayloads);
+    confirmPayloadParams(1, 13, slowLogPayloads);
+    confirmPayloadParams(2, 12, slowLogPayloads);
+    confirmPayloadParams(3, 11, slowLogPayloads);
+
+    boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
+    Assert.assertTrue(isRingBufferCleaned);
+
+    LOG.debug("cleared the ringbuffer of Online Slow Log records");
+
+    slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+    // confirm ringbuffer is empty
+    Assert.assertEquals(0, slowLogPayloads.size());
+
+  }
+
+  @Test
+  public void testOnlineSlowLogWithHighRecords() throws Exception {
+    // Add 14 * 11 records to a ring buffer sized 14; only the latest 14 may be returned.
+    Configuration conf = applySlowLogRecorderConf(14);
+    slowLogRecorder = new SlowLogRecorder(conf);
+    AdminProtos.SlowLogResponseRequest request =
+      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
+
+    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request).size());
+    LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+    for (int i = 0; i < 14 * 11; i++) {
+      RpcLogDetails rpcLogDetails =
+        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+    }
+    LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
+
+    List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+    Assert.assertEquals(14, slowLogPayloads.size());
+
+    // confirm strict order of slow log payloads
+    confirmPayloadParams(0, 154, slowLogPayloads);
+    confirmPayloadParams(1, 153, slowLogPayloads);
+    confirmPayloadParams(2, 152, slowLogPayloads);
+    confirmPayloadParams(3, 151, slowLogPayloads);
+    confirmPayloadParams(4, 150, slowLogPayloads);
+    confirmPayloadParams(5, 149, slowLogPayloads);
+    confirmPayloadParams(6, 148, slowLogPayloads);
+    confirmPayloadParams(7, 147, slowLogPayloads);
+    confirmPayloadParams(8, 146, slowLogPayloads);
+    confirmPayloadParams(9, 145, slowLogPayloads);
+    confirmPayloadParams(10, 144, slowLogPayloads);
+    confirmPayloadParams(11, 143, slowLogPayloads);
+    confirmPayloadParams(12, 142, slowLogPayloads);
+    confirmPayloadParams(13, 141, slowLogPayloads);
+
+    boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
+    Assert.assertTrue(isRingBufferCleaned);
+    LOG.debug("cleared the ringbuffer of Online Slow Log records");
+    slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+
+    // confirm ringbuffer is empty
+    Assert.assertEquals(0, slowLogPayloads.size());
+
+  }
+
+  @Test
+  public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
+    // With the enabled key unset, none of the added records may show up in the recorder.
+    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
+    conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
+
+    slowLogRecorder = new SlowLogRecorder(conf);
+    AdminProtos.SlowLogResponseRequest request =
+      AdminProtos.SlowLogResponseRequest.newBuilder().build();
+    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request).size());
+    LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+    for (int i = 0; i < 300; i++) {
+      RpcLogDetails rpcLogDetails =
+        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+    }
+
+    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+    List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+    Assert.assertEquals(0, slowLogPayloads.size());
+
+  }
+
+  @Test
+  public void testOnlineSlowLogWithDisableConfig() throws Exception {
+    // With the enabled key set to false, none of the added records may show up.
+    Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
+    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
+
+    slowLogRecorder = new SlowLogRecorder(conf);
+    AdminProtos.SlowLogResponseRequest request =
+      AdminProtos.SlowLogResponseRequest.newBuilder().build();
+    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request).size());
+    LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+    for (int i = 0; i < 300; i++) {
+      RpcLogDetails rpcLogDetails =
+        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+    }
+
+    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+
+    List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+    Assert.assertEquals(0, slowLogPayloads.size());
+    conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
+
+  }
+
+  @Test
+  public void testSlowLogFilters() throws Exception {
+    // Query by user name, by client address and without a filter after adding 100 records.
+    Configuration conf = applySlowLogRecorderConf(30);
+    slowLogRecorder = new SlowLogRecorder(conf);
+    AdminProtos.SlowLogResponseRequest request =
+      AdminProtos.SlowLogResponseRequest.newBuilder()
+        .setLimit(15)
+        .setUserName("userName_87")
+        .build();
+
+    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request).size());
+
+    LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+    for (int i = 0; i < 100; i++) {
+      RpcLogDetails rpcLogDetails =
+        getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+      slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+    }
+    LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+      () -> slowLogRecorder.getSlowLogPayloads(request).size() == 1));
+
+    AdminProtos.SlowLogResponseRequest requestClient =
+      AdminProtos.SlowLogResponseRequest.newBuilder()
+        .setLimit(15)
+        .setClientAddress("client_85")
+        .build();
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+      () -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1));
+
+    AdminProtos.SlowLogResponseRequest requestSlowLog =
+      AdminProtos.SlowLogResponseRequest.newBuilder()
+        .setLimit(15)
+        .build();
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
+      () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
+
+  }
+
+  @Test
+  public void testConcurrentSlowLogEvents() throws Exception {
+    // 1000 async tasks each add 3500 records; the buffer is cleared once after a 500 ms pause.
+    Configuration conf = applySlowLogRecorderConf(50000);
+    slowLogRecorder = new SlowLogRecorder(conf);
+    AdminProtos.SlowLogResponseRequest request =
+      AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
+    Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request).size());
+    LOG.debug("Initially ringbuffer of Slow Log records is empty");
+
+    for (int j = 0; j < 1000; j++) {
+
+      CompletableFuture.runAsync(() -> {
+        for (int i = 0; i < 3500; i++) {
+          RpcLogDetails rpcLogDetails =
+            getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+          slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+        }
+      });
+
+    }
+
+    Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+
+    slowLogRecorder.clearSlowLogPayloads();
+
+    Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
+      4000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000));
+  }
+
+  private RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
+    return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className); // stub RpcCall
+  }
+
+  private RpcCall getRpcCall(String userName) {
+    // stub call: only getParam() and getRequestUser() delegate to helpers of this test
+    return new RpcCall() {
+      @Override
+      public BlockingService getService() {
+        return null;
+      }
+
+      @Override
+      public Descriptors.MethodDescriptor getMethod() {
+        return null;
+      }
+
+      @Override
+      public Message getParam() {
+        return getMessage();
+      }
+
+      @Override
+      public CellScanner getCellScanner() {
+        return null;
+      }
+
+      @Override
+      public long getReceiveTime() {
+        return 0;
+      }
+
+      @Override
+      public long getStartTime() {
+        return 0;
+      }
+
+      @Override
+      public void setStartTime(long startTime) {
+
+      }
+
+      @Override
+      public int getTimeout() {
+        return 0;
+      }
+
+      @Override
+      public int getPriority() {
+        return 0;
+      }
+
+      @Override
+      public long getDeadline() {
+        return 0;
+      }
+
+      @Override
+      public long getSize() {
+        return 0;
+      }
+
+      @Override
+      public RPCProtos.RequestHeader getHeader() {
+        return null;
+      }
+
+      @Override
+      public int getRemotePort() {
+        return 0;
+      }
+
+      @Override
+      public void setResponse(Message param, CellScanner cells,
+        Throwable errorThrowable, String error) {
+      }
+
+      @Override
+      public void sendResponseIfReady() throws IOException {
+      }
+
+      @Override
+      public void cleanup() {
+      }
+
+      @Override
+      public String toShortString() {
+        return null;
+      }
+
+      @Override
+      public long disconnectSince() {
+        return 0;
+      }
+
+      @Override
+      public boolean isClientCellBlockSupported() {
+        return false;
+      }
+
+      @Override
+      public Optional<User> getRequestUser() {
+        return getUser(userName);
+      }
+
+      @Override
+      public InetAddress getRemoteAddress() {
+        return null;
+      }
+
+      @Override
+      public HBaseProtos.VersionInfo getClientVersionInfo() {
+        return null;
+      }
+
+      @Override
+      public void setCallBack(RpcCallback callback) {
+      }
+
+      @Override
+      public boolean isRetryImmediatelySupported() {
+        return false;
+      }
+
+      @Override
+      public long getResponseCellSize() {
+        return 0;
+      }
+
+      @Override
+      public void incrementResponseCellSize(long cellSize) {
+      }
+
+      @Override
+      public long getResponseBlockSize() {
+        return 0;
+      }
+
+      @Override
+      public void incrementResponseBlockSize(long blockSize) {
+      }
+
+      @Override
+      public long getResponseExceptionSize() {
+        return 0;
+      }
+
+      @Override
+      public void incrementResponseExceptionSize(long exceptionSize) {
+      }
+    };
+  }
+
+  /**
+   * Builds a small request message, cycling between Scan, Mutate and Get requests on
+   * successive calls by advancing the static counter {@code i}.
+   *
+   * @return the request message selected by the counter value
+   */
+  private Message getMessage() {
+    i = (i + 1) % 3;
+    switch (i) {
+      case 0:
+        // scan request addressed to "region1"
+        return ClientProtos.ScanRequest.newBuilder()
+          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
+            .setValue(ByteString.copyFromUtf8("region1"))
+            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME)
+            .build())
+          .build();
+
+      case 1:
+        // mutate request for row "row123" addressed to "region2"
+        return ClientProtos.MutateRequest.newBuilder()
+          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
+            .setValue(ByteString.copyFromUtf8("region2"))
+            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
+          .setMutation(ClientProtos.MutationProto.newBuilder()
+            .setRow(ByteString.copyFromUtf8("row123"))
+            .build())
+          .build();
+
+      case 2:
+        // get request for row "row123" addressed to "region2"
+        return ClientProtos.GetRequest.newBuilder()
+          .setRegion(HBaseProtos.RegionSpecifier.newBuilder()
+            .setValue(ByteString.copyFromUtf8("region2"))
+            .setType(HBaseProtos.RegionSpecifier.RegionSpecifierType.REGION_NAME))
+          .setGet(ClientProtos.Get.newBuilder()
+            .setRow(ByteString.copyFromUtf8("row123"))
+            .build())
+          .build();
+
+      default:
+        // fallback for any other counter value: no message
+        return null;
+    }
+  }
+
+  /**
+   * Wraps a stub {@link User} whose short name is {@code userName} in an {@link Optional}.
+   * @param userName value the stub returns from {@code getShortName()}
+   */
+  private Optional<User> getUser(String userName) {
+    final User stubUser = new User() {
+      @Override
+      public String getShortName() {
+        return userName;
+      }
+
+      // neither runAs variant runs the supplied action; both simply return null
+      @Override
+      public <T> T runAs(PrivilegedAction<T> action) {
+        return null;
+      }
+
+      @Override
+      public <T> T runAs(PrivilegedExceptionAction<T> action)
+          throws IOException, InterruptedException {
+        return null;
+      }
+    };
+
+    return Optional.of(stubUser);
+  }
+
+}
diff --git a/hbase-shell/src/main/ruby/hbase/admin.rb b/hbase-shell/src/main/ruby/hbase/admin.rb
index f60838e..c3b4a8e 100644
--- a/hbase-shell/src/main/ruby/hbase/admin.rb
+++ b/hbase-shell/src/main/ruby/hbase/admin.rb
@@ -1434,6 +1434,79 @@ module Hbase
     end
 
     #----------------------------------------------------------------------------------------------
+    # Retrieve SlowLog Responses from RegionServers
+    def get_slowlog_responses(server_names, args)
+      # server_names: '*' for all RegionServers, a single server name or an Array of names;
+      # args: Hash of filter options handed to get_filter_params
+      valid_type = server_names.is_a?(Array) || server_names.is_a?(String)
+      unless valid_type
+        raise(ArgumentError,
+              "#{server_names.class} of #{server_names.inspect} is not of Array/String type")
+      end
+      targets = if server_names == '*'
+                  getServerNames([], true)
+                else
+                  getServerNames(to_server_names(server_names), false)
+                end
+      filters = get_filter_params(args)
+      records = @admin.getSlowLogResponses(java.util.HashSet.new(targets),
+                                           filters)
+      # each record is rendered through toJsonPrettyPrint before being printed
+      rendered = records.map { |record| record.toJsonPrettyPrint }
+      puts 'Retrieved SlowLog Responses from RegionServers'
+      puts rendered
+    end
+
+    def get_filter_params(args)
+      query_filter = org.apache.hadoop.hbase.client.SlowLogQueryFilter.new
+      # optional region name filter
+      if args.key?('REGION_NAME')
+        query_filter.setRegionName(args['REGION_NAME'])
+      end
+      # optional table name filter
+      if args.key?('TABLE_NAME')
+        query_filter.setTableName(args['TABLE_NAME'])
+      end
+      # optional client address filter
+      if args.key?('CLIENT_IP')
+        query_filter.setClientAddress(args['CLIENT_IP'])
+      end
+      # optional user name filter
+      if args.key?('USER')
+        query_filter.setUserName(args['USER'])
+      end
+      # optional limit on the number of records
+      if args.key?('LIMIT')
+        query_filter.setLimit(args['LIMIT'])
+      end
+      query_filter
+    end
+
+    #----------------------------------------------------------------------------------------------
+    # Clears SlowLog Responses from RegionServers
+    def clear_slowlog_responses(server_names)
+      # nil targets every RegionServer; otherwise a single server name or an Array of
+      # server names is expected
+      accepted = server_names.nil? || server_names.is_a?(Array) || server_names.is_a?(String)
+      unless accepted
+        raise(ArgumentError,
+              "#{server_names.class} of #{server_names.inspect} is not of correct type")
+      end
+      targets = if server_names.nil?
+                  getServerNames([], true)
+                else
+                  getServerNames(to_server_names(server_names), false)
+                end
+      results = @admin.clearSlowLogResponses(java.util.HashSet.new(targets))
+      # count the truthy entries of the returned list
+      cleared = 0
+      results.each { |ok| cleared += 1 if ok }
+      # report the successes against the number of responses
+      puts 'Cleared Slowlog responses from ' \
+           "#{cleared}/#{results.size} RegionServers"
+    end
+
+    #----------------------------------------------------------------------------------------------
     # Decommission a list of region servers, optionally offload corresponding regions
     def decommission_regionservers(host_or_servers, should_offload)
       # Fail if host_or_servers is neither a string nor an array
@@ -1507,6 +1580,17 @@ module Hbase
     def stop_regionserver(hostport)
       @admin.stopRegionServer(hostport)
     end
+
+    #----------------------------------------------------------------------------------------------
+    # Get list of server names
+    def to_server_names(server_names)
+      # an Array is passed through untouched; anything else is wrapped
+      # into a single element java.util.List
+      return server_names if server_names.is_a?(Array)
+
+      java.util.Arrays.asList(server_names)
+    end
+
   end
   # rubocop:enable Metrics/ClassLength
 end
diff --git a/hbase-shell/src/main/ruby/shell.rb b/hbase-shell/src/main/ruby/shell.rb
index f09149f..6b441d6 100644
--- a/hbase-shell/src/main/ruby/shell.rb
+++ b/hbase-shell/src/main/ruby/shell.rb
@@ -333,10 +333,12 @@ Shell.load_command_group(
     normalizer_switch
     normalizer_enabled
     is_in_maintenance_mode
+    clear_slowlog_responses
     close_region
     compact
     compaction_switch
     flush
+    get_slowlog_responses
     major_compact
     move
     split
diff --git a/hbase-shell/src/main/ruby/shell/commands/clear_slowlog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/clear_slowlog_responses.rb
new file mode 100644
index 0000000..ea96de3
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/clear_slowlog_responses.rb
@@ -0,0 +1,47 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# Clear slowlog responses maintained in memory by RegionServers
+
+module Shell
+  module Commands
+    # Clear slowlog responses
+    class ClearSlowlogResponses < Command
+      def help
+        <<-EOF
+Clears SlowLog Responses maintained by each or specific RegionServers.
+Specify array of server names for specific RS. A server name is
+the host, port plus startcode of a RegionServer.
+e.g.: host187.example.com,60020,1289493121758 (find servername in
+master ui or when you do detailed status in shell)
+
+Examples:
+
+  hbase> clear_slowlog_responses                                     => clears slowlog responses from all RS
+  hbase> clear_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2']    => clears slowlog responses from SERVER_NAME1,
+                                                                        SERVER_NAME2
+
+
+        EOF
+      end
+
+      def command(server_names = nil)
+        admin.clear_slowlog_responses(server_names) # server_names stays nil when omitted
+      end
+    end
+  end
+end
diff --git a/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb
new file mode 100644
index 0000000..55759ca
--- /dev/null
+++ b/hbase-shell/src/main/ruby/shell/commands/get_slowlog_responses.rb
@@ -0,0 +1,78 @@
+#
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with this
+# work for additional information regarding copyright ownership. The ASF
+# licenses this file to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+# Retrieve latest slowlog responses maintained in memory by RegionServers
+
+module Shell
+  module Commands
+    # Retrieve latest slowlog responses
+    class GetSlowlogResponses < Command
+      def help
+        <<-EOF
+Retrieve latest SlowLog Responses maintained by each or specific RegionServers.
+Specify '*' to include all RS otherwise array of server names for specific
+RS. A server name is the host, port plus startcode of a RegionServer.
+e.g.: host187.example.com,60020,1289493121758 (find servername in
+master ui or when you do detailed status in shell)
+
+Provide optional filter parameters as Hash.
+Default Limit of each server for providing no of slow log records is 10. User can specify
+more limit by 'LIMIT' param in case more than 10 records should be retrieved.
+
+Examples:
+
+  hbase> get_slowlog_responses '*'                                 => get slowlog responses from all RS
+  hbase> get_slowlog_responses '*', {'LIMIT' => 50}                => get slowlog responses from all RS
+                                                                      with 50 records limit (default limit: 10)
+  hbase> get_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2']    => get slowlog responses from SERVER_NAME1,
+                                                                      SERVER_NAME2
+  hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1'}
+                                                                   => get slowlog responses only related to meta
+                                                                      region
+  hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1'}         => get slowlog responses only related to t1 table
+  hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100}
+                                                                   => get slowlog responses with given client
+                                                                      IP address and get 100 records limit
+                                                                      (default limit: 10)
+  hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1', 'TABLE_NAME' => 't1'}
+                                                                   => get slowlog responses with given region name
+                                                                      or table name
+  hbase> get_slowlog_responses '*', {'USER' => 'user_name', 'CLIENT_IP' => '192.162.1.40:60225'}
+                                                                   => get slowlog responses that match either
+                                                                      provided client IP address or user name
+
+Sometimes output can be long pretty printed json for user to scroll in
+a single screen and hence user might prefer
+redirecting output of get_slowlog_responses to a file.
+
+Example:
+
+echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1
+
+        EOF
+      end
+
+      def command(server_names, args = {})
+        # filter options must arrive as a Hash
+        # (for example {'LIMIT' => 50})
+        raise 'Filter parameters are not Hash' unless args.is_a?(Hash)
+
+        admin.get_slowlog_responses(server_names, args)
+      end
+    end
+  end
+end
diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb
index 7d32645..3264579 100644
--- a/hbase-shell/src/test/ruby/hbase/admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb
@@ -201,6 +201,20 @@ module Hbase
 
     #-------------------------------------------------------------------------------
 
+    define_test 'get slowlog responses should work' do
+      output = capture_stdout { command(:get_slowlog_responses, '*', {}) }
+      assert(output.include?('Retrieved SlowLog Responses from RegionServers'))
+    end
+
+    #-------------------------------------------------------------------------------
+
+    define_test 'clear slowlog responses should work' do
+      output = capture_stdout { command(:clear_slowlog_responses, nil) }
+      assert(output.include?('Cleared Slowlog responses from 1/1 RegionServers'))
+    end
+
+    #-------------------------------------------------------------------------------
+
     define_test "create should fail with non-string/non-hash column args" do
       assert_raise(ArgumentError) do
         command(:create, @create_test_name, 123)
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
index 600c35d..5b56e62 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift2/client/ThriftAdmin.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.hbase.client.CompactType;
 import org.apache.hadoop.hbase.client.CompactionState;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.SlowLogQueryFilter;
+import org.apache.hadoop.hbase.client.SlowLogRecord;
 import org.apache.hadoop.hbase.client.SnapshotDescription;
 import org.apache.hadoop.hbase.client.SnapshotType;
 import org.apache.hadoop.hbase.client.TableDescriptor;
@@ -1393,6 +1395,17 @@ public class ThriftAdmin implements Admin {
   }
 
   @Override
+  public List<SlowLogRecord> getSlowLogResponses(final Set<ServerName> serverNames,
+      final SlowLogQueryFilter slowLogQueryFilter) {
+    throw new NotImplementedException("getSlowLogResponses not supported in ThriftAdmin");
+  }
+
+  @Override
+  public List<Boolean> clearSlowLogResponses(final Set<ServerName> serverNames) {
+    throw new NotImplementedException("clearSlowLogsResponses not supported in ThriftAdmin");
+  }
+
+  @Override
   public Future<Void> splitRegionAsync(byte[] regionName) throws IOException {
     return splitRegionAsync(regionName, null);
   }
diff --git a/src/main/asciidoc/_chapters/hbase-default.adoc b/src/main/asciidoc/_chapters/hbase-default.adoc
index ebed2f2..b0cd4bf 100644
--- a/src/main/asciidoc/_chapters/hbase-default.adoc
+++ b/src/main/asciidoc/_chapters/hbase-default.adoc
@@ -2072,6 +2072,44 @@ A comma-separated list of
 `-1`
 
 
+[[hbase.regionserver.slowlog.ringbuffer.size]]
+*`hbase.regionserver.slowlog.ringbuffer.size`*::
++
+.Description
+
+      Default size of ringbuffer to be maintained by each RegionServer in order
+      to store online slowlog responses. This is an in-memory ring buffer of
+      requests that were judged to be too slow in addition to the responseTooSlow
+      logging. The in-memory representation would be complete.
+      For more details, please look into Doc Section:
+      <<slow_log_responses, slow_log_responses>>
+
+
++
+.Default
+`256`
+
+
+
+[[hbase.regionserver.slowlog.buffer.enabled]]
+*`hbase.regionserver.slowlog.buffer.enabled`*::
++
+.Description
+
+      Indicates whether RegionServers have a ring buffer running for storing
+      online slow logs in FIFO manner with limited entries. The size of
+      the ring buffer is indicated by config: hbase.regionserver.slowlog.ringbuffer.size.
+      The default value is false; turn this on to get the latest slowlog
+      responses with complete data.
+      For more details, please look into Doc Section:
+      <<slow_log_responses, slow_log_responses>>
+
+
++
+.Default
+`false`
+
+
 
 [[hbase.region.replica.replication.enabled]]
 *`hbase.region.replica.replication.enabled`*::
diff --git a/src/main/asciidoc/_chapters/ops_mgt.adoc b/src/main/asciidoc/_chapters/ops_mgt.adoc
index d8afcd5..f8f9c2a 100644
--- a/src/main/asciidoc/_chapters/ops_mgt.adoc
+++ b/src/main/asciidoc/_chapters/ops_mgt.adoc
@@ -1813,6 +1813,120 @@ In the case that the call is not a client operation, that detailed fingerprint i
 
 This particular example, for example, would indicate that the likely cause of slowness is simply a very large (on the order of 100MB) multiput, as we can tell by the "vlen," or value length, fields of each put in the multiPut.
 
+[[slow_log_responses]]
+==== Get Slow Response Log from shell
+When an individual RPC exceeds a configurable time bound, we log a complaint
+by way of the logging subsystem.
+
+e.g.
+
+----
+2019-10-02 10:10:22,195 WARN [,queue=15,port=60020] ipc.RpcServer - (responseTooSlow):
+{"call":"Scan(org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ScanRequest)",
+"starttimems":1567203007549,
+"responsesize":6819737,
+"method":"Scan",
+"param":"region { type: REGION_NAME value: \"t1,\\000\\000\\215\\f)o\\\\\\024\\302\\220\\000\\000\\000\\000\\000\\001\\000\\000\\000\\000\\000\\006\\000\\000\\000\\000\\000\\005\\000\\000<TRUNCATED>",
+"processingtimems":28646,
+"client":"10.253.196.215:41116",
+"queuetimems":22453,
+"class":"HRegionServer"}
+----
+
+Unfortunately, the request parameters are often truncated, as in the example above.
+The truncation is unfortunate because it eliminates much of the utility of
+the warnings. For example, the region name, the start and end keys, and the
+filter hierarchy are all important clues for debugging performance problems
+caused by moderate to low selectivity queries or queries made at a high rate.
+
+HBASE-22978 introduces an in-memory ring buffer of requests that were judged to be too
+slow, maintained in addition to the responseTooSlow logging. The in-memory representation can be
+complete. There is some chance a high rate of requests will cause information on other
+interesting requests to be overwritten before it can be read. This is an acceptable trade-off.
+
+In order to enable the in-memory ring buffer at RegionServers, we need to set the
+following config to true:
+----
+hbase.regionserver.slowlog.buffer.enabled
+----
+
+One more config determines the size of the ring buffer:
+----
+hbase.regionserver.slowlog.ringbuffer.size
+----
+
+Check the config section for the detailed description.
+
+hbase.regionserver.slowlog.buffer.enabled is disabled by default. Turn it on and the shell
+commands below will provide the expected results from the ring buffers.
+
+
+Shell commands to retrieve slowlog responses from RegionServers:
+
+----
+Retrieve latest SlowLog Responses maintained by each or specific RegionServers.
+Specify '*' to include all RS otherwise array of server names for specific
+RS. A server name is the host, port plus startcode of a RegionServer.
+e.g.: host187.example.com,60020,1289493121758 (find servername in
+master ui or when you do detailed status in shell)
+
+Provide optional filter parameters as Hash.
+By default, each server returns at most 10 slow log records. Users can specify
+a higher limit with the 'LIMIT' param in case more than 10 records should be retrieved.
+
+Examples:
+
+  hbase> get_slowlog_responses '*'                                 => get slowlog responses from all RS
+  hbase> get_slowlog_responses '*', {'LIMIT' => 50}                => get slowlog responses from all RS
+                                                                      with 50 records limit (default limit: 10)
+  hbase> get_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2']    => get slowlog responses from SERVER_NAME1,
+                                                                      SERVER_NAME2
+  hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1'}
+                                                                   => get slowlog responses only related to meta
+                                                                      region
+  hbase> get_slowlog_responses '*', {'TABLE_NAME' => 't1'}         => get slowlog responses only related to t1 table
+  hbase> get_slowlog_responses '*', {'CLIENT_IP' => '192.162.1.40:60225', 'LIMIT' => 100}
+                                                                   => get slowlog responses with given client
+                                                                      IP address and get 100 records limit
+                                                                      (default limit: 10)
+  hbase> get_slowlog_responses '*', {'REGION_NAME' => 'hbase:meta,,1', 'TABLE_NAME' => 't1'}
+                                                                   => get slowlog responses with given region name
+                                                                      or table name
+  hbase> get_slowlog_responses '*', {'USER' => 'user_name', 'CLIENT_IP' => '192.162.1.40:60225'}
+                                                                   => get slowlog responses that match either
+                                                                      provided client IP address or user name
+
+
+----
+
+Sometimes the pretty-printed JSON output can be too long for the user to scroll
+through on a single screen, and hence the user might prefer
+redirecting the output of get_slowlog_responses to a file.
+
+Example:
+----
+echo "get_slowlog_responses '*'" | hbase shell > xyz.out 2>&1
+----
+
+
+Shell command to clear slowlog responses from RegionServers:
+
+----
+Clears SlowLog Responses maintained by each or specific RegionServers.
+Specify array of server names for specific RS. A server name is
+the host, port plus startcode of a RegionServer.
+e.g.: host187.example.com,60020,1289493121758 (find servername in
+master ui or when you do detailed status in shell)
+
+Examples:
+
+  hbase> clear_slowlog_responses                                     => clears slowlog responses from all RS
+  hbase> clear_slowlog_responses ['SERVER_NAME1', 'SERVER_NAME2']    => clears slowlog responses from SERVER_NAME1,
+                                                                        SERVER_NAME2
+
+
+----
+
 === Block Cache Monitoring
 
 Starting with HBase 0.98, the HBase Web UI includes the ability to monitor and report on the performance of the block cache.