You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2020/05/20 10:01:39 UTC
[hbase] branch branch-2 updated: HBASE-23938 : System table hbase:slowlog to store complete slow/large… (#1681)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 6f5e5e4 HBASE-23938 : System table hbase:slowlog to store complete slow/large… (#1681)
6f5e5e4 is described below
commit 6f5e5e4828e965994ab20311979002d708bfc59f
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed May 20 15:10:29 2020 +0530
HBASE-23938 : System table hbase:slowlog to store complete slow/large… (#1681)
Signed-off-by: Duo Zhang <zh...@apache.org>
Signed-off-by: Anoop Sam John <an...@apache.org>
Signed-off-by: ramkrish86 <ra...@apache.org>
---
.../hadoop/hbase/slowlog/SlowLogTableAccessor.java | 150 +++++++++++++++
.../java/org/apache/hadoop/hbase/HConstants.java | 10 +
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 2 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 3 +
.../hbase/master/slowlog/SlowLogMasterService.java | 73 ++++++++
.../hadoop/hbase/regionserver/HRegionServer.java | 15 ++
.../regionserver/slowlog/LogEventHandler.java | 80 +++++++-
.../hbase/regionserver/slowlog/RpcLogDetails.java | 11 +-
.../regionserver/slowlog/SlowLogRecorder.java | 14 +-
.../regionserver/slowlog/SlowLogTableOpsChore.java | 63 +++++++
.../regionserver/slowlog/TestSlowLogAccessor.java | 204 +++++++++++++++++++++
.../regionserver/slowlog/TestSlowLogRecorder.java | 15 +-
12 files changed, 619 insertions(+), 21 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
new file mode 100644
index 0000000..f4f29c6
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.slowlog;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.NamespaceDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Slowlog Accessor to record slow/large RPC log identified at each RegionServer RpcServer level.
+ * This can be done only optionally to record the entire history of slow/large rpc calls
+ * since RingBuffer can handle only limited latest records.
+ */
+@InterfaceAudience.Private
+public class SlowLogTableAccessor {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);
+
+ private static final Random RANDOM = new Random();
+
+ private static Connection connection;
+
+ /**
+ * hbase:slowlog table name - can be enabled
+ * with config - hbase.regionserver.slowlog.systable.enabled
+ */
+ public static final TableName SLOW_LOG_TABLE_NAME =
+ TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "slowlog");
+
+ private static void doPut(final Connection connection, final List<Put> puts)
+ throws IOException {
+ try (Table table = connection.getTable(SLOW_LOG_TABLE_NAME)) {
+ table.put(puts);
+ }
+ }
+
+ /**
+ * Add slow/large log records to hbase:slowlog table
+ * @param slowLogPayloads List of SlowLogPayload to process
+ * @param configuration Configuration to use for connection
+ */
+ public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
+ final Configuration configuration) {
+ List<Put> puts = new ArrayList<>(slowLogPayloads.size());
+ for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
+ final byte[] rowKey = getRowKey(slowLogPayload);
+ final Put put = new Put(rowKey).setDurability(Durability.SKIP_WAL)
+ .setPriority(HConstants.NORMAL_QOS)
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("call_details"),
+ Bytes.toBytes(slowLogPayload.getCallDetails()))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("client_address"),
+ Bytes.toBytes(slowLogPayload.getClientAddress()))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("method_name"),
+ Bytes.toBytes(slowLogPayload.getMethodName()))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("param"),
+ Bytes.toBytes(slowLogPayload.getParam()))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("processing_time"),
+ Bytes.toBytes(Integer.toString(slowLogPayload.getProcessingTime())))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("queue_time"),
+ Bytes.toBytes(Integer.toString(slowLogPayload.getQueueTime())))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("region_name"),
+ Bytes.toBytes(slowLogPayload.getRegionName()))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("response_size"),
+ Bytes.toBytes(Long.toString(slowLogPayload.getResponseSize())))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("server_class"),
+ Bytes.toBytes(slowLogPayload.getServerClass()))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("start_time"),
+ Bytes.toBytes(Long.toString(slowLogPayload.getStartTime())))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("type"),
+ Bytes.toBytes(slowLogPayload.getType().name()))
+ .addColumn(HConstants.SLOWLOG_INFO_FAMILY, Bytes.toBytes("username"),
+ Bytes.toBytes(slowLogPayload.getUserName()));
+ puts.add(put);
+ }
+ try {
+ if (connection == null) {
+ createConnection(configuration);
+ }
+ doPut(connection, puts);
+ } catch (Exception e) {
+ LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
+ }
+ }
+
+ private static synchronized void createConnection(Configuration configuration)
+ throws IOException {
+ Configuration conf = new Configuration(configuration);
+ // rpc timeout: 20s
+ conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000);
+ // retry count: 5
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+ conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
+ connection = ConnectionFactory.createConnection(conf);
+ }
+
+ /**
+ * Create rowKey: currentTimeMillis APPEND slowLogPayload.hashcode
+ * Scan on slowlog table should keep records with sorted order of time, however records
+ * added at the very same time (currentTimeMillis) could be in random order.
+ *
+ * @param slowLogPayload SlowLogPayload to process
+ * @return rowKey byte[]
+ */
+ private static byte[] getRowKey(final TooSlowLog.SlowLogPayload slowLogPayload) {
+ String hashcode = String.valueOf(slowLogPayload.hashCode());
+ String lastFiveDig =
+ hashcode.substring((hashcode.length() > 5) ? (hashcode.length() - 5) : 0);
+ if (lastFiveDig.startsWith("-")) {
+ lastFiveDig = String.valueOf(RANDOM.nextInt(99999));
+ }
+ final long currentTimeMillis = EnvironmentEdgeManager.currentTime();
+ final String timeAndHashcode = currentTimeMillis + lastFiveDig;
+ final long rowKeyLong = Long.parseLong(timeAndHashcode);
+ return Bytes.toBytes(rowKeyLong);
+ }
+
+}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index fdc3532..8c3d295 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1563,6 +1563,16 @@ public final class HConstants {
"hbase.regionserver.slowlog.buffer.enabled";
public static final boolean DEFAULT_ONLINE_LOG_PROVIDER_ENABLED = false;
+ /** The slowlog info family as a string*/
+ private static final String SLOWLOG_INFO_FAMILY_STR = "info";
+
+ /** The slowlog info family */
+ public static final byte [] SLOWLOG_INFO_FAMILY = Bytes.toBytes(SLOWLOG_INFO_FAMILY_STR);
+
+ public static final String SLOW_LOG_SYS_TABLE_ENABLED_KEY =
+ "hbase.regionserver.slowlog.systable.enabled";
+ public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
+
private HConstants() {
// Can't be instantiated with this ctor.
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index d49122a..3a59a4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -435,7 +435,7 @@ public abstract class RpcServer implements RpcServerInterface,
final String className = server == null ? StringUtils.EMPTY :
server.getClass().getSimpleName();
this.slowLogRecorder.addSlowLogPayload(
- new RpcLogDetails(call, status.getClient(), responseSize, className, tooSlow,
+ new RpcLogDetails(call, param, status.getClient(), responseSize, className, tooSlow,
tooLarge));
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 482e9bb..26f0092 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -143,6 +143,7 @@ import org.apache.hadoop.hbase.master.replication.ModifyPeerProcedure;
import org.apache.hadoop.hbase.master.replication.RemovePeerProcedure;
import org.apache.hadoop.hbase.master.replication.ReplicationPeerManager;
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
+import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
@@ -1139,6 +1140,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// Start the chore to read snapshots and add their usage to table/NS quotas
getChoreService().scheduleChore(snapshotQuotaChore);
}
+ final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
+ slowLogMasterService.init();
// clear the dead servers with same host name and port of online server because we are not
// removing dead server with same hostname and port of rs which is trying to check in before
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java
new file mode 100644
index 0000000..554ed88
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/slowlog/SlowLogMasterService.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.master.slowlog;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Slowlog Master services - Table creation to be used by HMaster
+ */
+@InterfaceAudience.Private
+public class SlowLogMasterService {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SlowLogMasterService.class);
+
+ private final boolean slowlogTableEnabled;
+ private final MasterServices masterServices;
+
+ private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER =
+ TableDescriptorBuilder.newBuilder(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)
+ .setRegionReplication(1)
+ .setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(HConstants.SLOWLOG_INFO_FAMILY)
+ .setScope(HConstants.REPLICATION_SCOPE_LOCAL)
+ .setBlockCacheEnabled(false)
+ .setMaxVersions(1).build());
+
+ public SlowLogMasterService(final Configuration configuration,
+ final MasterServices masterServices) {
+ slowlogTableEnabled = configuration.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
+ HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
+ this.masterServices = masterServices;
+ }
+
+ public void init() throws IOException {
+ if (!slowlogTableEnabled) {
+ LOG.info("Slow/Large requests logging to system table hbase:slowlog is disabled. Quitting.");
+ return;
+ }
+ if (!MetaTableAccessor.tableExists(masterServices.getConnection(),
+ SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)) {
+ LOG.info("slowlog table not found. Creating.");
+ this.masterServices.createSystemTable(TABLE_DESCRIPTOR_BUILDER.build());
+ }
+ }
+
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index c76055e..cd0cc3a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -138,6 +138,7 @@ import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
import org.apache.hadoop.hbase.regionserver.handler.RegionReplicaFlushHandler;
import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogRecorder;
+import org.apache.hadoop.hbase.regionserver.slowlog.SlowLogTableOpsChore;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
@@ -427,6 +428,8 @@ public class HRegionServer extends HasThread implements
private final RegionServerAccounting regionServerAccounting;
+ private SlowLogTableOpsChore slowLogTableOpsChore = null;
+
// Block cache
private BlockCache blockCache;
// The cache for mob files
@@ -2011,6 +2014,9 @@ public class HRegionServer extends HasThread implements
if (this.storefileRefresher != null) choreService.scheduleChore(storefileRefresher);
if (this.movedRegionsCleaner != null) choreService.scheduleChore(movedRegionsCleaner);
if (this.fsUtilizationChore != null) choreService.scheduleChore(fsUtilizationChore);
+ if (this.slowLogTableOpsChore != null) {
+ choreService.scheduleChore(slowLogTableOpsChore);
+ }
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
@@ -2056,6 +2062,14 @@ public class HRegionServer extends HasThread implements
this.periodicFlusher = new PeriodicMemStoreFlusher(this.flushCheckFrequency, this);
this.leaseManager = new LeaseManager(this.threadWakeFrequency);
+ final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
+ HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
+ if (isSlowLogTableEnabled) {
+ // default chore duration: 10 min
+ final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
+ slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.slowLogRecorder);
+ }
+
// Create the thread to clean the moved regions list
movedRegionsCleaner = MovedRegionsCleaner.create(this);
@@ -2559,6 +2573,7 @@ public class HRegionServer extends HasThread implements
choreService.cancelChore(storefileRefresher);
choreService.cancelChore(movedRegionsCleaner);
choreService.cancelChore(fsUtilizationChore);
+ choreService.cancelChore(slowLogTableOpsChore);
// clean up the remaining scheduled chores (in case we missed out any)
choreService.shutdown();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
index 508f086..8d500de 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/LogEventHandler.java
@@ -27,11 +27,14 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -54,11 +57,32 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
private static final Logger LOG = LoggerFactory.getLogger(LogEventHandler.class);
- private final Queue<SlowLogPayload> queue;
+ private static final String SYS_TABLE_QUEUE_SIZE =
+ "hbase.regionserver.slowlog.systable.queue.size";
+ private static final int DEFAULT_SYS_TABLE_QUEUE_SIZE = 1000;
+ private static final int SYSTABLE_PUT_BATCH_SIZE = 100;
- LogEventHandler(int eventCount) {
+ private final Queue<SlowLogPayload> queueForRingBuffer;
+ private final Queue<SlowLogPayload> queueForSysTable;
+ private final boolean isSlowLogTableEnabled;
+
+ private Configuration configuration;
+
+ private static final ReentrantLock LOCK = new ReentrantLock();
+
+ LogEventHandler(int eventCount, boolean isSlowLogTableEnabled, Configuration conf) {
+ this.configuration = conf;
EvictingQueue<SlowLogPayload> evictingQueue = EvictingQueue.create(eventCount);
- queue = Queues.synchronizedQueue(evictingQueue);
+ queueForRingBuffer = Queues.synchronizedQueue(evictingQueue);
+ this.isSlowLogTableEnabled = isSlowLogTableEnabled;
+ if (isSlowLogTableEnabled) {
+ int sysTableQueueSize = conf.getInt(SYS_TABLE_QUEUE_SIZE, DEFAULT_SYS_TABLE_QUEUE_SIZE);
+ EvictingQueue<SlowLogPayload> evictingQueueForTable =
+ EvictingQueue.create(sysTableQueueSize);
+ queueForSysTable = Queues.synchronizedQueue(evictingQueueForTable);
+ } else {
+ queueForSysTable = null;
+ }
}
/**
@@ -83,7 +107,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
return;
}
Descriptors.MethodDescriptor methodDescriptor = rpcCall.getMethod();
- Message param = rpcCall.getParam();
+ Message param = rpcCallDetails.getParam();
long receiveTime = rpcCall.getReceiveTime();
long startTime = rpcCall.getStartTime();
long endTime = System.currentTimeMillis();
@@ -129,7 +153,12 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
.setType(type)
.setUserName(userName)
.build();
- queue.add(slowLogPayload);
+ queueForRingBuffer.add(slowLogPayload);
+ if (isSlowLogTableEnabled) {
+ if (!slowLogPayload.getRegionName().startsWith("hbase:slowlog")) {
+ queueForSysTable.add(slowLogPayload);
+ }
+ }
}
private SlowLogPayload.Type getLogType(RpcLogDetails rpcCallDetails) {
@@ -160,7 +189,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
if (LOG.isDebugEnabled()) {
LOG.debug("Received request to clean up online slowlog buffer..");
}
- queue.clear();
+ queueForRingBuffer.clear();
return true;
}
@@ -172,7 +201,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
*/
List<SlowLogPayload> getSlowLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
List<SlowLogPayload> slowLogPayloadList =
- Arrays.stream(queue.toArray(new SlowLogPayload[0]))
+ Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|| e.getType() == SlowLogPayload.Type.SLOW_LOG)
.collect(Collectors.toList());
@@ -191,7 +220,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
*/
List<SlowLogPayload> getLargeLogPayloads(final AdminProtos.SlowLogResponseRequest request) {
List<SlowLogPayload> slowLogPayloadList =
- Arrays.stream(queue.toArray(new SlowLogPayload[0]))
+ Arrays.stream(queueForRingBuffer.toArray(new SlowLogPayload[0]))
.filter(e -> e.getType() == SlowLogPayload.Type.ALL
|| e.getType() == SlowLogPayload.Type.LARGE_LOG)
.collect(Collectors.toList());
@@ -207,8 +236,7 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
if (isFilterProvided(request)) {
logPayloadList = filterLogs(request, logPayloadList);
}
- int limit = request.getLimit() >= logPayloadList.size() ? logPayloadList.size()
- : request.getLimit();
+ int limit = Math.min(request.getLimit(), logPayloadList.size());
return logPayloadList.subList(0, limit);
}
@@ -256,4 +284,36 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
return filteredSlowLogPayloads;
}
+ /**
+ * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
+ */
+ void addAllLogsToSysTable() {
+ if (queueForSysTable == null) {
+ // hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.
+ return;
+ }
+ if (LOCK.isLocked()) {
+ return;
+ }
+ LOCK.lock();
+ try {
+ List<SlowLogPayload> slowLogPayloads = new ArrayList<>();
+ int i = 0;
+ while (!queueForSysTable.isEmpty()) {
+ slowLogPayloads.add(queueForSysTable.poll());
+ i++;
+ if (i == SYSTABLE_PUT_BATCH_SIZE) {
+ SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+ slowLogPayloads.clear();
+ i = 0;
+ }
+ }
+ if (slowLogPayloads.size() > 0) {
+ SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+ }
+ } finally {
+ LOCK.unlock();
+ }
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
index 7d5558c..b469cdb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/RpcLogDetails.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hbase.regionserver.slowlog;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.yetus.audience.InterfaceAudience;
/**
@@ -30,15 +31,17 @@ import org.apache.yetus.audience.InterfaceAudience;
public class RpcLogDetails {
private final RpcCall rpcCall;
+ private final Message param;
private final String clientAddress;
private final long responseSize;
private final String className;
private final boolean isSlowLog;
private final boolean isLargeLog;
- public RpcLogDetails(RpcCall rpcCall, String clientAddress, long responseSize,
+ public RpcLogDetails(RpcCall rpcCall, Message param, String clientAddress, long responseSize,
String className, boolean isSlowLog, boolean isLargeLog) {
this.rpcCall = rpcCall;
+ this.param = param;
this.clientAddress = clientAddress;
this.responseSize = responseSize;
this.className = className;
@@ -70,10 +73,15 @@ public class RpcLogDetails {
return isLargeLog;
}
+ public Message getParam() {
+ return param;
+ }
+
@Override
public String toString() {
return new ToStringBuilder(this)
.append("rpcCall", rpcCall)
+ .append("param", param)
.append("clientAddress", clientAddress)
.append("responseSize", responseSize)
.append("className", className)
@@ -81,5 +89,4 @@ public class RpcLogDetails {
.append("isLargeLog", isLargeLog)
.toString();
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
index a69b0ad..b0fb3e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogRecorder.java
@@ -29,6 +29,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -86,7 +87,9 @@ public class SlowLogRecorder {
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
// initialize ringbuffer event handler
- this.logEventHandler = new LogEventHandler(this.eventCount);
+ final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
+ HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
+ this.logEventHandler = new LogEventHandler(this.eventCount, isSlowLogTableEnabled, conf);
this.disruptor.handleEventsWith(new LogEventHandler[]{this.logEventHandler});
this.disruptor.start();
}
@@ -161,4 +164,13 @@ public class SlowLogRecorder {
}
}
+ /**
+ * Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
+ */
+ public void addAllLogsToSysTable() {
+ if (this.logEventHandler != null) {
+ this.logEventHandler.addAllLogsToSysTable();
+ }
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
new file mode 100644
index 0000000..77749f7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/slowlog/SlowLogTableOpsChore.java
@@ -0,0 +1,63 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Chore to insert multiple accumulated slow/large logs to hbase:slowlog system table
+ */
+@InterfaceAudience.Private
+public class SlowLogTableOpsChore extends ScheduledChore {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
+
+ private final SlowLogRecorder slowLogRecorder;
+
+ /**
+ * Chore Constructor
+ *
+ * @param stopper The stopper - When {@link Stoppable#isStopped()} is true, this chore will
+ * cancel and cleanup
+ * @param period Period in millis with which this Chore repeats execution when scheduled
+ * @param slowLogRecorder {@link SlowLogRecorder} instance
+ */
+ public SlowLogTableOpsChore(final Stoppable stopper, final int period,
+ final SlowLogRecorder slowLogRecorder) {
+ super("SlowLogTableOpsChore", stopper, period);
+ this.slowLogRecorder = slowLogRecorder;
+ }
+
+ @Override
+ protected void chore() {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("SlowLog Table Ops Chore is starting up.");
+ }
+ slowLogRecorder.addAllLogsToSysTable();
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("SlowLog Table Ops Chore is closing.");
+ }
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
new file mode 100644
index 0000000..e08ad29
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogAccessor.java
@@ -0,0 +1,204 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.regionserver.slowlog;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MetaTableAccessor;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
+import org.apache.hadoop.hbase.testclassification.MasterTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests for SlowLog System Table
+ */
+@Category({ MasterTests.class, MediumTests.class })
+public class TestSlowLogAccessor {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestSlowLogAccessor.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestSlowLogRecorder.class);
+
+ private static final HBaseTestingUtility HBASE_TESTING_UTILITY = new HBaseTestingUtility();
+
+ private SlowLogRecorder slowLogRecorder;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ try {
+ HBASE_TESTING_UTILITY.shutdownMiniHBaseCluster();
+ } catch (IOException e) {
+ LOG.debug("No worries.");
+ }
+ Configuration conf = HBASE_TESTING_UTILITY.getConfiguration();
+ conf.setBoolean(HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, true);
+ conf.setBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY, true);
+ conf.setInt("hbase.slowlog.systable.chore.duration", 900);
+ conf.setInt("hbase.regionserver.slowlog.ringbuffer.size", 50000);
+ HBASE_TESTING_UTILITY.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ HBASE_TESTING_UTILITY.shutdownMiniHBaseCluster();
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ HRegionServer hRegionServer = HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0);
+ Field slowLogRecorder = HRegionServer.class.getDeclaredField("slowLogRecorder");
+ slowLogRecorder.setAccessible(true);
+ this.slowLogRecorder = (SlowLogRecorder) slowLogRecorder.get(hRegionServer);
+ }
+
+ @Test
+ public void testSlowLogRecords() throws Exception {
+
+ AdminProtos.SlowLogResponseRequest request =
+ AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(15).build();
+
+ slowLogRecorder.clearSlowLogPayloads();
+ Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+
+ int i = 0;
+
+ Connection connection = waitForSlowLogTableCreation();
+ // add 5 records initially
+ for (; i < 5; i++) {
+ RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ // add 2 more records
+ for (; i < 7; i++) {
+ RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ // add 3 more records
+ for (; i < 10; i++) {
+ RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ // add 4 more records
+ for (; i < 14; i++) {
+ RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY
+ .waitFor(3000, () -> slowLogRecorder.getSlowLogPayloads(request).size() == 14));
+
+ Assert.assertNotEquals(-1,
+ HBASE_TESTING_UTILITY.waitFor(3000, () -> getTableCount(connection) == 14));
+ }
+
+ private int getTableCount(Connection connection) {
+ try (Table table = connection.getTable(SlowLogTableAccessor.SLOW_LOG_TABLE_NAME)) {
+ ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
+ int count = 0;
+ for (Result result : resultScanner) {
+ ++count;
+ }
+ return count;
+ } catch (Exception e) {
+ return 0;
+ }
+ }
+
+ private Connection waitForSlowLogTableCreation() {
+ Connection connection =
+ HBASE_TESTING_UTILITY.getMiniHBaseCluster().getRegionServer(0).getConnection();
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(2000, () -> {
+ try {
+ return MetaTableAccessor.tableExists(connection, SlowLogTableAccessor.SLOW_LOG_TABLE_NAME);
+ } catch (IOException e) {
+ return false;
+ }
+ }));
+ return connection;
+ }
+
+ @Test
+ public void testHigherSlowLogs() throws Exception {
+ Connection connection = waitForSlowLogTableCreation();
+
+ slowLogRecorder.clearSlowLogPayloads();
+ AdminProtos.SlowLogResponseRequest request =
+ AdminProtos.SlowLogResponseRequest.newBuilder().setLimit(500000).build();
+ Assert.assertEquals(slowLogRecorder.getSlowLogPayloads(request).size(), 0);
+
+ for (int j = 0; j < 100; j++) {
+ CompletableFuture.runAsync(() -> {
+ for (int i = 0; i < 350; i++) {
+ if (i == 300) {
+ Uninterruptibles.sleepUninterruptibly(500, TimeUnit.MILLISECONDS);
+ }
+ RpcLogDetails rpcLogDetails = TestSlowLogRecorder
+ .getRpcLogDetails("userName_" + (i + 1), "client_" + (i + 1), "class_" + (i + 1));
+ slowLogRecorder.addSlowLogPayload(rpcLogDetails);
+ }
+ });
+ }
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
+ int count = slowLogRecorder.getSlowLogPayloads(request).size();
+ LOG.debug("RingBuffer records count: {}", count);
+ return count > 2000;
+ }));
+
+ Assert.assertNotEquals(-1, HBASE_TESTING_UTILITY.waitFor(7000, () -> {
+ int count = getTableCount(connection);
+ LOG.debug("SlowLog Table records count: {}", count);
+ return count > 2000;
+ }));
+ }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
index bdd5c89..863e27b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/slowlog/TestSlowLogRecorder.java
@@ -486,18 +486,19 @@ public class TestSlowLogRecorder {
}
- private RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
- String className) {
- return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className, true, true);
+ static RpcLogDetails getRpcLogDetails(String userName, String clientAddress, String className) {
+ RpcCall rpcCall = getRpcCall(userName);
+ return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, true, true);
}
private RpcLogDetails getRpcLogDetails(String userName, String clientAddress,
String className, boolean isSlowLog, boolean isLargeLog) {
- return new RpcLogDetails(getRpcCall(userName), clientAddress, 0, className, isSlowLog,
+ RpcCall rpcCall = getRpcCall(userName);
+ return new RpcLogDetails(rpcCall, rpcCall.getParam(), clientAddress, 0, className, isSlowLog,
isLargeLog);
}
- private RpcCall getRpcCall(String userName) {
+ private static RpcCall getRpcCall(String userName) {
RpcCall rpcCall = new RpcCall() {
@Override
public BlockingService getService() {
@@ -646,7 +647,7 @@ public class TestSlowLogRecorder {
return rpcCall;
}
- private Message getMessage() {
+ private static Message getMessage() {
i = (i + 1) % 3;
@@ -693,7 +694,7 @@ public class TestSlowLogRecorder {
}
- private Optional<User> getUser(String userName) {
+ private static Optional<User> getUser(String userName) {
return Optional.of(new User() {