You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/07/22 06:06:57 UTC
[hbase] branch branch-2 updated: HBASE-24718 : Generic NamedQueue
framework for multiple use-cases (Refactor SlowLog responses) (#2110)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new ce4e692 HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses) (#2110)
ce4e692 is described below
commit ce4e6926993a4308a55f71489aa77c448b414ee0
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed Jul 22 11:36:19 2020 +0530
HBASE-24718 : Generic NamedQueue framework for multiple use-cases (Refactor SlowLog responses) (#2110)
Closes #2052
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../hbase/shaded/protobuf/RequestConverter.java | 5 +
hbase-common/src/main/resources/hbase-default.xml | 12 +
.../src/main/protobuf/Admin.proto | 6 +
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 26 +-
.../hadoop/hbase/ipc/RpcServerInterface.java | 10 +-
.../DisruptorExceptionHandler.java | 2 +-
.../hadoop/hbase/namequeues/LogEventHandler.java | 130 ++++++++++
.../slowlog => namequeues}/LogHandlerUtils.java | 6 +-
.../NamedQueuePayload.java} | 33 ++-
.../NamedQueueRecorder.java} | 120 ++++------
.../hadoop/hbase/namequeues/NamedQueueService.java | 69 ++++++
.../slowlog => namequeues}/RingBufferEnvelope.java | 22 +-
.../slowlog => namequeues}/RpcLogDetails.java | 5 +-
.../hbase/namequeues/SlowLogPersistentService.java | 98 ++++++++
.../SlowLogTableOpsChore.java | 12 +-
.../hbase/namequeues/impl/SlowLogQueueService.java | 264 ++++++++++++++++++++
.../namequeues/request/NamedQueueGetRequest.java | 65 +++++
.../namequeues/response/NamedQueueGetResponse.java | 61 +++++
.../hadoop/hbase/regionserver/HRegionServer.java | 23 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 52 ++--
.../regionserver/slowlog/LogEventHandler.java | 266 ---------------------
.../TestNamedQueueRecorder.java} | 216 ++++++++++-------
.../TestSlowLogAccessor.java | 56 +++--
hbase-shell/src/test/ruby/hbase/admin_test.rb | 2 +-
.../thrift2/TestThriftHBaseServiceHandler.java | 1 +
25 files changed, 1027 insertions(+), 535 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
index 1218aac..13d49f1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/shaded/protobuf/RequestConverter.java
@@ -2055,6 +2055,11 @@ public final class RequestConverter {
} else {
builder.setFilterByOperator(SlowLogResponseRequest.FilterByOperator.OR);
}
+ if (LogQueryFilter.Type.SLOW_LOG.equals(logQueryFilter.getType())) {
+ builder.setLogType(SlowLogResponseRequest.LogType.SLOW_LOG);
+ } else {
+ builder.setLogType(SlowLogResponseRequest.LogType.LARGE_LOG);
+ }
return builder.setLimit(logQueryFilter.getLimit()).build();
}
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 61d3a31..5b77dfa 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1996,4 +1996,16 @@ possible configurations would overwhelm and obscure the important.
too large batch request.
</description>
</property>
+ <property>
+ <name>hbase.namedqueue.provider.classes</name>
+ <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService</value>
+ <description>
+ Default values for NamedQueueService implementors. This comma separated full class names
+ represent all implementors of NamedQueueService that we would like to be invoked by
+ LogEvent handler service. One example of NamedQueue service is SlowLogQueueService which
+ is used to store slow/large RPC logs in ringbuffer at each RegionServer.
+ All implementors of NamedQueueService should be found under package:
+ "org.apache.hadoop.hbase.namequeues.impl"
+ </description>
+ </property>
</configuration>
diff --git a/hbase-protocol-shaded/src/main/protobuf/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
index a9fb63b..aa04ba6 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Admin.proto
@@ -287,12 +287,18 @@ message SlowLogResponseRequest {
OR = 1;
}
+ enum LogType {
+ SLOW_LOG = 0;
+ LARGE_LOG = 1;
+ }
+
optional string region_name = 1;
optional string table_name = 2;
optional string client_address = 3;
optional string user_name = 4;
optional uint32 limit = 5 [default = 10];
optional FilterByOperator filter_by_operator = 6 [default = OR];
+ optional LogType log_type = 7;
}
message SlowLogResponses {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 3a59a4b..cace5f0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CallQueueTooBigException;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
import org.apache.hadoop.hbase.exceptions.RequestTooBigException;
@@ -46,8 +47,8 @@ import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.regionserver.slowlog.RpcLogDetails;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
+import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.security.HBasePolicyProvider;
import org.apache.hadoop.hbase.security.SaslUtil;
import org.apache.hadoop.hbase.security.SaslUtil.QualityOfProtection;
@@ -96,6 +97,7 @@ public abstract class RpcServer implements RpcServerInterface,
private static final String MULTI_SERVICE_CALLS = "multi.service_calls";
private final boolean authorize;
+ private final boolean isOnlineLogProviderEnabled;
protected boolean isSecurityEnabled;
public static final byte CURRENT_VERSION = 0;
@@ -227,7 +229,7 @@ public abstract class RpcServer implements RpcServerInterface,
/**
* Use to add online slowlog responses
*/
- private SlowLogRecorder slowLogRecorder;
+ private NamedQueueRecorder namedQueueRecorder;
@FunctionalInterface
protected interface CallCleanup {
@@ -302,6 +304,8 @@ public abstract class RpcServer implements RpcServerInterface,
saslProps = Collections.emptyMap();
}
+ this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
+ HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
this.scheduler = scheduler;
}
@@ -430,11 +434,11 @@ public abstract class RpcServer implements RpcServerInterface,
tooLarge, tooSlow,
status.getClient(), startTime, processingTime, qTime,
responseSize, userName);
- if (this.slowLogRecorder != null) {
+ if (this.namedQueueRecorder != null && this.isOnlineLogProviderEnabled) {
// send logs to ring buffer owned by slowLogRecorder
- final String className = server == null ? StringUtils.EMPTY :
- server.getClass().getSimpleName();
- this.slowLogRecorder.addSlowLogPayload(
+ final String className =
+ server == null ? StringUtils.EMPTY : server.getClass().getSimpleName();
+ this.namedQueueRecorder.addRecord(
new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
tooLarge));
}
@@ -817,12 +821,8 @@ public abstract class RpcServer implements RpcServerInterface,
}
@Override
- public void setSlowLogRecorder(SlowLogRecorder slowLogRecorder) {
- this.slowLogRecorder = slowLogRecorder;
+ public void setNamedQueueRecorder(NamedQueueRecorder namedQueueRecorder) {
+ this.namedQueueRecorder = namedQueueRecorder;
}
- @Override
- public SlowLogRecorder getSlowLogRecorder() {
- return slowLogRecorder;
- }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index c8a71f3..99e0188 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.io.ByteBuffAllocator;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.yetus.audience.InterfaceAudience;
@@ -102,12 +102,8 @@ public interface RpcServerInterface {
/**
* Set Online SlowLog Provider
*
- * @param slowLogRecorder instance of {@link SlowLogRecorder}
+ * @param namedQueueRecorder instance of {@link NamedQueueRecorder}
*/
- void setSlowLogRecorder(final SlowLogRecorder slowLogRecorder);
+ void setNamedQueueRecorder(final NamedQueueRecorder namedQueueRecorder);
- /**
- * @return Retrieve instance of {@link SlowLogRecorder} maintained by RpcServer
- */
- SlowLogRecorder getSlowLogRecorder();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java
similarity index 96%
copy from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
copy to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java
index 53a2ef1..fcaecc6 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/DisruptorExceptionHandler.java
@@ -17,7 +17,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
import com.lmax.disruptor.ExceptionHandler;
import org.apache.yetus.audience.InterfaceAudience;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
new file mode 100644
index 0000000..8b8db2f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
@@ -0,0 +1,130 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues;
+
+import com.lmax.disruptor.EventHandler;
+import com.lmax.disruptor.RingBuffer;
+
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Event Handler run by disruptor ringbuffer consumer.
+ * Although this is generic implementation for namedQueue, it can have individual queue specific
+ * logic.
+ */
+@InterfaceAudience.Private
+class LogEventHandler implements EventHandler<RingBufferEnvelope> {
+
+ private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
+
+ // Map that binds namedQueues to corresponding queue service implementation.
+ // If NamedQueue of specific type is enabled, corresponding service will be used to
+ // insert and retrieve records.
+ // Individual queue sizes should be determined based on their individual configs within
+ // each service.
+ private final Map<NamedQueuePayload.NamedQueueEvent, NamedQueueService> namedQueueServices =
+ new HashMap<>();
+
+ private static final String NAMED_QUEUE_PROVIDER_CLASSES = "hbase.namedqueue.provider.classes";
+
+ LogEventHandler(final Configuration conf) {
+ for (String implName : conf.getStringCollection(NAMED_QUEUE_PROVIDER_CLASSES)) {
+ Class<?> clz;
+ try {
+ clz = Class.forName(implName);
+ } catch (ClassNotFoundException e) {
+ LOG.warn("Failed to find NamedQueueService implementor class {}", implName, e);
+ continue;
+ }
+
+ if (!NamedQueueService.class.isAssignableFrom(clz)) {
+ LOG.warn("Class {} is not implementor of NamedQueueService.", clz);
+ continue;
+ }
+
+ // add all service mappings here
+ try {
+ NamedQueueService namedQueueService =
+ (NamedQueueService) clz.getConstructor(Configuration.class).newInstance(conf);
+ namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
+ } catch (InstantiationException | IllegalAccessException | NoSuchMethodException
+ | InvocationTargetException e) {
+ LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.",
+ clz);
+ }
+ }
+ }
+
+ /**
+ * Called when a publisher has published an event to the {@link RingBuffer}.
+ * This is generic consumer of disruptor ringbuffer and for each new namedQueue that we
+ * add, we should also provide specific consumer logic here.
+ *
+ * @param event published to the {@link RingBuffer}
+ * @param sequence of the event being processed
+ * @param endOfBatch flag to indicate if this is the last event in a batch from
+ * the {@link RingBuffer}
+ */
+ @Override
+ public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch) {
+ final NamedQueuePayload namedQueuePayload = event.getPayload();
+ // consume ringbuffer payload based on event type
+ namedQueueServices.get(namedQueuePayload.getNamedQueueEvent())
+ .consumeEventFromDisruptor(namedQueuePayload);
+ }
+
+ /**
+ * Cleans up queues maintained by services.
+ *
+ * @param namedQueueEvent type of queue to clear
+ * @return true if queue is cleaned up, false otherwise
+ */
+ boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+ return namedQueueServices.get(namedQueueEvent).clearNamedQueue();
+ }
+
+ /**
+ * Add all in memory queue records to system table. The implementors can use system table
+ * or direct HDFS file or ZK as persistence system.
+ */
+ void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+ namedQueueServices.get(namedQueueEvent).persistAll();
+ }
+
+ /**
+ * Retrieve in memory queue records from ringbuffer
+ *
+ * @param request namedQueue request with event type
+ * @return queue records from ringbuffer after filter (if applied)
+ */
+ NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
+ return namedQueueServices.get(request.getNamedQueueEvent()).getNamedQueueRecords(request);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java
similarity index 96%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java
index f4d850f..f04cb18 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogHandlerUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogHandlerUtils.java
@@ -17,7 +17,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
import java.util.ArrayList;
import java.util.List;
@@ -30,7 +30,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* Event Handler utility class
*/
@InterfaceAudience.Private
-class LogHandlerUtils {
+public class LogHandlerUtils {
private static int getTotalFiltersCount(AdminProtos.SlowLogResponseRequest request) {
int totalFilters = 0;
@@ -91,7 +91,7 @@ class LogHandlerUtils {
return filteredSlowLogPayloads;
}
- static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
+ public static List<TooSlowLog.SlowLogPayload> getFilteredLogs(
AdminProtos.SlowLogResponseRequest request, List<TooSlowLog.SlowLogPayload> logPayloadList) {
int totalFilters = getTotalFiltersCount(request);
if (totalFilters > 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
similarity index 54%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
index 53a2ef1..7aa87fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/DisruptorExceptionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
@@ -17,34 +17,33 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
-import com.lmax.disruptor.ExceptionHandler;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
- * Exception Handler for Online Slow Log Ring Buffer
+ * Base payload to be prepared by client to send various namedQueue events for in-memory
+ * ring buffer storage in either HMaster or RegionServer.
+ * e.g slowLog responses
*/
@InterfaceAudience.Private
-class DisruptorExceptionHandler implements ExceptionHandler<RingBufferEnvelope> {
+public class NamedQueuePayload {
- private static final Logger LOG = LoggerFactory.getLogger(DisruptorExceptionHandler.class);
-
- @Override
- public void handleEventException(Throwable e, long sequence, RingBufferEnvelope event) {
- LOG.error("Sequence={}, event={}", sequence, event, e);
+ public enum NamedQueueEvent {
+ SLOW_LOG
}
- @Override
- public void handleOnStartException(Throwable e) {
- LOG.error("Disruptor onStartException: ", e);
+ private final NamedQueueEvent namedQueueEvent;
+
+ public NamedQueuePayload(NamedQueueEvent namedQueueEvent) {
+ if (namedQueueEvent == null) {
+ throw new RuntimeException("NamedQueuePayload with null namedQueueEvent");
+ }
+ this.namedQueueEvent = namedQueueEvent;
}
- @Override
- public void handleOnShutdownException(Throwable e) {
- LOG.error("Disruptor onShutdownException: ", e);
+ public NamedQueueEvent getNamedQueueEvent() {
+ return namedQueueEvent;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
similarity index 50%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
index b0fb3e7..cb3512a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
@@ -17,87 +17,80 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
-import java.util.Collections;
-import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
-
/**
- * Online Slow/Large Log Provider Service that keeps slow/large RPC logs in the ring buffer.
- * The service uses LMAX Disruptor to save slow records which are then consumed by
+ * NamedQueue recorder that maintains various named queues.
+ * The service uses LMAX Disruptor to save queue records which are then consumed by
* a queue and based on the ring buffer size, the available records are then fetched
* from the queue in thread-safe manner.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class SlowLogRecorder {
+public class NamedQueueRecorder {
private final Disruptor<RingBufferEnvelope> disruptor;
private final LogEventHandler logEventHandler;
- private final int eventCount;
- private final boolean isOnlineLogProviderEnabled;
- private static final String SLOW_LOG_RING_BUFFER_SIZE =
- "hbase.regionserver.slowlog.ringbuffer.size";
+ private static NamedQueueRecorder namedQueueRecorder;
+ private static boolean isInit = false;
+ private static final Object LOCK = new Object();
/**
* Initialize disruptor with configurable ringbuffer size
*/
- public SlowLogRecorder(Configuration conf) {
- isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
- HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
-
- if (!isOnlineLogProviderEnabled) {
- this.disruptor = null;
- this.logEventHandler = null;
- this.eventCount = 0;
- return;
- }
-
- this.eventCount = conf.getInt(SLOW_LOG_RING_BUFFER_SIZE,
- HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
+ private NamedQueueRecorder(Configuration conf) {
// This is the 'writer' -- a single threaded executor. This single thread consumes what is
// put on the ringbuffer.
final String hostingThreadName = Thread.currentThread().getName();
+ int eventCount = conf.getInt("hbase.namedqueue.ringbuffer.size", 1024);
+
// disruptor initialization with BlockingWaitStrategy
this.disruptor = new Disruptor<>(RingBufferEnvelope::new,
- getEventCount(),
+ getEventCount(eventCount),
Threads.newDaemonThreadFactory(hostingThreadName + ".slowlog.append"),
ProducerType.MULTI,
new BlockingWaitStrategy());
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
// initialize ringbuffer event handler
- final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
- HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
- this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf);
+ this.logEventHandler = new LogEventHandler(conf);
this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler});
this.disruptor.start();
}
+ public static NamedQueueRecorder getInstance(Configuration conf) {
+ if (namedQueueRecorder != null) {
+ return namedQueueRecorder;
+ }
+ synchronized (LOCK) {
+ if (!isInit) {
+ namedQueueRecorder = new NamedQueueRecorder(conf);
+ isInit = true;
+ }
+ }
+ return namedQueueRecorder;
+ }
+
// must be power of 2 for disruptor ringbuffer
- private int getEventCount() {
- Preconditions.checkArgument(eventCount >= 0,
- SLOW_LOG_RING_BUFFER_SIZE + " must be > 0");
+ private int getEventCount(int eventCount) {
+ Preconditions.checkArgument(eventCount >= 0, "hbase.namedqueue.ringbuffer.size must be > 0");
int floor = Integer.highestOneBit(eventCount);
if (floor == eventCount) {
return floor;
@@ -110,66 +103,53 @@ public class SlowLogRecorder {
}
/**
- * Retrieve online slow logs from ringbuffer
- *
- * @param request slow log request parameters
- * @return online slow logs from ringbuffer
- */
- public List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
- return isOnlineLogProviderEnabled ? this.logEventHandler.getSlowLogPayloads(request)
- : Collections.emptyList();
- }
-
- /**
- * Retrieve online large logs from ringbuffer
+ * Retrieve in memory queue records from ringbuffer
*
- * @param request large log request parameters
- * @return online large logs from ringbuffer
+ * @param request namedQueue request with event type
+ * @return queue records from ringbuffer after filter (if applied)
*/
- public List<SlowLogPayload> getLargeLogPayloads(AdminProtos.SlowLogResponseRequest request) {
- return isOnlineLogProviderEnabled ? this.logEventHandler.getLargeLogPayloads(request)
- : Collections.emptyList();
+ public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
+ return this.logEventHandler.getNamedQueueRecords(request);
}
/**
- * clears slow log payloads from ringbuffer
+ * clears queue records from ringbuffer
*
+ * @param namedQueueEvent type of queue to clear
* @return true if slow log payloads are cleaned up or
* hbase.regionserver.slowlog.buffer.enabled is not set to true, false if failed to
* clean up slow logs
*/
- public boolean clearSlowLogPayloads() {
- if (!isOnlineLogProviderEnabled) {
- return true;
- }
- return this.logEventHandler.clearSlowLogs();
+ public boolean clearNamedQueue(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+ return this.logEventHandler.clearNamedQueue(namedQueueEvent);
}
/**
- * Add slow log rpcCall details to ringbuffer
+ * Add various NamedQueue records to ringbuffer. Based on the type of the event (e.g slowLog),
+ * consumer of disruptor ringbuffer will have specific logic.
+ * This method is producer of disruptor ringbuffer which is initialized in NamedQueueRecorder
+ * constructor.
*
- * @param rpcLogDetails all details of rpc call that would be useful for ring buffer
- * consumers
+ * @param namedQueuePayload namedQueue payload sent by client of ring buffer
+ * service
*/
- public void addSlowLogPayload(RpcLogDetails rpcLogDetails) {
- if (!isOnlineLogProviderEnabled) {
- return;
- }
+ public void addRecord(NamedQueuePayload namedQueuePayload) {
RingBuffer<RingBufferEnvelope> ringBuffer = this.disruptor.getRingBuffer();
long seqId = ringBuffer.next();
try {
- ringBuffer.get(seqId).load(rpcLogDetails);
+ ringBuffer.get(seqId).load(namedQueuePayload);
} finally {
ringBuffer.publish(seqId);
}
}
/**
- * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
+ * Add all in memory queue records to system table. The implementors can use system table
+ * or direct HDFS file or ZK as persistence system.
*/
- public void addAllLogsToSysTable() {
+ public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
if (this.logEventHandler != null) {
- this.logEventHandler.addAllLogsToSysTable();
+ this.logEventHandler.persistAll(namedQueueEvent);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
new file mode 100644
index 0000000..84c1b24
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * In-memory Queue service provider for multiple use-cases. Implementers should be
+ * registered in LogEventHandler
+ */
+@InterfaceAudience.Private
+public interface NamedQueueService {
+
+ /**
+ * Retrieve event type for NamedQueueService implementation.
+ *
+ * @return {@link NamedQueuePayload.NamedQueueEvent}
+ */
+ NamedQueuePayload.NamedQueueEvent getEvent();
+
+ /**
+ * This implementation is generic for consuming records from LMAX
+ * disruptor and inserts records to EvictingQueue which is maintained by each
+ * ringbuffer provider.
+ *
+ * @param namedQueuePayload namedQueue payload from disruptor ring buffer
+ */
+ void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload);
+
+ /**
+ * Cleans up queues maintained by services.
+ *
+ * @return true if slow log payloads are cleaned up, false otherwise
+ */
+ boolean clearNamedQueue();
+
+ /**
+ * Retrieve in memory queue records from ringbuffer
+ *
+ * @param request namedQueue request with event type
+ * @return queue records from ringbuffer after filter (if applied)
+ */
+ NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request);
+
+ /**
+ * Add all in memory queue records to system table. The implementors can use system table
+ * or direct HDFS file or ZK as persistence system.
+ */
+ void persistAll();
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java
similarity index 68%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java
index d308670..f93baaa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RingBufferEnvelope.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RingBufferEnvelope.java
@@ -17,29 +17,29 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.yetus.audience.InterfaceAudience;
/**
- * An envelope to carry payload in the slow log ring buffer that serves as online buffer
- * to provide latest TooSlowLog
+ * An envelope to carry payload in the ring buffer that serves as online buffer
+ * to provide latest events
*/
@InterfaceAudience.Private
final class RingBufferEnvelope {
- private RpcLogDetails rpcLogDetails;
+ private NamedQueuePayload namedQueuePayload;
/**
* Load the Envelope with {@link RpcCall}
*
- * @param rpcLogDetails all details of rpc call that would be useful for ring buffer
+ * @param namedQueuePayload all details of rpc call that would be useful for ring buffer
* consumers
*/
- public void load(RpcLogDetails rpcLogDetails) {
- this.rpcLogDetails = rpcLogDetails;
+ public void load(NamedQueuePayload namedQueuePayload) {
+ this.namedQueuePayload = namedQueuePayload;
}
/**
@@ -48,10 +48,10 @@ final class RingBufferEnvelope {
*
* @return Retrieve rpc log details
*/
- public RpcLogDetails getPayload() {
- final RpcLogDetails rpcLogDetails = this.rpcLogDetails;
- this.rpcLogDetails = null;
- return rpcLogDetails;
+ public NamedQueuePayload getPayload() {
+ final NamedQueuePayload namedQueuePayload = this.namedQueuePayload;
+ this.namedQueuePayload = null;
+ return namedQueuePayload;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java
similarity index 94%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java
index b469cdb..581d1a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/RpcLogDetails.java
@@ -17,7 +17,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
@@ -28,7 +28,7 @@ import org.apache.yetus.audience.InterfaceAudience;
* RpcCall details that would be passed on to ring buffer of slow log responses
*/
@InterfaceAudience.Private
-public class RpcLogDetails {
+public class RpcLogDetails extends NamedQueuePayload {
private final RpcCall rpcCall;
private final Message param;
@@ -40,6 +40,7 @@ public class RpcLogDetails {
public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
String className, boolean isSlowLog, boolean isLargeLog) {
+ super(NamedQueueEvent.SLOW_LOG);
this.rpcCall = rpcCall;
this.param = param;
this.clientAddress = clientAddress;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
new file mode 100644
index 0000000..2c701ff
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
@@ -0,0 +1,98 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
+import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Persistent service provider for Slow/LargeLog events
+ */
+@InterfaceAudience.Private
+public class SlowLogPersistentService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SlowLogPersistentService.class);
+
+ private static final ReentrantLock LOCK = new ReentrantLock();
+ private static final String SYS_TABLE_QUEUE_SIZE =
+ "hbase.regionserver.slowlog.systable.queue.size";
+ private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
+ private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
+
+ private final Queue<TooSlowLog.SlowLogPayload> queueForSysTable;
+
+ private final Configuration configuration;
+
+ public SlowLogPersistentService(final Configuration configuration) {
+ this.configuration = configuration;
+ int sysTableQueueSize =
+ configuration.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
+ EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueueForTable =
+ EvictingQueue.create(sysTableQueueSize);
+ queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
+ }
+
+ public void addToQueueForSysTable(TooSlowLog.SlowLogPayload slowLogPayload) {
+ queueForSysTable.add(slowLogPayload);
+ }
+
+ /**
+ * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
+ */
+ public void addAllLogsToSysTable() {
+ if (queueForSysTable == null) {
+ LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
+ return;
+ }
+ if (LOCK.isLocked()) {
+ return;
+ }
+ LOCK.lock();
+ try {
+ List<TooSlowLog.SlowLogPayload> slowLogPayloads = new ArrayList<>();
+ int i = 0;
+ while (!queueForSysTable.isEmpty()) {
+ slowLogPayloads.add(queueForSysTable.poll());
+ i++;
+ if (i == SYSTABLE_PUT_BATCH_SIZE) {
+ SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+ slowLogPayloads.clear();
+ i = 0;
+ }
+ }
+ if (slowLogPayloads.size() > 0) {
+ SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+ }
+ } finally {
+ LOCK.unlock();
+ }
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
similarity index 84%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
index 77749f7..bc892e3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
@@ -17,7 +17,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
@@ -33,7 +33,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
- private final SlowLogRecorder slowLogRecorder;
+ private final NamedQueueRecorder namedQueueRecorder;
/**
* Chore Constructor
@@ -41,12 +41,12 @@ public class SlowLogTableOpsChore extends ScheduledChore {
* @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will
* cancel and cleanup
* @param period Period in millis with which this Chore repeats execution when scheduled
- * @param slowLogRecorder {@link SlowLogRecorder} instance
+ * @param namedQueueRecorder {@link NamedQueueRecorder} instance
*/
public SlowLogTableOpsChore(final Stoppable stopper, final int period,
- final SlowLogRecorder slowLogRecorder) {
+ final NamedQueueRecorder namedQueueRecorder) {
super("SlowLogTableOpsChore", stopper, period);
- this.slowLogRecorder = slowLogRecorder;
+ this.namedQueueRecorder = namedQueueRecorder;
}
@Override
@@ -54,7 +54,7 @@ public class SlowLogTableOpsChore extends ScheduledChore {
if (LOG.isTraceEnabled()) {
LOG.trace("SlowLog Table Ops Chore is starting up.");
}
- slowLogRecorder.addAllLogsToSysTable();
+ namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
if (LOG.isTraceEnabled()) {
LOG.trace("SlowLog Table Ops Chore is closing.");
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
new file mode 100644
index 0000000..f26ff51
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
@@ -0,0 +1,264 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.SlowLogParams;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.namequeues.NamedQueueService;
+import org.apache.hadoop.hbase.namequeues.RpcLogDetails;
+import org.apache.hadoop.hbase.namequeues.SlowLogPersistentService;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
+import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Queue;
+import java.util.stream.Collectors;
+
+/**
+ * In-memory Queue service provider for Slow/LargeLog events
+ */
+@InterfaceAudience.Private
+public class SlowLogQueueService implements NamedQueueService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SlowLogQueueService.class);
+
+ private static final String SLOW_LOG_RING_BUFFER_SIZE =
+ "hbase.regionserver.slowlog.ringbuffer.size";
+
+ private final boolean isOnlineLogProviderEnabled;
+ private final boolean isSlowLogTableEnabled;
+ private final SlowLogPersistentService slowLogPersistentService;
+ private final Queue<TooSlowLog.SlowLogPayload> slowLogQueue;
+
+ public SlowLogQueueService(Configuration conf) {
+ this.isOnlineLogProviderEnabled = conf.getBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
+ HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
+
+ if (!isOnlineLogProviderEnabled) {
+ this.isSlowLogTableEnabled = false;
+ this.slowLogPersistentService = null;
+ this.slowLogQueue = null;
+ return;
+ }
+
+ // Initialize SlowLog Queue
+ int slowLogQueueSize =
+ conf.getInt(SLOW_LOG_RING_BUFFER_SIZE, HConstants.DEFAULT_SLOW_LOG_RING_BUFFER_SIZE);
+
+ EvictingQueue<TooSlowLog.SlowLogPayload> evictingQueue =
+ EvictingQueue.create(slowLogQueueSize);
+ slowLogQueue = Queues.synchronizedQueue(evictingQueue);
+
+ this.isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
+ HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
+ if (isSlowLogTableEnabled) {
+ slowLogPersistentService = new SlowLogPersistentService(conf);
+ } else {
+ slowLogPersistentService = null;
+ }
+ }
+
+ @Override
+ public NamedQueuePayload.NamedQueueEvent getEvent() {
+ return NamedQueuePayload.NamedQueueEvent.SLOW_LOG;
+ }
+
+ /**
+ * This implementation is specific to slowLog event. This consumes slowLog event from
+ * disruptor and inserts records to EvictingQueue.
+ *
+ * @param namedQueuePayload namedQueue payload from disruptor ring buffer
+ */
+ @Override
+ public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
+ if (!isOnlineLogProviderEnabled) {
+ return;
+ }
+ if (!(namedQueuePayload instanceof RpcLogDetails)) {
+ LOG.warn("SlowLogQueueService: NamedQueuePayload is not of type RpcLogDetails.");
+ return;
+ }
+ final RpcLogDetails rpcLogDetails = (RpcLogDetails) namedQueuePayload;
+ final RpcCall rpcCall = rpcLogDetails.getRpcCall();
+ final String clientAddress = rpcLogDetails.getClientAddress();
+ final long responseSize = rpcLogDetails.getResponseSize();
+ final String className = rpcLogDetails.getClassName();
+ final TooSlowLog.SlowLogPayload.Type type = getLogType(rpcLogDetails);
+ if (type == null) {
+ return;
+ }
+ Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
+ Message param = rpcLogDetails.getParam();
+ long receiveTime = rpcCall.getReceiveTime();
+ long startTime = rpcCall.getStartTime();
+ long endTime = System.currentTimeMillis();
+ int processingTime = (int) (endTime - startTime);
+ int qTime = (int) (startTime - receiveTime);
+ final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
+ int numGets = 0;
+ int numMutations = 0;
+ int numServiceCalls = 0;
+ if (param instanceof ClientProtos.MultiRequest) {
+ ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
+ for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
+ for (ClientProtos.Action action : regionAction.getActionList()) {
+ if (action.hasMutation()) {
+ numMutations++;
+ }
+ if (action.hasGet()) {
+ numGets++;
+ }
+ if (action.hasServiceCall()) {
+ numServiceCalls++;
+ }
+ }
+ }
+ }
+ final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
+ final String methodDescriptorName =
+ methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
+ TooSlowLog.SlowLogPayload slowLogPayload = TooSlowLog.SlowLogPayload.newBuilder()
+ .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
+ .setClientAddress(clientAddress)
+ .setMethodName(methodDescriptorName)
+ .setMultiGets(numGets)
+ .setMultiMutations(numMutations)
+ .setMultiServiceCalls(numServiceCalls)
+ .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
+ .setProcessingTime(processingTime)
+ .setQueueTime(qTime)
+ .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
+ .setResponseSize(responseSize)
+ .setServerClass(className)
+ .setStartTime(startTime)
+ .setType(type)
+ .setUserName(userName)
+ .build();
+ slowLogQueue.add(slowLogPayload);
+ if (isSlowLogTableEnabled) {
+ if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
+ slowLogPersistentService.addToQueueForSysTable(slowLogPayload);
+ }
+ }
+ }
+
+ @Override
+ public boolean clearNamedQueue() {
+ if (!isOnlineLogProviderEnabled) {
+ return false;
+ }
+ LOG.debug("Received request to clean up online slowlog buffer.");
+ slowLogQueue.clear();
+ return true;
+ }
+
+ @Override
+ public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
+ if (!isOnlineLogProviderEnabled) {
+ return null;
+ }
+ final AdminProtos.SlowLogResponseRequest slowLogResponseRequest =
+ request.getSlowLogResponseRequest();
+ final List<TooSlowLog.SlowLogPayload> slowLogPayloads;
+ if (AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG
+ .equals(slowLogResponseRequest.getLogType())) {
+ slowLogPayloads = getLargeLogPayloads(slowLogResponseRequest);
+ } else {
+ slowLogPayloads = getSlowLogPayloads(slowLogResponseRequest);
+ }
+ NamedQueueGetResponse response = new NamedQueueGetResponse();
+ response.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ response.setSlowLogPayloads(slowLogPayloads);
+ return response;
+ }
+
+ private TooSlowLog.SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
+ final boolean isSlowLog = rpcCallDetails.isSlowLog();
+ final boolean isLargeLog = rpcCallDetails.isLargeLog();
+ final TooSlowLog.SlowLogPayload.Type type;
+ if (!isSlowLog && !isLargeLog) {
+ LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
+ rpcCallDetails);
+ return null;
+ }
+ if (isSlowLog && isLargeLog) {
+ type = TooSlowLog.SlowLogPayload.Type.ALL;
+ } else if (isSlowLog) {
+ type = TooSlowLog.SlowLogPayload.Type.SLOW_LOG;
+ } else {
+ type = TooSlowLog.SlowLogPayload.Type.LARGE_LOG;
+ }
+ return type;
+ }
+
+ /**
+ * Add all slowLog events to system table. This is only for slowLog event's persistence on
+ * system table.
+ */
+ @Override
+ public void persistAll() {
+ if (!isOnlineLogProviderEnabled) {
+ return;
+ }
+ if (slowLogPersistentService != null) {
+ slowLogPersistentService.addAllLogsToSysTable();
+ }
+ }
+
+ private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
+ final AdminProtos.SlowLogResponseRequest request) {
+ List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
+ Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
+ e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
+ || e.getType() == TooSlowLog.SlowLogPayload.Type.SLOW_LOG).collect(Collectors.toList());
+ // latest slow logs first, operator is interested in latest records from in-memory buffer
+ Collections.reverse(slowLogPayloadList);
+ return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
+ }
+
+ private List<TooSlowLog.SlowLogPayload> getLargeLogPayloads(
+ final AdminProtos.SlowLogResponseRequest request) {
+ List<TooSlowLog.SlowLogPayload> slowLogPayloadList =
+ Arrays.stream(slowLogQueue.toArray(new TooSlowLog.SlowLogPayload[0])).filter(
+ e -> e.getType() == TooSlowLog.SlowLogPayload.Type.ALL
+ || e.getType() == TooSlowLog.SlowLogPayload.Type.LARGE_LOG).collect(Collectors.toList());
+ // latest large logs first, operator is interested in latest records from in-memory buffer
+ Collections.reverse(slowLogPayloadList);
+ return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
new file mode 100644
index 0000000..6e88bf4
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/request/NamedQueueGetRequest.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues.request;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Request object to be used by ring buffer use-cases. Clients get records by sending
+ * this request object.
+ * For each ring buffer use-case, add request payload to this class, client should set
+ * namedQueueEvent based on use-case.
+ * Protobuf does not support inheritance, hence we need to work with
+ */
+@InterfaceAudience.Private
+public class NamedQueueGetRequest {
+
+ private AdminProtos.SlowLogResponseRequest slowLogResponseRequest;
+ private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
+
+ public AdminProtos.SlowLogResponseRequest getSlowLogResponseRequest() {
+ return slowLogResponseRequest;
+ }
+
+ public void setSlowLogResponseRequest(
+ AdminProtos.SlowLogResponseRequest slowLogResponseRequest) {
+ this.slowLogResponseRequest = slowLogResponseRequest;
+ }
+
+ public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
+ return namedQueueEvent;
+ }
+
+ public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+ this.namedQueueEvent = namedQueueEvent;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("slowLogResponseRequest", slowLogResponseRequest)
+ .append("namedQueueEvent", namedQueueEvent)
+ .toString();
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
new file mode 100644
index 0000000..ee4ed43
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/response/NamedQueueGetResponse.java
@@ -0,0 +1,61 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.namequeues.response;
+
+import org.apache.commons.lang3.builder.ToStringBuilder;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
+import org.apache.yetus.audience.InterfaceAudience;
+import java.util.List;
+
+/**
+ * Response object to be sent by namedQueue service back to caller
+ */
+@InterfaceAudience.Private
+public class NamedQueueGetResponse {
+
+ private List<TooSlowLog.SlowLogPayload> slowLogPayloads;
+ private NamedQueuePayload.NamedQueueEvent namedQueueEvent;
+
+ public List<TooSlowLog.SlowLogPayload> getSlowLogPayloads() {
+ return slowLogPayloads;
+ }
+
+ public void setSlowLogPayloads(List<TooSlowLog.SlowLogPayload> slowLogPayloads) {
+ this.slowLogPayloads = slowLogPayloads;
+ }
+
+ public NamedQueuePayload.NamedQueueEvent getNamedQueueEvent() {
+ return namedQueueEvent;
+ }
+
+ public void setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+ this.namedQueueEvent = namedQueueEvent;
+ }
+
+ @Override
+ public String toString() {
+ return new ToStringBuilder(this)
+ .append("slowLogPayloads", slowLogPayloads)
+ .append("namedQueueEvent", namedQueueEvent)
+ .toString();
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index a726de6..18147e5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -138,8 +138,8 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
+import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -535,7 +535,7 @@ public class HRegionServer extends Thread implements
/**
* Provide online slow log responses from ringbuffer
*/
- private SlowLogRecorder slowLogRecorder;
+ private NamedQueueRecorder namedQueueRecorder = null;
/**
* True if this RegionServer is coming up in a cluster where there is no Master;
@@ -597,7 +597,12 @@ public class HRegionServer extends Thread implements
this.stopped = false;
if (!(this instanceof HMaster)) {
- this.slowLogRecorder = new SlowLogRecorder(this.conf);
+ final boolean isOnlineLogProviderEnabled = conf.getBoolean(
+ HConstants.SLOW_LOG_BUFFER_ENABLED_KEY,
+ HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
+ if (isOnlineLogProviderEnabled) {
+ this.namedQueueRecorder = NamedQueueRecorder.getInstance(this.conf);
+ }
}
rpcServices = createRpcServices();
useThisHostnameInstead = getUseThisHostnameInstead(conf);
@@ -1506,12 +1511,12 @@ public class HRegionServer extends Thread implements
}
/**
- * get Online SlowLog Provider to add slow logs to ringbuffer
+ * get NamedQueue Provider to add different logs to ringbuffer
*
- * @return Online SlowLog Provider
+ * @return NamedQueueRecorder
*/
- public SlowLogRecorder getSlowLogRecorder() {
- return this.slowLogRecorder;
+ public NamedQueueRecorder getNamedQueueRecorder() {
+ return this.namedQueueRecorder;
}
/*
@@ -2070,7 +2075,7 @@ public class HRegionServer extends Thread implements
if (isSlowLogTableEnabled) {
// default chore duration: 10 min
final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
- slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder);
+ slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
}
// Create the thread to clean the moved regions list
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index a6e82e7..cbca1a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -108,6 +108,9 @@ import org.apache.hadoop.hbase.ipc.ServerRpcController;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.MasterRpcServices;
+import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
import org.apache.hadoop.hbase.quotas.ActivePolicyEnforcement;
@@ -128,7 +131,7 @@ import org.apache.hadoop.hbase.regionserver.handler.OpenMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenPriorityRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.OpenRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
-import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
@@ -1301,7 +1304,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
rpcServer = createRpcServer(rs, rpcSchedulerFactory, bindAddress, name);
rpcServer.setRsRpcServices(this);
if (!(rs instanceof HMaster)) {
- rpcServer.setSlowLogRecorder(rs.getSlowLogRecorder());
+ rpcServer.setNamedQueueRecorder(rs.getNamedQueueRecorder());
}
scannerLeaseTimeoutPeriod = conf.getInt(
HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
@@ -3983,28 +3986,39 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@QosPriority(priority = HConstants.ADMIN_QOS)
public SlowLogResponses getSlowLogResponses(final RpcController controller,
final SlowLogResponseRequest request) {
- final SlowLogRecorder slowLogRecorder =
- this.regionServer.getSlowLogRecorder();
- final List<SlowLogPayload> slowLogPayloads;
- slowLogPayloads = slowLogRecorder != null
- ? slowLogRecorder.getSlowLogPayloads(request)
- : Collections.emptyList();
+ final NamedQueueRecorder namedQueueRecorder =
+ this.regionServer.getNamedQueueRecorder();
+ final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
.addAllSlowLogPayloads(slowLogPayloads)
.build();
return slowLogResponses;
}
+ private List<SlowLogPayload> getSlowLogPayloads(SlowLogResponseRequest request,
+ NamedQueueRecorder namedQueueRecorder) {
+ if (namedQueueRecorder == null) {
+ return Collections.emptyList();
+ }
+ List<SlowLogPayload> slowLogPayloads;
+ NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+ namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ namedQueueGetRequest.setSlowLogResponseRequest(request);
+ NamedQueueGetResponse namedQueueGetResponse =
+ namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+ slowLogPayloads = namedQueueGetResponse != null ?
+ namedQueueGetResponse.getSlowLogPayloads() :
+ Collections.emptyList();
+ return slowLogPayloads;
+ }
+
@Override
@QosPriority(priority = HConstants.ADMIN_QOS)
public SlowLogResponses getLargeLogResponses(final RpcController controller,
final SlowLogResponseRequest request) {
- final SlowLogRecorder slowLogRecorder =
- this.regionServer.getSlowLogRecorder();
- final List<SlowLogPayload> slowLogPayloads;
- slowLogPayloads = slowLogRecorder != null
- ? slowLogRecorder.getLargeLogPayloads(request)
- : Collections.emptyList();
+ final NamedQueueRecorder namedQueueRecorder =
+ this.regionServer.getNamedQueueRecorder();
+ final List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request, namedQueueRecorder);
SlowLogResponses slowLogResponses = SlowLogResponses.newBuilder()
.addAllSlowLogPayloads(slowLogPayloads)
.build();
@@ -4015,10 +4029,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
@QosPriority(priority = HConstants.ADMIN_QOS)
public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller,
final ClearSlowLogResponseRequest request) {
- final SlowLogRecorder slowLogRecorder =
- this.regionServer.getSlowLogRecorder();
- boolean slowLogsCleaned = Optional.ofNullable(slowLogRecorder)
- .map(SlowLogRecorder::clearSlowLogPayloads).orElse(false);
+ final NamedQueueRecorder namedQueueRecorder =
+ this.regionServer.getNamedQueueRecorder();
+ boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder)
+ .map(queueRecorder ->
+ queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG))
+ .orElse(false);
ClearSlowLogResponses clearSlowLogResponses = ClearSlowLogResponses.newBuilder()
.setIsCleaned(slowLogsCleaned)
.build();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
deleted file mode 100644
index 9c147e3..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.regionserver.slowlog;
-
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.RingBuffer;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Queue;
-import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.SlowLogParams;
-import org.apache.hadoop.hbase.ipc.RpcCall;
-import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
-import org.apache.hbase.thirdparty.com.google.common.collect.Queues;
-import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors;
-import org.apache.hbase.thirdparty.com.google.protobuf.Message;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload;
-
-/**
- * Event Handler run by disruptor ringbuffer consumer
- */
-@InterfaceAudience.Private
-class LogEventHandler implements EventHandler<RingBufferEnvelope> {
-
- private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
-
- private static final String SYS_TABLE_QUEUE_SIZE =
- "hbase.regionserver.slowlog.systable.queue.size";
- private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
- private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
-
- private final Queue<SlowLogPayload> queueForRingBuffer;
- private final Queue<SlowLogPayload> queueForSysTable;
- private final boolean isSlowLogTableEnabled;
-
- private Configuration configuration;
-
- private static final ReentrantLock LOCK = new ReentrantLock();
-
- LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) {
- this.configuration = conf;
- EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount);
- queueForRingBuffer = Queues.synchronizedQueue(evictingQueue);
- this.isSlowLogTableEnabled = isSlowLogTableEnabled;
- if (isSlowLogTableEnabled) {
- int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
- EvictingQueue<SlowLogPayload> evictingQueueForTable =
- EvictingQueue.create(sysTableQueueSize);
- queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
- } else {
- queueForSysTable = null;
- }
- }
-
- /**
- * Called when a publisher has published an event to the {@link RingBuffer}
- *
- * @param event published to the {@link RingBuffer}
- * @param sequence of the event being processed
- * @param endOfBatch flag to indicate if this is the last event in a batch from
- * the {@link RingBuffer}
- * @throws Exception if the EventHandler would like the exception handled further up the chain
- */
- @Override
- public void onEvent(RingBufferEnvelope event, long sequence, boolean endOfBatch)
- throws Exception {
- final RpcLogDetails rpcCallDetails = event.getPayload();
- final RpcCall rpcCall = rpcCallDetails.getRpcCall();
- final String clientAddress = rpcCallDetails.getClientAddress();
- final long responseSize = rpcCallDetails.getResponseSize();
- final String className = rpcCallDetails.getClassName();
- final SlowLogPayload.Type type = getLogType(rpcCallDetails);
- if (type == null) {
- return;
- }
- Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
- Message param = rpcCallDetails.getParam();
- long receiveTime = rpcCall.getReceiveTime();
- long startTime = rpcCall.getStartTime();
- long endTime = System.currentTimeMillis();
- int processingTime = (int) (endTime - startTime);
- int qTime = (int) (startTime - receiveTime);
- final SlowLogParams slowLogParams = ProtobufUtil.getSlowLogParams(param);
- int numGets = 0;
- int numMutations = 0;
- int numServiceCalls = 0;
- if (param instanceof ClientProtos.MultiRequest) {
- ClientProtos.MultiRequest multi = (ClientProtos.MultiRequest) param;
- for (ClientProtos.RegionAction regionAction : multi.getRegionActionList()) {
- for (ClientProtos.Action action : regionAction.getActionList()) {
- if (action.hasMutation()) {
- numMutations++;
- }
- if (action.hasGet()) {
- numGets++;
- }
- if (action.hasServiceCall()) {
- numServiceCalls++;
- }
- }
- }
- }
- final String userName = rpcCall.getRequestUserName().orElse(StringUtils.EMPTY);
- final String methodDescriptorName =
- methodDescriptor != null ? methodDescriptor.getName() : StringUtils.EMPTY;
- SlowLogPayload slowLogPayload = SlowLogPayload.newBuilder()
- .setCallDetails(methodDescriptorName + "(" + param.getClass().getName() + ")")
- .setClientAddress(clientAddress)
- .setMethodName(methodDescriptorName)
- .setMultiGets(numGets)
- .setMultiMutations(numMutations)
- .setMultiServiceCalls(numServiceCalls)
- .setParam(slowLogParams != null ? slowLogParams.getParams() : StringUtils.EMPTY)
- .setProcessingTime(processingTime)
- .setQueueTime(qTime)
- .setRegionName(slowLogParams != null ? slowLogParams.getRegionName() : StringUtils.EMPTY)
- .setResponseSize(responseSize)
- .setServerClass(className)
- .setStartTime(startTime)
- .setType(type)
- .setUserName(userName)
- .build();
- queueForRingBuffer.add(slowLogPayload);
- if (isSlowLogTableEnabled) {
- if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
- queueForSysTable.add(slowLogPayload);
- }
- }
- }
-
- private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
- final boolean isSlowLog = rpcCallDetails.isSlowLog();
- final boolean isLargeLog = rpcCallDetails.isLargeLog();
- final SlowLogPayload.Type type;
- if (!isSlowLog && !isLargeLog) {
- LOG.error("slowLog and largeLog both are false. Ignoring the event. rpcCallDetails: {}",
- rpcCallDetails);
- return null;
- }
- if (isSlowLog && isLargeLog) {
- type = SlowLogPayload.Type.ALL;
- } else if (isSlowLog) {
- type = SlowLogPayload.Type.SLOW_LOG;
- } else {
- type = SlowLogPayload.Type.LARGE_LOG;
- }
- return type;
- }
-
- /**
- * Cleans up slow log payloads
- *
- * @return true if slow log payloads are cleaned up, false otherwise
- */
- boolean clearSlowLogs() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Received request to clean up online slowlog buffer..");
- }
- queueForRingBuffer.clear();
- return true;
- }
-
- /**
- * Retrieve list of slow log payloads
- *
- * @param request slow log request parameters
- * @return list of slow log payloads
- */
- List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
- List<SlowLogPayload> slowLogPayloadList =
- Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
- .filter(e -> e.getType() == SlowLogPayload.Type.ALL
- || e.getType() == SlowLogPayload.Type.SLOW_LOG)
- .collect(Collectors.toList());
-
- // latest slow logs first, operator is interested in latest records from in-memory buffer
- Collections.reverse(slowLogPayloadList);
-
- return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
- }
-
- /**
- * Retrieve list of large log payloads
- *
- * @param request large log request parameters
- * @return list of large log payloads
- */
- List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
- List<SlowLogPayload> slowLogPayloadList =
- Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
- .filter(e -> e.getType() == SlowLogPayload.Type.ALL
- || e.getType() == SlowLogPayload.Type.LARGE_LOG)
- .collect(Collectors.toList());
-
- // latest large logs first, operator is interested in latest records from in-memory buffer
- Collections.reverse(slowLogPayloadList);
-
- return LogHandlerUtils.getFilteredLogs(request, slowLogPayloadList);
- }
-
- /**
- * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
- */
- void addAllLogsToSysTable() {
- if (queueForSysTable == null) {
- // hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.
- return;
- }
- if (LOCK.isLocked()) {
- return;
- }
- LOCK.lock();
- try {
- List<SlowLogPayload> slowLogPayloads = new ArrayList<>();
- int i = 0;
- while (!queueForSysTable.isEmpty()) {
- slowLogPayloads.add(queueForSysTable.poll());
- i++;
- if (i == SYSTABLE_PUT_BATCH_SIZE) {
- SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
- slowLogPayloads.clear();
- i = 0;
- }
- }
- if (slowLogPayloads.size() > 0) {
- SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
- }
- } finally {
- LOCK.unlock();
- }
- }
-
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
similarity index 76%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
index f90bbc0..542efc3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestNamedQueueRecorder.java
@@ -17,12 +17,14 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
import java.io.IOException;
+import java.lang.reflect.Constructor;
import java.net.InetAddress;
import java.security.PrivilegedAction;
import java.security.PrivilegedExceptionAction;
+import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -35,6 +37,8 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcCallback;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -61,27 +65,25 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPaylo
* Tests for Online SlowLog Provider Service
*/
@Category({MasterTests.class, MediumTests.class})
-public class TestSlowLogRecorder {
+public class TestNamedQueueRecorder {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestSlowLogRecorder.class);
+ HBaseClassTestRule.forClass(TestNamedQueueRecorder.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
- private SlowLogRecorder slowLogRecorder;
+ private NamedQueueRecorder namedQueueRecorder;
private static int i = 0;
private static Configuration applySlowLogRecorderConf(int eventSize) {
-
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", eventSize);
return conf;
-
}
/**
@@ -90,11 +92,10 @@ public class TestSlowLogRecorder {
*
* @param i index of ringbuffer logs
* @param j data value that was put on index i
- * @param slowLogPayloads list of payload retrieved from {@link SlowLogRecorder}
+ * @param slowLogPayloads list of payload retrieved from {@link NamedQueueRecorder}
* @return if actual values are as per expectations
*/
private boolean confirmPayloadParams(int i, int j, List<SlowLogPayload> slowLogPayloads) {
-
boolean isClientExpected = slowLogPayloads.get(i).getClientAddress().equals("client_" + j);
boolean isUserExpected = slowLogPayloads.get(i).getUserName().equals("userName_" + j);
boolean isClassExpected = slowLogPayloads.get(i).getServerClass().equals("class_" + j);
@@ -102,15 +103,18 @@ public class TestSlowLogRecorder {
}
@Test
- public void testOnlieSlowLogConsumption() throws Exception {
+ public void testOnlieSlowLogConsumption() throws Exception{
Configuration conf = applySlowLogRecorderConf(8);
- slowLogRecorder = new SlowLogRecorder(conf);
+ Constructor<NamedQueueRecorder> constructor =
+ NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+ constructor.setAccessible(true);
+ namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
- slowLogRecorder.clearSlowLogPayloads();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
int i = 0;
@@ -119,12 +123,12 @@ public class TestSlowLogRecorder {
for (; i < 5; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
- Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(request).size() == 5));
- List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ Assert.assertNotEquals(-1,
+ HBASE_TESTING_UTILITY.waitFor(3000, () -> getSlowLogPayloads(request).size() == 5));
+ List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
Assert.assertTrue(confirmPayloadParams(0, 5, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(1, 4, slowLogPayloads));
Assert.assertTrue(confirmPayloadParams(2, 3, slowLogPayloads));
@@ -135,15 +139,15 @@ public class TestSlowLogRecorder {
for (; i < 7; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(request).size() == 7));
+ () -> getSlowLogPayloads(request).size() == 7));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
return slowLogPayloadsList.size() == 7
&& confirmPayloadParams(0, 7, slowLogPayloadsList)
&& confirmPayloadParams(5, 2, slowLogPayloadsList)
@@ -155,15 +159,15 @@ public class TestSlowLogRecorder {
for (; i < 10; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
+ () -> getSlowLogPayloads(request).size() == 8));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is full
return slowLogPayloadsList.size() == 8
&& confirmPayloadParams(7, 3, slowLogPayloadsList)
@@ -176,15 +180,15 @@ public class TestSlowLogRecorder {
for (; i < 14; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(request).size() == 8));
+ () -> getSlowLogPayloads(request).size() == 8));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
@@ -195,9 +199,14 @@ public class TestSlowLogRecorder {
})
);
+ AdminProtos.SlowLogResponseRequest largeLogRequest =
+ AdminProtos.SlowLogResponseRequest.newBuilder()
+ .setLimit(15)
+ .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
+ .build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getLargeLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(largeLogRequest);
// confirm ringbuffer is full
// and ordered events
return slowLogPayloadsList.size() == 8
@@ -210,11 +219,12 @@ public class TestSlowLogRecorder {
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
+ boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
+ NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
LOG.debug("cleared the ringbuffer of Online Slow Log records");
- List<SlowLogPayload> slowLogPayloadsList = slowLogRecorder.getSlowLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloadsList = getSlowLogPayloads(request);
// confirm ringbuffer is empty
return slowLogPayloadsList.size() == 0 && isRingBufferCleaned;
})
@@ -222,30 +232,43 @@ public class TestSlowLogRecorder {
}
+ private List<SlowLogPayload> getSlowLogPayloads(AdminProtos.SlowLogResponseRequest request) {
+ NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+ namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ namedQueueGetRequest.setSlowLogResponseRequest(request);
+ NamedQueueGetResponse namedQueueGetResponse =
+ namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+ return namedQueueGetResponse == null ?
+ Collections.emptyList() : namedQueueGetResponse.getSlowLogPayloads();
+ }
+
@Test
public void testOnlineSlowLogWithHighRecords() throws Exception {
Configuration conf = applySlowLogRecorderConf(14);
- slowLogRecorder = new SlowLogRecorder(conf);
+ Constructor<NamedQueueRecorder> constructor =
+ NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+ constructor.setAccessible(true);
+ namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 14 * 11; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
+ () -> getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
// confirm strict order of slow log payloads
return slowLogPayloads.size() == 14
@@ -266,37 +289,37 @@ public class TestSlowLogRecorder {
})
);
- boolean isRingBufferCleaned = slowLogRecorder.clearSlowLogPayloads();
+ boolean isRingBufferCleaned = namedQueueRecorder.clearNamedQueue(
+ NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
Assert.assertTrue(isRingBufferCleaned);
LOG.debug("cleared the ringbuffer of Online Slow Log records");
- List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
// confirm ringbuffer is empty
Assert.assertEquals(slowLogPayloads.size(), 0);
-
}
@Test
public void testOnlineSlowLogWithDefaultDisableConfig() throws Exception {
-
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.unset(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY);
- slowLogRecorder = new SlowLogRecorder(conf);
+ Constructor<NamedQueueRecorder> constructor =
+ NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+ constructor.setAccessible(true);
+ namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
-
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
-
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
return slowLogPayloads.size() == 0;
})
);
@@ -305,56 +328,58 @@ public class TestSlowLogRecorder {
@Test
public void testOnlineSlowLogWithDisableConfig() throws Exception {
-
Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, false);
+ Constructor<NamedQueueRecorder> constructor =
+ NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+ constructor.setAccessible(true);
+ namedQueueRecorder = constructor.newInstance(conf);
- slowLogRecorder = new SlowLogRecorder(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().build();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
-
for (int i = 0; i < 300; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
-
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
return slowLogPayloads.size() == 0;
})
);
conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
-
}
@Test
public void testSlowLogFilters() throws Exception {
Configuration conf = applySlowLogRecorderConf(30);
- slowLogRecorder = new SlowLogRecorder(conf);
+ Constructor<NamedQueueRecorder> constructor =
+ NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+ constructor.setAccessible(true);
+ namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_87")
.build();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int i = 0; i < 100; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
LOG.debug("Added 100 records, ringbuffer should only 1 record with matching filter");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(request).size() == 1));
+ () -> getSlowLogPayloads(request).size() == 1));
AdminProtos.SlowLogResponseRequest requestClient =
AdminProtos.SlowLogResponseRequest.newBuilder()
@@ -362,25 +387,32 @@ public class TestSlowLogRecorder {
.setClientAddress("client_85")
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(requestClient).size() == 1));
+ () -> getSlowLogPayloads(requestClient).size() == 1));
AdminProtos.SlowLogResponseRequest requestSlowLog =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
-
+ () -> getSlowLogPayloads(requestSlowLog).size() == 15));
}
@Test
public void testConcurrentSlowLogEvents() throws Exception {
Configuration conf = applySlowLogRecorderConf(50000);
- slowLogRecorder = new SlowLogRecorder(conf);
+ Constructor<NamedQueueRecorder> constructor =
+ NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+ constructor.setAccessible(true);
+ namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ AdminProtos.SlowLogResponseRequest largeLogRequest =
+ AdminProtos.SlowLogResponseRequest.newBuilder()
+ .setLimit(500000)
+ .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
+ .build();
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
for (int j = 0; j < 1000; j++) {
@@ -389,7 +421,7 @@ public class TestSlowLogRecorder {
for (int i = 0; i < 3500; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
});
@@ -397,22 +429,24 @@ public class TestSlowLogRecorder {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
- slowLogRecorder.clearSlowLogPayloads();
-
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
- 5000, () -> slowLogRecorder.getSlowLogPayloads(request).size() > 10000));
+ 5000, () -> getSlowLogPayloads(request).size() > 10000));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(
- 5000, () -> slowLogRecorder.getLargeLogPayloads(request).size() > 10000));
+ 5000, () -> getSlowLogPayloads(largeLogRequest).size() > 10000));
}
@Test
public void testSlowLargeLogEvents() throws Exception {
Configuration conf = applySlowLogRecorderConf(28);
- slowLogRecorder = new SlowLogRecorder(conf);
+ Constructor<NamedQueueRecorder> constructor =
+ NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+ constructor.setAccessible(true);
+ namedQueueRecorder = constructor.newInstance(conf);
+
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(14 * 11).build();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
LOG.debug("Initially ringbuffer of Slow Log records is empty");
boolean isSlowLog;
@@ -428,16 +462,16 @@ public class TestSlowLogRecorder {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1),
isSlowLog, isLargeLog);
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
LOG.debug("Added 14 * 11 records, ringbuffer should only provide latest 14 records");
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
+ () -> getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- List<SlowLogPayload> slowLogPayloads = slowLogRecorder.getSlowLogPayloads(request);
+ List<SlowLogPayload> slowLogPayloads = getSlowLogPayloads(request);
// confirm strict order of slow log payloads
return slowLogPayloads.size() == 14
@@ -458,12 +492,18 @@ public class TestSlowLogRecorder {
})
);
+ AdminProtos.SlowLogResponseRequest largeLogRequest =
+ AdminProtos.SlowLogResponseRequest.newBuilder()
+ .setLimit(14 * 11)
+ .setLogType(AdminProtos.SlowLogResponseRequest.LogType.LARGE_LOG)
+ .build();
+
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getLargeLogPayloads(request).size() == 14));
+ () -> getSlowLogPayloads(largeLogRequest).size() == 14));
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
() -> {
- List<SlowLogPayload> largeLogPayloads = slowLogRecorder.getLargeLogPayloads(request);
+ List<SlowLogPayload> largeLogPayloads = getSlowLogPayloads(largeLogRequest);
// confirm strict order of slow log payloads
return largeLogPayloads.size() == 14
@@ -483,14 +523,16 @@ public class TestSlowLogRecorder {
&& confirmPayloadParams(13, 128, largeLogPayloads);
})
);
-
}
@Test
public void testSlowLogMixedFilters() throws Exception {
Configuration conf = applySlowLogRecorderConf(30);
- slowLogRecorder = new SlowLogRecorder(conf);
+ Constructor<NamedQueueRecorder> constructor =
+ NamedQueueRecorder.class.getDeclaredConstructor(Configuration.class);
+ constructor.setAccessible(true);
+ namedQueueRecorder = constructor.newInstance(conf);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
@@ -498,23 +540,23 @@ public class TestSlowLogRecorder {
.setClientAddress("client_88")
.build();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
for (int i = 0; i < 100; i++) {
RpcLogDetails rpcLogDetails =
getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(request).size() == 2));
+ () -> getSlowLogPayloads(request).size() == 2));
AdminProtos.SlowLogResponseRequest request2 = AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.setUserName("userName_1")
.setClientAddress("client_2")
.build();
- Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request2).size());
+ Assert.assertEquals(0, getSlowLogPayloads(request2).size());
AdminProtos.SlowLogResponseRequest request3 =
AdminProtos.SlowLogResponseRequest.newBuilder()
@@ -523,7 +565,7 @@ public class TestSlowLogRecorder {
.setClientAddress("client_88")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
.build();
- Assert.assertEquals(0, slowLogRecorder.getSlowLogPayloads(request3).size());
+ Assert.assertEquals(0, getSlowLogPayloads(request3).size());
AdminProtos.SlowLogResponseRequest request4 =
AdminProtos.SlowLogResponseRequest.newBuilder()
@@ -532,7 +574,7 @@ public class TestSlowLogRecorder {
.setClientAddress("client_87")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.AND)
.build();
- Assert.assertEquals(1, slowLogRecorder.getSlowLogPayloads(request4).size());
+ Assert.assertEquals(1, getSlowLogPayloads(request4).size());
AdminProtos.SlowLogResponseRequest request5 =
AdminProtos.SlowLogResponseRequest.newBuilder()
@@ -541,14 +583,14 @@ public class TestSlowLogRecorder {
.setClientAddress("client_89")
.setFilterByOperator(AdminProtos.SlowLogResponseRequest.FilterByOperator.OR)
.build();
- Assert.assertEquals(2, slowLogRecorder.getSlowLogPayloads(request5).size());
+ Assert.assertEquals(2, getSlowLogPayloads(request5).size());
AdminProtos.SlowLogResponseRequest requestSlowLog =
AdminProtos.SlowLogResponseRequest.newBuilder()
.setLimit(15)
.build();
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(3000,
- () -> slowLogRecorder.getSlowLogPayloads(requestSlowLog).size() == 15));
+ () -> getSlowLogPayloads(requestSlowLog).size() == 15));
}
static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
@@ -762,26 +804,20 @@ public class TestSlowLogRecorder {
private static Optional<User> getUser(String userName) {
return Optional.of(new User() {
-
-
@Override
public String getShortName() {
return userName;
}
-
@Override
public <T> T runAs(PrivilegedAction<T> action) {
return null;
}
-
@Override
- public <T> T runAs(PrivilegedExceptionAction<T> action) throws
- IOException, InterruptedException {
+ public <T> T runAs(PrivilegedExceptionAction<T> action) {
return null;
}
-
});
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java
similarity index 76%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java
index e08ad29..a914796 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestSlowLogAccessor.java
@@ -17,10 +17,11 @@
* limitations under the License.
*/
-package org.apache.hadoop.hbase.regionserver.slowlog;
+package org.apache.hadoop.hbase.namequeues;
import java.io.IOException;
import java.lang.reflect.Field;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@@ -33,8 +34,11 @@ import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.hadoop.hbase.testclassification.MasterTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -59,11 +63,11 @@ public class TestSlowLogAccessor {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSlowLogAccessor.class);
- private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
- private SlowLogRecorder slowLogRecorder;
+ private NamedQueueRecorder namedQueueRecorder;
@BeforeClass
public static void setup() throws Exception {
@@ -88,9 +92,19 @@ public class TestSlowLogAccessor {
@Before
public void setUp() throws Exception {
HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0);
- Field slowLogRecorder = HRegionServer.class.getDeclaredField("slowLogRecorder");
+ Field slowLogRecorder = HRegionServer.class.getDeclaredField("namedQueueRecorder");
slowLogRecorder.setAccessible(true);
- this.slowLogRecorder = (SlowLogRecorder) slowLogRecorder.get(hRegionServer);
+ this.namedQueueRecorder = (NamedQueueRecorder) slowLogRecorder.get(hRegionServer);
+ }
+
+ private List<TooSlowLog.SlowLogPayload> getSlowLogPayloads(
+ AdminProtos.SlowLogResponseRequest request) {
+ NamedQueueGetRequest namedQueueGetRequest = new NamedQueueGetRequest();
+ namedQueueGetRequest.setNamedQueueEvent(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ namedQueueGetRequest.setSlowLogResponseRequest(request);
+ NamedQueueGetResponse namedQueueGetResponse =
+ namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest);
+ return namedQueueGetResponse.getSlowLogPayloads();
}
@Test
@@ -99,42 +113,42 @@ public class TestSlowLogAccessor {
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
- slowLogRecorder.clearSlowLogPayloads();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
int i = 0;
Connection connection = waitForSlowLogTableCreation();
// add 5 records initially
for (; i < 5; i++) {
- RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
// add 2 more records
for (; i < 7; i++) {
- RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
// add 3 more records
for (; i < 10; i++) {
- RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
// add 4 more records
for (; i < 14; i++) {
- RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY
- .waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
+ .waitFor(3000, () -> getSlowLogPayloads(request).size() == 14));
Assert.assertNotEquals(-1,
HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14));
@@ -170,10 +184,10 @@ public class TestSlowLogAccessor {
public void testHigherSlowLogs() throws Exception {
Connection connection = waitForSlowLogTableCreation();
- slowLogRecorder.clearSlowLogPayloads();
+ namedQueueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
AdminProtos.SlowLogResponseRequest request =
AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
- Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+ Assert.assertEquals(getSlowLogPayloads(request).size(), 0);
for (int j = 0; j < 100; j++) {
CompletableFuture.runAsync(() -> {
@@ -181,15 +195,15 @@ public class TestSlowLogAccessor {
if (i == 300) {
Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
}
- RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ RpcLogDetails rpcLogDetails = TestNamedQueueRecorder
.getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
- slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ namedQueueRecorder.addRecord(rpcLogDetails);
}
});
}
Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
- int count = slowLogRecorder.getSlowLogPayloads(request).size();
+ int count = getSlowLogPayloads(request).size();
LOG.debug("RingBuffer records count: {}", count);
return count > 2000;
}));
diff --git a/hbase-shell/src/test/ruby/hbase/admin_test.rb b/hbase-shell/src/test/ruby/hbase/admin_test.rb
index 79d47c5..3f89383 100644
--- a/hbase-shell/src/test/ruby/hbase/admin_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/admin_test.rb
@@ -394,7 +394,7 @@ module Hbase
define_test 'clear slowlog responses should work' do
output = capture_stdout { command(:clear_slowlog_responses, nil) }
- assert(output.include?('Cleared Slowlog responses from 1/1 RegionServers'))
+ assert(output.include?('Cleared Slowlog responses from 0/1 RegionServers'))
end
#-------------------------------------------------------------------------------
diff --git a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
index 5f2b695..59ae6d3 100644
--- a/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
+++ b/hbase-thrift/src/test/java/org/apache/hadoop/hbase/thrift2/TestThriftHBaseServiceHandler.java
@@ -197,6 +197,7 @@ public class TestThriftHBaseServiceHandler {
@BeforeClass
public static void beforeClass() throws Exception {
UTIL.getConfiguration().set("hbase.client.retries.number", "3");
+ UTIL.getConfiguration().setBoolean("hbase.regionserver.slowlog.buffer.enabled", true);
UTIL.startMiniCluster();
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf(tableAname));
for (HColumnDescriptor family : families) {