You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by vj...@apache.org on 2022/06/01 23:54:19 UTC
[hbase] branch HBASE-26913-replication-observability-framework updated: HBASE-26925 Create WAL event tracker table to track all the WAL events. (#4382)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch HBASE-26913-replication-observability-framework
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/HBASE-26913-replication-observability-framework by this push:
new c440b78eb57 HBASE-26925 Create WAL event tracker table to track all the WAL events. (#4382)
c440b78eb57 is described below
commit c440b78eb57988386b161d6cdca8cc598f2eaa14
Author: Rushabh Shah <sh...@gmail.com>
AuthorDate: Wed Jun 1 16:54:09 2022 -0700
HBASE-26925 Create WAL event tracker table to track all the WAL events. (#4382)
---
.../hadoop/hbase/slowlog/SlowLogTableAccessor.java | 23 +-
.../java/org/apache/hadoop/hbase/HConstants.java | 24 ++
hbase-common/src/main/resources/hbase-default.xml | 2 +-
.../namequeues/MetricsWALEventTrackerSource.java | 65 ++++++
.../MetricsWALEventTrackerSourceImpl.java | 59 +++++
...p.hbase.namequeues.MetricsWALEventTrackerSource | 18 ++
.../org/apache/hadoop/hbase/master/HMaster.java | 3 +
.../WALEventTrackerTableCreator.java | 75 +++++++
.../hadoop/hbase/namequeues/LogEventHandler.java | 8 +-
.../hadoop/hbase/namequeues/NamedQueuePayload.java | 6 +-
.../hbase/namequeues/NamedQueueRecorder.java | 7 +-
.../hadoop/hbase/namequeues/NamedQueueService.java | 4 +-
...leOpsChore.java => NamedQueueServiceChore.java} | 28 ++-
.../hbase/namequeues/SlowLogPersistentService.java | 7 +-
.../hbase/namequeues/WALEventTrackerPayload.java | 73 ++++++
.../namequeues/WALEventTrackerQueueService.java | 148 +++++++++++++
.../namequeues/WALEventTrackerTableAccessor.java | 142 ++++++++++++
.../impl/BalancerDecisionQueueService.java | 3 +-
.../impl/BalancerRejectionQueueService.java | 3 +-
.../hbase/namequeues/impl/SlowLogQueueService.java | 5 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 56 +++--
.../hbase/regionserver/wal/AbstractFSWAL.java | 2 +-
.../regionserver/wal/WALEventTrackerListener.java | 94 ++++++++
.../hbase/namequeues/TestWALEventTracker.java | 245 +++++++++++++++++++++
.../TestWALEventTrackerTableAccessor.java | 58 +++++
.../TestWalEventTrackerQueueService.java | 86 ++++++++
26 files changed, 1179 insertions(+), 65 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
index 5ea6144d037..e6db8f43017 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/slowlog/SlowLogTableAccessor.java
@@ -21,12 +21,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
@@ -48,8 +46,6 @@ public class SlowLogTableAccessor {
private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableAccessor.class);
- private static Connection connection;
-
/**
* hbase:slowlog table name - can be enabled with config -
* hbase.regionserver.slowlog.systable.enabled
@@ -66,10 +62,10 @@ public class SlowLogTableAccessor {
/**
* Add slow/large log records to hbase:slowlog table
* @param slowLogPayloads List of SlowLogPayload to process
- * @param configuration Configuration to use for connection
+ * @param connection connection
*/
public static void addSlowLogRecords(final List<TooSlowLog.SlowLogPayload> slowLogPayloads,
- final Configuration configuration) {
+ Connection connection) {
List<Put> puts = new ArrayList<>(slowLogPayloads.size());
for (TooSlowLog.SlowLogPayload slowLogPayload : slowLogPayloads) {
final byte[] rowKey = getRowKey(slowLogPayload);
@@ -102,26 +98,12 @@ public class SlowLogTableAccessor {
puts.add(put);
}
try {
- if (connection == null) {
- createConnection(configuration);
- }
doPut(connection, puts);
} catch (Exception e) {
LOG.warn("Failed to add slow/large log records to hbase:slowlog table.", e);
}
}
- private static synchronized void createConnection(Configuration configuration)
- throws IOException {
- Configuration conf = new Configuration(configuration);
- // rpc timeout: 20s
- conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 20000);
- // retry count: 5
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
- conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
- connection = ConnectionFactory.createConnection(conf);
- }
-
/**
* Create rowKey: currentTime APPEND slowLogPayload.hashcode Scan on slowlog table should keep
* records with sorted order of time, however records added at the very same time could be in
@@ -140,5 +122,4 @@ public class SlowLogTableAccessor {
final long rowKeyLong = Long.parseLong(timeAndHashcode);
return Bytes.toBytes(rowKeyLong);
}
-
}
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index f4d43a2da29..0ab1bab31a1 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1552,6 +1552,14 @@ public final class HConstants {
"hbase.regionserver.slowlog.systable.enabled";
public static final boolean DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY = false;
+ /**
+ * @deprecated Use {@link #NAMED_QUEUE_CHORE_DURATION_KEY} instead.
+ */
+ @Deprecated
+ public static final String SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY =
+ "hbase.slowlog.systable.chore.duration";
+ public static final int DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION = 10 * 60 * 1000; // 10 mins
+
public static final String SHELL_TIMESTAMP_FORMAT_EPOCH_KEY =
"hbase.shell.timestamp.format.epoch";
@@ -1567,6 +1575,22 @@ public final class HConstants {
*/
public static final int BATCH_ROWS_THRESHOLD_DEFAULT = 5000;
+ public static final String WAL_EVENT_TRACKER_ENABLED_KEY =
+ "hbase.regionserver.wal.event.tracker.enabled";
+ public static final boolean WAL_EVENT_TRACKER_ENABLED_DEFAULT = false;
+
+ public static final String NAMED_QUEUE_CHORE_DURATION_KEY =
+ "hbase.regionserver.named.queue.chore.duration";
+ // 10 mins default.
+ public static final int NAMED_QUEUE_CHORE_DURATION_DEFAULT = 10 * 60 * 1000;
+
+ /** The walEventTracker info family as a string */
+ private static final String WAL_EVENT_TRACKER_INFO_FAMILY_STR = "info";
+
+ /** The walEventTracker info family in array of bytes */
+ public static final byte[] WAL_EVENT_TRACKER_INFO_FAMILY =
+ Bytes.toBytes(WAL_EVENT_TRACKER_INFO_FAMILY_STR);
+
private HConstants() {
// Can't be instantiated with this ctor.
}
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index dc94a6d3e68..ad9a820f83d 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -2022,7 +2022,7 @@ possible configurations would overwhelm and obscure the important.
</property>
<property>
<name>hbase.namedqueue.provider.classes</name>
- <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService</value>
+ <value>org.apache.hadoop.hbase.namequeues.impl.SlowLogQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerDecisionQueueService,org.apache.hadoop.hbase.namequeues.impl.BalancerRejectionQueueService,org.apache.hadoop.hbase.namequeues.WALEventTrackerQueueService</value>
<description>
Default values for NamedQueueService implementors. This comma separated full class names
represent all implementors of NamedQueueService that we would like to be invoked by
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java
new file mode 100644
index 00000000000..8bd95aefe8e
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSource.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.hadoop.hbase.metrics.BaseSource;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Metrics source for the RegionServer WALEventTracker: counts put requests that failed and the
+ * number of records those failed puts carried.
+ */
+@InterfaceAudience.Private
+public interface MetricsWALEventTrackerSource extends BaseSource {
+ /** The name of the metrics. */
+ String METRICS_NAME = "WALEventTracker";
+
+ /** The name of the metrics context that metrics will be under. */
+ String METRICS_CONTEXT = "regionserver";
+
+ /** Description of this metrics source. */
+ String METRICS_DESCRIPTION = "Metrics about HBase RegionServer WALEventTracker";
+
+ /** The name of the metrics context that metrics will be under in JMX. */
+ String METRICS_JMX_CONTEXT = "RegionServer,sub=" + METRICS_NAME;
+
+ String NUM_FAILED_PUTS = "numFailedPuts";
+ String NUM_FAILED_PUTS_DESC = "Number of put requests that failed";
+
+ String NUM_RECORDS_FAILED_PUTS = "numRecordsFailedPuts";
+ String NUM_RECORDS_FAILED_PUTS_DESC = "number of records in failed puts";
+
+ /**
+ * Records one failed put request: increments numFailedPuts by one and numRecordsFailedPuts by
+ * {@code numRecords}.
+ * @param numRecords number of records carried by the failed put request
+ */
+ void incrFailedPuts(long numRecords);
+
+ /**
+ * Get the failed puts counter.
+ * @return current value of the numFailedPuts counter
+ */
+ long getFailedPuts();
+
+ /**
+ * Get the number of records in failed puts.
+ * @return current value of the numRecordsFailedPuts counter
+ */
+ long getNumRecordsFailedPuts();
+}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java
new file mode 100644
index 00000000000..0ae5b12c4d6
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/namequeues/MetricsWALEventTrackerSourceImpl.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.hadoop.hbase.metrics.BaseSourceImpl;
+import org.apache.hadoop.metrics2.lib.MutableFastCounter;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/** Counter-backed implementation of {@link MetricsWALEventTrackerSource}. */
+@InterfaceAudience.Private
+public class MetricsWALEventTrackerSourceImpl extends BaseSourceImpl
+ implements MetricsWALEventTrackerSource {
+ private final MutableFastCounter failedPutsCounter;
+ private final MutableFastCounter failedPutRecordsCounter;
+
+ public MetricsWALEventTrackerSourceImpl() {
+ this(METRICS_NAME, METRICS_DESCRIPTION, METRICS_CONTEXT, METRICS_JMX_CONTEXT);
+ }
+
+ public MetricsWALEventTrackerSourceImpl(String metricsName, String metricsDescription,
+ String metricsContext, String metricsJmxContext) {
+ super(metricsName, metricsDescription, metricsContext, metricsJmxContext);
+ failedPutsCounter = getMetricsRegistry().newCounter(NUM_FAILED_PUTS, NUM_FAILED_PUTS_DESC, 0L);
+ failedPutRecordsCounter =
+ getMetricsRegistry().newCounter(NUM_RECORDS_FAILED_PUTS, NUM_RECORDS_FAILED_PUTS_DESC, 0L);
+ }
+
+ /** Bumps the failed-put count by one and the failed-record count by {@code numRecords}. */
+ @Override
+ public void incrFailedPuts(long numRecords) {
+ failedPutsCounter.incr();
+ failedPutRecordsCounter.incr(numRecords);
+ }
+
+ @Override
+ public long getFailedPuts() {
+ return failedPutsCounter.value();
+ }
+
+ @Override
+ public long getNumRecordsFailedPuts() {
+ return failedPutRecordsCounter.value();
+ }
+}
diff --git a/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource b/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource
new file mode 100644
index 00000000000..5870bf1a9cf
--- /dev/null
+++ b/hbase-hadoop-compat/src/main/resources/META-INF/services/org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSource
@@ -0,0 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.hadoop.hbase.namequeues.MetricsWALEventTrackerSourceImpl
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index e4f2391771a..df98f8f81e9 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -171,6 +171,7 @@ import org.apache.hadoop.hbase.master.replication.TransitPeerSyncReplicationStat
import org.apache.hadoop.hbase.master.replication.UpdatePeerConfigProcedure;
import org.apache.hadoop.hbase.master.slowlog.SlowLogMasterService;
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
+import org.apache.hadoop.hbase.master.waleventtracker.WALEventTrackerTableCreator;
import org.apache.hadoop.hbase.master.zksyncer.MasterAddressSyncer;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.mob.MobFileCleanerChore;
@@ -1228,6 +1229,8 @@ public class HMaster extends HBaseServerBase<MasterRpcServices> implements Maste
final SlowLogMasterService slowLogMasterService = new SlowLogMasterService(conf, this);
slowLogMasterService.init();
+ WALEventTrackerTableCreator.createIfNeededAndNotExists(conf, this);
+
// clear the dead servers with same host name and port of online server because we are not
// removing dead server with same hostname and port of rs which is trying to check in before
// master initialization. See HBASE-5916.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java
new file mode 100644
index 00000000000..a82e5866060
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/waleventtracker/WALEventTrackerTableCreator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.master.waleventtracker;
+
+import static org.apache.hadoop.hbase.HConstants.NO_NONCE;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME_STR;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.master.MasterServices;
+import org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** WALEventTracker table creation, to be used by HMaster. */
+@InterfaceAudience.Private
+public final class WALEventTrackerTableCreator {
+ private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableCreator.class);
+ private static final int TTL = (int) TimeUnit.DAYS.toSeconds(365); // 1 year in seconds
+
+ // One region replica; family uses replication scope local, no block cache, one version per cell.
+ private static final TableDescriptorBuilder TABLE_DESCRIPTOR_BUILDER =
+ TableDescriptorBuilder.newBuilder(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME)
+ .setRegionReplication(1).setColumnFamily(
+ ColumnFamilyDescriptorBuilder.newBuilder(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY)
+ .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBlockCacheEnabled(false)
+ .setMaxVersions(1).setTimeToLive(TTL).build());
+
+ private WALEventTrackerTableCreator() {
+ }
+
+ /**
+ * Creates the WAL event tracker table when hbase.regionserver.wal.event.tracker.enabled is true
+ * and the table does not exist yet; otherwise does nothing.
+ * @param conf configuration consulted for the enable flag
+ * @param masterServices used to look up existing table descriptors and to create the table
+ * @throws IOException if the existence check or the table creation fails
+ */
+ public static void createIfNeededAndNotExists(Configuration conf, MasterServices masterServices)
+ throws IOException {
+ boolean walEventTrackerEnabled = conf.getBoolean(HConstants.WAL_EVENT_TRACKER_ENABLED_KEY,
+ HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT);
+ if (!walEventTrackerEnabled) {
+ LOG.info("wal event tracker requests logging to table {} is disabled. Quitting.",
+ WAL_EVENT_TRACKER_TABLE_NAME_STR);
+ return;
+ }
+ boolean tableExists = masterServices.getTableDescriptors()
+ .exists(WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME);
+ if (!tableExists) {
+ LOG.info("{} table not found. Creating.", WAL_EVENT_TRACKER_TABLE_NAME_STR);
+ masterServices.createTable(TABLE_DESCRIPTOR_BUILDER.build(), null, 0L, NO_NONCE);
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
index ed4b470d577..2d6f5bf5734 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/LogEventHandler.java
@@ -23,6 +23,7 @@ import java.lang.reflect.InvocationTargetException;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
@@ -70,7 +71,8 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
namedQueueServices.put(namedQueueService.getEvent(), namedQueueService);
} catch (InstantiationException | IllegalAccessException | NoSuchMethodException
| InvocationTargetException e) {
- LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz);
+ LOG.warn("Unable to instantiate/add NamedQueueService implementor {} to service map.", clz,
+ e);
}
}
}
@@ -105,8 +107,8 @@ class LogEventHandler implements EventHandler<RingBufferEnvelope> {
* Add all in memory queue records to system table. The implementors can use system table or
* direct HDFS file or ZK as persistence system.
*/
- void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
- namedQueueServices.get(namedQueueEvent).persistAll();
+ void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) {
+ namedQueueServices.get(namedQueueEvent).persistAll(connection);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
index ba2eb3322d6..39cc093b2aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueuePayload.java
@@ -29,7 +29,8 @@ public class NamedQueuePayload {
public enum NamedQueueEvent {
SLOW_LOG(0),
BALANCE_DECISION(1),
- BALANCE_REJECTION(2);
+ BALANCE_REJECTION(2),
+ WAL_EVENT_TRACKER(3);
private final int value;
@@ -48,6 +49,9 @@ public class NamedQueuePayload {
case 2: {
return BALANCE_REJECTION;
}
+ case 3: {
+ return WAL_EVENT_TRACKER;
+ }
default: {
throw new IllegalArgumentException(
"NamedQueue event with ordinal " + value + " not defined");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
index 38f63fd09be..6e88cf9cbc2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueRecorder.java
@@ -22,6 +22,7 @@ import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.hadoop.hbase.util.Threads;
@@ -60,7 +61,7 @@ public class NamedQueueRecorder {
// disruptor initialization with BlockingWaitStrategy
this.disruptor = new Disruptor<>(RingBufferEnvelope::new, getEventCount(eventCount),
- new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".slowlog.append-pool-%d")
+ new ThreadFactoryBuilder().setNameFormat(hostingThreadName + ".named-queue-events-pool-%d")
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build(),
ProducerType.MULTI, new BlockingWaitStrategy());
this.disruptor.setDefaultExceptionHandler(new DisruptorExceptionHandler());
@@ -137,9 +138,9 @@ public class NamedQueueRecorder {
* Add all in memory queue records to system table. The implementors can use system table or
* direct HDFS file or ZK as persistence system.
*/
- public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent) {
+ public void persistAll(NamedQueuePayload.NamedQueueEvent namedQueueEvent, Connection connection) {
if (this.logEventHandler != null) {
- this.logEventHandler.persistAll(namedQueueEvent);
+ this.logEventHandler.persistAll(namedQueueEvent, connection);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
index 889323d9592..6154a7c2de3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueService.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.namequeues;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
import org.apache.yetus.audience.InterfaceAudience;
@@ -57,6 +58,7 @@ public interface NamedQueueService {
/**
* Add all in memory queue records to system table. The implementors can use system table or
* direct HDFS file or ZK as persistence system.
+ * @param connection connection
*/
- void persistAll();
+ void persistAll(Connection connection);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java
similarity index 66%
rename from hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java
index 0de6c876989..b42baa328e4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogTableOpsChore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/NamedQueueServiceChore.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.namequeues;
import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -27,11 +28,12 @@ import org.slf4j.LoggerFactory;
* Chore to insert multiple accumulated slow/large logs to hbase:slowlog system table
*/
@InterfaceAudience.Private
-public class SlowLogTableOpsChore extends ScheduledChore {
+public class NamedQueueServiceChore extends ScheduledChore {
- private static final Logger LOG = LoggerFactory.getLogger(SlowLogTableOpsChore.class);
+ private static final Logger LOG = LoggerFactory.getLogger(NamedQueueServiceChore.class);
private final NamedQueueRecorder namedQueueRecorder;
+ private final Connection connection;
/**
* Chore Constructor
@@ -41,21 +43,23 @@ public class SlowLogTableOpsChore extends ScheduledChore {
* scheduled
* @param namedQueueRecorder {@link NamedQueueRecorder} instance
*/
- public SlowLogTableOpsChore(final Stoppable stopper, final int period,
- final NamedQueueRecorder namedQueueRecorder) {
- super("SlowLogTableOpsChore", stopper, period);
+ public NamedQueueServiceChore(final Stoppable stopper, final int period,
+ final NamedQueueRecorder namedQueueRecorder, Connection connection) {
+ super("NamedQueueServiceChore", stopper, period);
this.namedQueueRecorder = namedQueueRecorder;
+ this.connection = connection;
}
@Override
protected void chore() {
- if (LOG.isTraceEnabled()) {
- LOG.trace("SlowLog Table Ops Chore is starting up.");
- }
- namedQueueRecorder.persistAll(NamedQueuePayload.NamedQueueEvent.SLOW_LOG);
- if (LOG.isTraceEnabled()) {
- LOG.trace("SlowLog Table Ops Chore is closing.");
+ for (NamedQueuePayload.NamedQueueEvent event : NamedQueuePayload.NamedQueueEvent.values()) {
+ LOG.debug("Starting chore for event {}", event.name());
+ try {
+ namedQueueRecorder.persistAll(event, connection);
+ } catch (RuntimeException e) { // isolate failures so remaining event types still persist
+ LOG.warn("Failed to persist named queue records for event {}", event.name(), e);
+ }
+ LOG.debug("Stopping chore for event {}", event.name());
}
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
index 95c1ed53f52..b4104e6008f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/SlowLogPersistentService.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.Queue;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.slowlog.SlowLogTableAccessor;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
@@ -66,7 +67,7 @@ public class SlowLogPersistentService {
/**
* Poll from queueForSysTable and insert 100 records in hbase:slowlog table in single batch
*/
- public void addAllLogsToSysTable() {
+ public void addAllLogsToSysTable(Connection connection) {
if (queueForSysTable == null) {
LOG.trace("hbase.regionserver.slowlog.systable.enabled is turned off. Exiting.");
return;
@@ -82,13 +83,13 @@ public class SlowLogPersistentService {
slowLogPayloads.add(queueForSysTable.poll());
i++;
if (i == SYSTABLE_PUT_BATCH_SIZE) {
- SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+ SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
slowLogPayloads.clear();
i = 0;
}
}
if (slowLogPayloads.size() > 0) {
- SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, this.configuration);
+ SlowLogTableAccessor.addSlowLogRecords(slowLogPayloads, connection);
}
} finally {
LOCK.unlock();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java
new file mode 100644
index 00000000000..9f549a72e51
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerPayload.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Describes a single WAL event that is published through the named queue ring buffer under
+ * {@link NamedQueuePayload.NamedQueueEvent#WAL_EVENT_TRACKER}. All fields are final.
+ */
+@InterfaceAudience.Private
+public class WALEventTrackerPayload extends NamedQueuePayload {
+
+  // Every value is stored exactly as handed in by the producer of the event.
+  private final String rsName;
+  private final String walName;
+  private final long timeStamp;
+  private final String state;
+  private final long walLength;
+
+  /**
+   * @param rsName    name of the region server that generated the event
+   * @param walName   name of the WAL file the event refers to
+   * @param timeStamp time at which the event was generated
+   * @param state     state of the WAL at the time of the event
+   * @param walLength length of the WAL file
+   */
+  public WALEventTrackerPayload(String rsName, String walName, long timeStamp, String state,
+    long walLength) {
+    super(NamedQueueEvent.WAL_EVENT_TRACKER.getValue());
+    this.rsName = rsName;
+    this.walName = walName;
+    this.timeStamp = timeStamp;
+    this.state = state;
+    this.walLength = walLength;
+  }
+
+  public String getRsName() {
+    return rsName;
+  }
+
+  public String getWalName() {
+    return walName;
+  }
+
+  public long getTimeStamp() {
+    return timeStamp;
+  }
+
+  public String getState() {
+    return state;
+  }
+
+  public long getWalLength() {
+    return walLength;
+  }
+
+  @Override
+  public String toString() {
+    // Format: ClassName[rsName=.., walName=.., timeStamp=.., walState=.., walLength=..]
+    return getClass().getSimpleName() + "[rsName=" + rsName + ", walName=" + walName
+      + ", timeStamp=" + timeStamp + ", walState=" + state + ", walLength=" + walLength + "]";
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java
new file mode 100644
index 00000000000..40fb6033cc3
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerQueueService.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_KEY;
+
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Queue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest;
+import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.EvictingQueue;
+
+/**
+ * {@link NamedQueueService} that buffers WAL events received from the named queue ring buffer
+ * in a bounded in-memory queue (the oldest event is evicted once capacity is reached) and
+ * writes them through {@link WALEventTrackerTableAccessor} when
+ * {@link #persistAll(Connection)} runs. Event handling and persistence are skipped unless
+ * {@code WAL_EVENT_TRACKER_ENABLED_KEY} is true.
+ * <p>
+ * The backing {@link EvictingQueue} is not thread-safe, so every access to it is guarded by
+ * this instance's monitor.
+ */
+@InterfaceAudience.Private
+public class WALEventTrackerQueueService implements NamedQueueService {
+
+  // Bounded buffer of pending events (null when disabled); guarded by this instance's monitor.
+  private EvictingQueue<WALEventTrackerPayload> queue;
+  private static final String WAL_EVENT_TRACKER_RING_BUFFER_SIZE =
+    "hbase.regionserver.wal.event.tracker.ringbuffer.size";
+  private final boolean walEventTrackerEnabled;
+  private int queueSize;
+  private MetricsWALEventTrackerSource source = null;
+
+  private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerQueueService.class);
+
+  public WALEventTrackerQueueService(Configuration conf) {
+    this(conf, null);
+  }
+
+  /**
+   * @param conf   configuration providing the enabled flag and the queue capacity
+   * @param source metrics source used to report failed puts; when {@code null} the instance
+   *               from {@link CompatibilitySingletonFactory} is used
+   */
+  public WALEventTrackerQueueService(Configuration conf, MetricsWALEventTrackerSource source) {
+    this.walEventTrackerEnabled =
+      conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
+    if (!walEventTrackerEnabled) {
+      return;
+    }
+
+    this.queueSize = conf.getInt(WAL_EVENT_TRACKER_RING_BUFFER_SIZE, 256);
+    queue = EvictingQueue.create(queueSize);
+    if (source == null) {
+      this.source = CompatibilitySingletonFactory.getInstance(MetricsWALEventTrackerSource.class);
+    } else {
+      this.source = source;
+    }
+  }
+
+  @Override
+  public NamedQueuePayload.NamedQueueEvent getEvent() {
+    return NamedQueuePayload.NamedQueueEvent.WAL_EVENT_TRACKER;
+  }
+
+  @Override
+  public void consumeEventFromDisruptor(NamedQueuePayload namedQueuePayload) {
+    if (!walEventTrackerEnabled) {
+      return;
+    }
+    if (!(namedQueuePayload instanceof WALEventTrackerPayload)) {
+      LOG.warn("WALEventTrackerQueueService: NamedQueuePayload is not of type"
+        + " WALEventTrackerPayload.");
+      return;
+    }
+
+    WALEventTrackerPayload payload = (WALEventTrackerPayload) namedQueuePayload;
+    LOG.debug("Adding wal event tracker payload {}", payload);
+    addToQueue(payload);
+  }
+
+  /*
+   * Made it default to use it in testing.
+   */
+  synchronized void addToQueue(WALEventTrackerPayload payload) {
+    queue.add(payload);
+  }
+
+  /**
+   * Drops all buffered events.
+   * @return {@code false} if the tracker is disabled, {@code true} otherwise
+   */
+  @Override
+  public synchronized boolean clearNamedQueue() {
+    if (!walEventTrackerEnabled) {
+      return false;
+    }
+    LOG.debug("Clearing wal event tracker queue");
+    // Must hold the same lock as addToQueue(): EvictingQueue is not thread-safe.
+    queue.clear();
+    return true;
+  }
+
+  /** Buffered events are not served back by this service; always returns {@code null}. */
+  @Override
+  public NamedQueueGetResponse getNamedQueueRecords(NamedQueueGetRequest request) {
+    return null;
+  }
+
+  /**
+   * Drains the buffered events and writes them to the WAL event tracker table. Best effort: if
+   * the write fails, the drained events are dropped and the failed-put metric is incremented by
+   * their count.
+   * @param connection connection used to write the rows
+   */
+  @Override
+  public void persistAll(Connection connection) {
+    if (!walEventTrackerEnabled) {
+      return;
+    }
+    // Drain under the lock instead of calling isEmpty() on the live (unguarded) queue.
+    Queue<WALEventTrackerPayload> pending = getWALEventTrackerList();
+    if (pending.isEmpty()) {
+      LOG.debug("Wal Event tracker queue is empty.");
+      return;
+    }
+
+    try {
+      WALEventTrackerTableAccessor.addWalEventTrackerRows(pending, connection);
+    } catch (Exception e) {
+      // If we fail to persist the records with retries then just forget about them.
+      // This is a best effort service.
+      LOG.error("Failed while persisting wal tracker records", e);
+      // Increment metrics for failed puts
+      source.incrFailedPuts(pending.size());
+      if (e instanceof InterruptedException) {
+        // The write was interrupted; restore the interrupt status for the calling thread.
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  /** Copies out and clears the buffered events while holding the instance lock. */
+  private synchronized Queue<WALEventTrackerPayload> getWALEventTrackerList() {
+    Queue<WALEventTrackerPayload> retQueue = new ArrayDeque<>();
+    Iterator<WALEventTrackerPayload> iterator = queue.iterator();
+    while (iterator.hasNext()) {
+      retQueue.add(iterator.next());
+    }
+    queue.clear();
+    return retQueue;
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java
new file mode 100644
index 00000000000..51dc064a620
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/WALEventTrackerTableAccessor.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Writes WAL events to the {@link #WAL_EVENT_TRACKER_TABLE_NAME} table, one row per event with
+ * all columns in {@link HConstants#WAL_EVENT_TRACKER_INFO_FAMILY}. Writes are retried with
+ * capped exponential backoff configured through {@link #MAX_ATTEMPTS_KEY},
+ * {@link #SLEEP_INTERVAL_KEY} and {@link #MAX_SLEEP_TIME_KEY}.
+ */
+@InterfaceAudience.Private
+public final class WALEventTrackerTableAccessor {
+  private static final Logger LOG = LoggerFactory.getLogger(WALEventTrackerTableAccessor.class);
+
+  public static final String RS_COLUMN = "region_server_name";
+  public static final String WAL_NAME_COLUMN = "wal_name";
+  public static final String TIMESTAMP_COLUMN = "timestamp";
+  public static final String WAL_STATE_COLUMN = "wal_state";
+  public static final String WAL_LENGTH_COLUMN = "wal_length";
+  public static final String MAX_ATTEMPTS_KEY = "wal.event.tracker.max.attempts";
+  public static final String SLEEP_INTERVAL_KEY = "wal.event.tracker.sleep.interval.msec";
+  public static final String MAX_SLEEP_TIME_KEY = "wal.event.tracker.max.sleep.time.msec";
+  public static final int DEFAULT_MAX_ATTEMPTS = 3;
+  public static final long DEFAULT_SLEEP_INTERVAL = 1000L; // 1 second
+  public static final long DEFAULT_MAX_SLEEP_TIME = 60000L; // 60 seconds
+  public static final String WAL_EVENT_TRACKER_TABLE_NAME_STR = "REPLICATION.WALEVENTTRACKER";
+  public static final String DELIMITER = "_";
+
+  // Qualifiers are identical for every row, so encode them once instead of once per event.
+  private static final byte[] RS_QUALIFIER = Bytes.toBytes(RS_COLUMN);
+  private static final byte[] WAL_NAME_QUALIFIER = Bytes.toBytes(WAL_NAME_COLUMN);
+  private static final byte[] TIMESTAMP_QUALIFIER = Bytes.toBytes(TIMESTAMP_COLUMN);
+  private static final byte[] WAL_STATE_QUALIFIER = Bytes.toBytes(WAL_STATE_COLUMN);
+  private static final byte[] WAL_LENGTH_QUALIFIER = Bytes.toBytes(WAL_LENGTH_COLUMN);
+
+  private WALEventTrackerTableAccessor() {
+  }
+
+  /**
+   * {@link #WAL_EVENT_TRACKER_TABLE_NAME_STR} table name - can be enabled with config -
+   * hbase.regionserver.wal.event.tracker.enabled
+   */
+  public static final TableName WAL_EVENT_TRACKER_TABLE_NAME =
+    TableName.valueOf(WAL_EVENT_TRACKER_TABLE_NAME_STR);
+
+  /**
+   * Writes {@code puts} to the tracker table, retrying on {@link IOException} until the retry
+   * counter built by {@link #getRetryFactory(Configuration)} refuses another attempt. A new
+   * {@link Table} is obtained for each attempt and closed afterwards.
+   */
+  private static void doPut(final Connection connection, final List<Put> puts) throws Exception {
+    RetryCounter retryCounter = getRetryFactory(connection.getConfiguration()).create();
+    while (true) {
+      try (Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME)) {
+        table.put(puts);
+        return;
+      } catch (IOException ioe) {
+        retryOrThrow(retryCounter, ioe);
+      }
+      retryCounter.sleepUntilNextRetry();
+    }
+  }
+
+  /** Builds the retry policy (exponential backoff with an upper bound) from {@code conf}. */
+  private static RetryCounterFactory getRetryFactory(Configuration conf) {
+    int maxAttempts = conf.getInt(MAX_ATTEMPTS_KEY, DEFAULT_MAX_ATTEMPTS);
+    long sleepIntervalMs = conf.getLong(SLEEP_INTERVAL_KEY, DEFAULT_SLEEP_INTERVAL);
+    long maxSleepTimeMs = conf.getLong(MAX_SLEEP_TIME_KEY, DEFAULT_MAX_SLEEP_TIME);
+    RetryCounter.RetryConfig retryConfig =
+      new RetryCounter.RetryConfig(maxAttempts, sleepIntervalMs, maxSleepTimeMs,
+        TimeUnit.MILLISECONDS, new RetryCounter.ExponentialBackoffPolicyWithLimit());
+    return new RetryCounterFactory(retryConfig);
+  }
+
+  /** Returns if another attempt is allowed, otherwise rethrows {@code ioe}. */
+  private static void retryOrThrow(RetryCounter retryCounter, IOException ioe) throws IOException {
+    if (retryCounter.shouldRetry()) {
+      return;
+    }
+    throw ioe;
+  }
+
+  /**
+   * Add wal event tracker rows to the {@link #WAL_EVENT_TRACKER_TABLE_NAME} table.
+   * @param walEventPayloads List of walevents to process; nothing is written when it is empty
+   * @param connection       Connection to use.
+   * @throws Exception if the rows still cannot be written after the configured retries
+   */
+  public static void addWalEventTrackerRows(Queue<WALEventTrackerPayload> walEventPayloads,
+    final Connection connection) throws Exception {
+    if (walEventPayloads.isEmpty()) {
+      return;
+    }
+    List<Put> puts = new ArrayList<>(walEventPayloads.size());
+    for (WALEventTrackerPayload payload : walEventPayloads) {
+      final byte[] rowKey = getRowKey(payload);
+      final Put put = new Put(rowKey);
+      // TODO Do we need to SKIP_WAL ?
+      put.setPriority(HConstants.NORMAL_QOS);
+      put
+        .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, RS_QUALIFIER,
+          Bytes.toBytes(payload.getRsName()))
+        .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, WAL_NAME_QUALIFIER,
+          Bytes.toBytes(payload.getWalName()))
+        .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, TIMESTAMP_QUALIFIER,
+          Bytes.toBytes(payload.getTimeStamp()))
+        .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, WAL_STATE_QUALIFIER,
+          Bytes.toBytes(payload.getState()))
+        .addColumn(HConstants.WAL_EVENT_TRACKER_INFO_FAMILY, WAL_LENGTH_QUALIFIER,
+          Bytes.toBytes(payload.getWalLength()));
+      puts.add(put);
+    }
+    doPut(connection, puts);
+  }
+
+  /**
+   * Create rowKey: 1. We want RS name to be the leading part of rowkey so that we can query by RS
+   * name filter. WAL name contains rs name as a leading part. 2. Timestamp when the event was
+   * generated. 3. Add state of the wal. Combination of 1 + 2 + 3 is definitely going to create a
+   * unique rowkey.
+   * @param payload payload to process
+   * @return rowKey byte[]
+   */
+  public static byte[] getRowKey(final WALEventTrackerPayload payload) {
+    String walName = payload.getWalName();
+    // converting to string since this will help seeing the timestamp in string format using
+    // hbase shell commands.
+    String timestampStr = String.valueOf(payload.getTimeStamp());
+    String walState = payload.getState();
+    final String rowKeyStr = walName + DELIMITER + timestampStr + DELIMITER + walState;
+    return Bytes.toBytes(rowKeyStr);
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java
index 45bfca11270..885e2d44279 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerDecisionQueueService.java
@@ -24,6 +24,7 @@ import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.BalancerDecision;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.namequeues.BalancerDecisionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
@@ -141,7 +142,7 @@ public class BalancerDecisionQueueService implements NamedQueueService {
}
@Override
- public void persistAll() {
+ public void persistAll(Connection connection) {
// no-op for now
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java
index 79b7325b305..fb94db2b917 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/BalancerRejectionQueueService.java
@@ -24,6 +24,7 @@ import java.util.Queue;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.BalancerRejection;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer;
import org.apache.hadoop.hbase.namequeues.BalancerRejectionDetails;
import org.apache.hadoop.hbase.namequeues.NamedQueuePayload;
@@ -127,7 +128,7 @@ public class BalancerRejectionQueueService implements NamedQueueService {
}
@Override
- public void persistAll() {
+ public void persistAll(Connection connection) {
// no-op for now
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
index 03b6aa719ea..86b24e9d975 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/namequeues/impl/SlowLogQueueService.java
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.SlowLogParams;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.namequeues.LogHandlerUtils;
@@ -223,12 +224,12 @@ public class SlowLogQueueService implements NamedQueueService {
* table.
*/
@Override
- public void persistAll() {
+ public void persistAll(Connection connection) {
if (!isOnlineLogProviderEnabled) {
return;
}
if (slowLogPersistentService != null) {
- slowLogPersistentService.addAllLogsToSysTable();
+ slowLogPersistentService.addAllLogsToSysTable(connection);
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7dc474dcdb2..7e87cde1eff 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -19,8 +19,12 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK;
import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_MAX_SPLITTER;
+import static org.apache.hadoop.hbase.HConstants.NAMED_QUEUE_CHORE_DURATION_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_DEFAULT;
+import static org.apache.hadoop.hbase.HConstants.WAL_EVENT_TRACKER_ENABLED_KEY;
import static org.apache.hadoop.hbase.util.DNS.UNSAFE_RS_HOSTNAME_KEY;
import java.io.IOException;
@@ -107,7 +111,7 @@ import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
-import org.apache.hadoop.hbase.namequeues.SlowLogTableOpsChore;
+import org.apache.hadoop.hbase.namequeues.NamedQueueServiceChore;
import org.apache.hadoop.hbase.net.Address;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManagerHost;
import org.apache.hadoop.hbase.procedure2.RSProcedureCallable;
@@ -130,6 +134,8 @@ import org.apache.hadoop.hbase.regionserver.http.RSStatusServlet;
import org.apache.hadoop.hbase.regionserver.regionreplication.RegionReplicationBufferManager;
import org.apache.hadoop.hbase.regionserver.throttle.FlushThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus;
@@ -362,7 +368,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
private final RegionServerAccounting regionServerAccounting;
- private SlowLogTableOpsChore slowLogTableOpsChore = null;
+ private NamedQueueServiceChore namedQueueServiceChore = null;
// Block cache
private BlockCache blockCache;
@@ -1688,9 +1694,23 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
}
// Instantiate replication if replication enabled. Pass it the log directories.
createNewReplicationInstance(conf, this, this.walFs, logDir, oldLogDir, factory);
+
+ WALActionsListener walEventListener = getWALEventTrackerListener(conf);
+ if (walEventListener != null && factory.getWALProvider() != null) {
+ factory.getWALProvider().addWALActionsListener(walEventListener);
+ }
this.walFactory = factory;
}
+  /** Returns a WAL event tracker listener if the tracker is enabled, otherwise null. */
+  private WALActionsListener getWALEventTrackerListener(Configuration conf) {
+    if (!conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT)) {
+      return null;
+    }
+    // Events go to this server's named queue recorder, built with this server's name.
+    return new WALEventTrackerListener(conf, getNamedQueueRecorder(), getServerName());
+  }
+
/**
* Start up replication source and sink handlers.
*/
@@ -1860,8 +1880,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
if (this.fsUtilizationChore != null) {
choreService.scheduleChore(fsUtilizationChore);
}
- if (this.slowLogTableOpsChore != null) {
- choreService.scheduleChore(slowLogTableOpsChore);
+ if (this.namedQueueServiceChore != null) {
+ choreService.scheduleChore(namedQueueServiceChore);
}
if (this.brokenStoreFileCleaner != null) {
choreService.scheduleChore(brokenStoreFileCleaner);
@@ -1913,10 +1933,22 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
final boolean isSlowLogTableEnabled = conf.getBoolean(HConstants.SLOW_LOG_SYS_TABLE_ENABLED_KEY,
HConstants.DEFAULT_SLOW_LOG_SYS_TABLE_ENABLED_KEY);
- if (isSlowLogTableEnabled) {
+ final boolean walEventTrackerEnabled =
+ conf.getBoolean(WAL_EVENT_TRACKER_ENABLED_KEY, WAL_EVENT_TRACKER_ENABLED_DEFAULT);
+
+ if (isSlowLogTableEnabled || walEventTrackerEnabled) {
// default chore duration: 10 min
- final int duration = conf.getInt("hbase.slowlog.systable.chore.duration", 10 * 60 * 1000);
- slowLogTableOpsChore = new SlowLogTableOpsChore(this, duration, this.namedQueueRecorder);
+ // After <version number>, we will remove hbase.slowlog.systable.chore.duration conf property
+ final int slowLogChoreDuration = conf.getInt(HConstants.SLOW_LOG_SYS_TABLE_CHORE_DURATION_KEY,
+ DEFAULT_SLOW_LOG_SYS_TABLE_CHORE_DURATION);
+
+ final int namedQueueChoreDuration =
+ conf.getInt(HConstants.NAMED_QUEUE_CHORE_DURATION_KEY, NAMED_QUEUE_CHORE_DURATION_DEFAULT);
+ // Considering min of slowLogChoreDuration and namedQueueChoreDuration
+ int choreDuration = Math.min(slowLogChoreDuration, namedQueueChoreDuration);
+
+ namedQueueServiceChore = new NamedQueueServiceChore(this, choreDuration,
+ this.namedQueueRecorder, this.getConnection());
}
if (this.nonceManager != null) {
@@ -3498,13 +3530,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
@Override
protected NamedQueueRecorder createNamedQueueRecord() {
- final boolean isOnlineLogProviderEnabled = conf.getBoolean(
- HConstants.SLOW_LOG_BUFFER_ENABLED_KEY, HConstants.DEFAULT_ONLINE_LOG_PROVIDER_ENABLED);
- if (isOnlineLogProviderEnabled) {
- return NamedQueueRecorder.getInstance(conf);
- } else {
- return null;
- }
+ return NamedQueueRecorder.getInstance(conf);
}
@Override
@@ -3533,7 +3559,7 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
shutdownChore(executorStatusChore);
shutdownChore(storefileRefresher);
shutdownChore(fsUtilizationChore);
- shutdownChore(slowLogTableOpsChore);
+ shutdownChore(namedQueueServiceChore);
shutdownChore(brokenStoreFileCleaner);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 2c0a656049c..49fdb9748da 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -639,7 +639,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
return newPath;
}
- Path getOldPath() {
+ public Path getOldPath() {
long currentFilenum = this.filenum.get();
Path oldPath = null;
if (currentFilenum > 0) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java
new file mode 100644
index 00000000000..487c7de4170
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEventTrackerListener.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder;
+import org.apache.hadoop.hbase.namequeues.WALEventTrackerPayload;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * {@link WALActionsListener} that reports WAL roll notifications to a
+ * {@link NamedQueueRecorder} as {@link WALEventTrackerPayload} records. Only the last path
+ * component of each WAL is recorded, not the full path with filesystem scheme and WAL dir.
+ */
+@InterfaceAudience.Private
+public class WALEventTrackerListener implements WALActionsListener {
+  private final Configuration conf;
+  private final NamedQueueRecorder namedQueueRecorder;
+  // Host name taken from the ServerName given to the constructor.
+  private final String serverName;
+
+  /** WAL states recorded around a log roll. */
+  public enum WalState {
+    ROLLING,
+    ROLLED,
+    ACTIVE
+  }
+
+  public WALEventTrackerListener(Configuration conf, NamedQueueRecorder namedQueueRecorder,
+    ServerName serverName) {
+    this.conf = conf;
+    this.namedQueueRecorder = namedQueueRecorder;
+    this.serverName = serverName.getHostname();
+  }
+
+  /** Records the outgoing WAL as {@link WalState#ROLLING}; no-op when oldPath is null. */
+  @Override
+  public void preLogRoll(Path oldPath, Path newPath) {
+    if (oldPath == null) {
+      // oldPath can be null for the first WAL.
+      return;
+    }
+    publish(oldPath.getName(), WalState.ROLLING, 0L);
+  }
+
+  /**
+   * Records two events: the outgoing WAL (if any) as {@link WalState#ROLLED} together with its
+   * file length, and the new WAL as {@link WalState#ACTIVE}.
+   */
+  @Override
+  public void postLogRoll(Path oldPath, Path newPath) {
+    // oldPath can be null for the first WAL.
+    if (oldPath != null) {
+      long oldWalLength = lengthOf(oldPath);
+      publish(oldPath.getName(), WalState.ROLLED, oldWalLength);
+    }
+    publish(newPath.getName(), WalState.ACTIVE, 0L);
+  }
+
+  /** Returns the length of {@code walPath}, or 0 if looking it up throws IOException. */
+  private long lengthOf(Path walPath) {
+    try {
+      FileSystem fs = walPath.getFileSystem(this.conf);
+      return fs.getFileStatus(walPath).getLen();
+    } catch (IOException ignored) {
+      // Saving wal length is best effort. In case of any exception just ignore.
+      return 0L;
+    }
+  }
+
+  /** Publishes one event for {@code walName}, stamped with the current time. */
+  private void publish(String walName, WalState state, long walLength) {
+    long now = EnvironmentEdgeManager.currentTime();
+    namedQueueRecorder.addRecord(
+      new WALEventTrackerPayload(serverName, walName, now, state.name(), walLength));
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java
new file mode 100644
index 00000000000..1a87effa8d5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTracker.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.RS_COLUMN;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.TIMESTAMP_COLUMN;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_LENGTH_COLUMN;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_NAME_COLUMN;
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_STATE_COLUMN;
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
+import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestWALEventTracker {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestWALEventTracker.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestNamedQueueRecorder.class);
+ private static HBaseTestingUtil TEST_UTIL;
+ public static Configuration CONF;
+
+ @BeforeClass
+ public static void setup() throws Exception {
+ CONF = HBaseConfiguration.create();
+ CONF.setBoolean(HConstants.WAL_EVENT_TRACKER_ENABLED_KEY, true);
+ // Set the chore for less than a second.
+ CONF.setInt(HConstants.NAMED_QUEUE_CHORE_DURATION_KEY, 900);
+ CONF.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100);
+ TEST_UTIL = new HBaseTestingUtil(CONF);
+ TEST_UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void teardown() throws Exception {
+ LOG.info("Calling teardown");
+ TEST_UTIL.shutdownMiniHBaseCluster();
+ }
+
+ @Before
+ public void waitForWalEventTrackerTableCreation() {
+ Waiter.waitFor(CONF, 10000,
+ (Waiter.Predicate) () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
+ }
+
+ @Test
+ public void testWALRolling() throws Exception {
+ Connection connection = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getConnection();
+ waitForWALEventTrackerTable(connection);
+ List<WAL> wals = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getWALs();
+ assertEquals(1, wals.size());
+ AbstractFSWAL wal = (AbstractFSWAL) wals.get(0);
+ Path wal1Path = wal.getOldPath();
+ wal.rollWriter(true);
+
+ FileSystem fs = TEST_UTIL.getTestFileSystem();
+ long wal1Length = fs.getFileStatus(wal1Path).getLen();
+ Path wal2Path = wal.getOldPath();
+ String hostName =
+ TEST_UTIL.getMiniHBaseCluster().getRegionServer(0).getServerName().getHostname();
+
+ TEST_UTIL.waitFor(5000, () -> getTableCount(connection) >= 3);
+ List<WALEventTrackerPayload> walEventsList = getRows(hostName, connection);
+
+ // There should be atleast 2 events for wal1Name, with ROLLING and ROLLED state. Most of the
+ // time we will lose ACTIVE event for the first wal creates since hmaster will take some time
+ // to create hbase:waleventtracker table and by that time RS will already create the first wal
+ // and will try to persist it.
+ compareEvents(hostName, wal1Path.getName(), walEventsList,
+ new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ROLLING.name(),
+ WALEventTrackerListener.WalState.ROLLED.name())),
+ false);
+
+ // There should be only 1 event for wal2Name which is current wal, with ACTIVE state
+ compareEvents(hostName, wal2Path.getName(), walEventsList,
+ new ArrayList<>(Arrays.asList(WALEventTrackerListener.WalState.ACTIVE.name())), true);
+
+ // Check that event with wal1Path and state ROLLED has the wal length set.
+ checkWALRolledEventHasSize(walEventsList, wal1Path.getName(), wal1Length);
+ }
+
+ private void checkWALRolledEventHasSize(List<WALEventTrackerPayload> walEvents, String walName,
+ long actualSize) {
+ List<WALEventTrackerPayload> eventsFilteredByNameState = new ArrayList<>();
+ // Filter the list by walName and wal state.
+ for (WALEventTrackerPayload event : walEvents) {
+ if (
+ walName.equals(event.getWalName())
+ && WALEventTrackerListener.WalState.ROLLED.name().equals(event.getState())
+ ) {
+ eventsFilteredByNameState.add(event);
+ }
+ }
+
+ assertEquals(1, eventsFilteredByNameState.size());
+ // We are not comparing the size of the WAL in the tracker table with actual size.
+ // For AsyncWAL implementation, since the WAL file is closed in an async fashion, the WAL length
+ // will always be incorrect.
+ // For FSHLog implementation, we close the WAL in an executor thread. So there will always be
+ // a difference of trailer size bytes.
+ // assertEquals(actualSize, eventsFilteredByNameState.get(0).getWalLength());
+ }
+
+ /**
+ * Compare the events from @{@link WALEventTrackerTableAccessor#WAL_EVENT_TRACKER_TABLE_NAME}
+ * @param hostName hostname
+ * @param walName walname
+ * @param walEvents event from table
+ * @param expectedStates expected states for the hostname and wal name
+ * @param strict whether to check strictly or not. Sometimes we lose the ACTIVE state
+ * event for the first wal since it takes some time for hmaster to create
+ * the table and by that time RS already creates the first WAL and will try
+ * to persist ACTIVE event to waleventtracker table.
+ */
+ private void compareEvents(String hostName, String walName,
+ List<WALEventTrackerPayload> walEvents, List<String> expectedStates, boolean strict) {
+ List<WALEventTrackerPayload> eventsFilteredByWalName = new ArrayList<>();
+
+ // Assert that all the events have the same host name i.e they came from the same RS.
+ for (WALEventTrackerPayload event : walEvents) {
+ assertEquals(hostName, event.getRsName());
+ }
+
+ // Filter the list by walName.
+ for (WALEventTrackerPayload event : walEvents) {
+ if (walName.equals(event.getWalName())) {
+ eventsFilteredByWalName.add(event);
+ }
+ }
+
+ // Assert that the list of events after filtering by walName should be same as expected states.
+ if (strict) {
+ assertEquals(expectedStates.size(), eventsFilteredByWalName.size());
+ }
+
+ for (WALEventTrackerPayload event : eventsFilteredByWalName) {
+ expectedStates.remove(event.getState());
+ }
+ assertEquals(0, expectedStates.size());
+ }
+
+ private void waitForWALEventTrackerTable(Connection connection) throws IOException {
+ TEST_UTIL.waitFor(5000, () -> TEST_UTIL.getAdmin().tableExists(WAL_EVENT_TRACKER_TABLE_NAME));
+ }
+
+ private List<WALEventTrackerPayload> getRows(String rowKeyPrefix, Connection connection)
+ throws IOException {
+ List<WALEventTrackerPayload> list = new ArrayList<>();
+ Scan scan = new Scan();
+ scan.withStartRow(Bytes.toBytes(rowKeyPrefix));
+ Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
+ ResultScanner scanner = table.getScanner(scan);
+
+ Result r;
+ while ((r = scanner.next()) != null) {
+ List<Cell> cells = r.listCells();
+ list.add(getPayload(cells));
+ }
+ return list;
+ }
+
+ private WALEventTrackerPayload getPayload(List<Cell> cells) {
+ String rsName = null, walName = null, walState = null;
+ long timestamp = 0L, walLength = 0L;
+ for (Cell cell : cells) {
+ byte[] qualifier = CellUtil.cloneQualifier(cell);
+ byte[] value = CellUtil.cloneValue(cell);
+ String qualifierStr = Bytes.toString(qualifier);
+
+ if (RS_COLUMN.equals(qualifierStr)) {
+ rsName = Bytes.toString(value);
+ } else if (WAL_NAME_COLUMN.equals(qualifierStr)) {
+ walName = Bytes.toString(value);
+ } else if (WAL_STATE_COLUMN.equals(qualifierStr)) {
+ walState = Bytes.toString(value);
+ } else if (TIMESTAMP_COLUMN.equals(qualifierStr)) {
+ timestamp = Bytes.toLong(value);
+ } else if (WAL_LENGTH_COLUMN.equals(qualifierStr)) {
+ walLength = Bytes.toLong(value);
+ }
+ }
+ return new WALEventTrackerPayload(rsName, walName, timestamp, walState, walLength);
+ }
+
+ private int getTableCount(Connection connection) throws Exception {
+ Table table = connection.getTable(WAL_EVENT_TRACKER_TABLE_NAME);
+ ResultScanner resultScanner = table.getScanner(new Scan().setReadType(Scan.ReadType.STREAM));
+ int count = 0;
+ while (resultScanner.next() != null) {
+ count++;
+ }
+ LOG.info("Table count: " + count);
+ return count;
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java
new file mode 100644
index 00000000000..397cda5a9e3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWALEventTrackerTableAccessor.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestWALEventTrackerTableAccessor {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWALEventTrackerTableAccessor.class);
+
+  /*
+   * Tests that the rowkey is constructed as walName, timestamp and walState separated by
+   * WALEventTrackerTableAccessor.DELIMITER.
+   */
+  @Test
+  public void testRowKey() {
+    String rsName = "test-region-server";
+    String walName = "test-wal-0";
+    long timeStamp = EnvironmentEdgeManager.currentTime();
+    String walState = WALEventTrackerListener.WalState.ACTIVE.name();
+    long walLength = 100L;
+    WALEventTrackerPayload payload =
+      new WALEventTrackerPayload(rsName, walName, timeStamp, walState, walLength);
+    byte[] rowKeyBytes = WALEventTrackerTableAccessor.getRowKey(payload);
+
+    String rowKeyBytesStr = Bytes.toString(rowKeyBytes);
+    // NOTE: String.split treats DELIMITER as a regex; fine as long as it has no metacharacters.
+    String[] fields = rowKeyBytesStr.split(WALEventTrackerTableAccessor.DELIMITER);
+    // This is the format of rowkey: walName_timestamp_walState
+    // Check the field count first so a malformed key fails with a clear assertion message
+    // instead of an ArrayIndexOutOfBoundsException.
+    assertEquals(3, fields.length);
+    assertEquals(walName, fields[0]);
+    assertEquals(timeStamp, Long.parseLong(fields[1]));
+    assertEquals(walState, fields[2]);
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java
new file mode 100644
index 00000000000..55cb0145de7
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/namequeues/TestWalEventTrackerQueueService.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.namequeues;
+
+import static org.apache.hadoop.hbase.namequeues.WALEventTrackerTableAccessor.WAL_EVENT_TRACKER_TABLE_NAME;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.regionserver.wal.WALEventTrackerListener;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(SmallTests.class)
+public class TestWalEventTrackerQueueService {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestWalEventTrackerQueueService.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  /*
+   * Verifies that the wal event tracker failure metrics are incremented when persisting queued
+   * events to the tracker table fails.
+   */
+  @Test
+  public void testMetrics() throws Exception {
+    // A single ACTIVE event to be queued and (unsuccessfully) persisted.
+    WALEventTrackerPayload payload = new WALEventTrackerPayload("test-region-server", "test-wal-0",
+      EnvironmentEdgeManager.currentTime(), WALEventTrackerListener.WalState.ACTIVE.name(), 100L);
+
+    Configuration conf = HBaseConfiguration.create();
+    conf.setBoolean(HConstants.WAL_EVENT_TRACKER_ENABLED_KEY, true);
+    conf.setLong(WALEventTrackerTableAccessor.SLEEP_INTERVAL_KEY, 100);
+
+    String methodName = name.getMethodName();
+    MetricsWALEventTrackerSourceImpl source =
+      new MetricsWALEventTrackerSourceImpl(methodName, methodName, methodName, methodName);
+    WALEventTrackerQueueService service = new WALEventTrackerQueueService(conf, source);
+    service.addToQueue(payload);
+
+    // A mocked connection whose getTable(tracker table) always throws IOException.
+    Connection failingConnection = mock(Connection.class);
+    doReturn(conf).when(failingConnection).getConfiguration();
+    doThrow(new IOException()).when(failingConnection).getTable(WAL_EVENT_TRACKER_TABLE_NAME);
+
+    // No failures recorded before anything is persisted.
+    assertEquals(0L, source.getFailedPuts());
+    assertEquals(0L, source.getNumRecordsFailedPuts());
+
+    service.persistAll(failingConnection);
+
+    // Both failure counters now reflect the one queued record.
+    assertEquals(1L, source.getFailedPuts());
+    assertEquals(1L, source.getNumRecordsFailedPuts());
+    // getTable must have been attempted 1 + DEFAULT_MAX_ATTEMPTS times.
+    verify(failingConnection, times(1 + WALEventTrackerTableAccessor.DEFAULT_MAX_ATTEMPTS))
+      .getTable(WAL_EVENT_TRACKER_TABLE_NAME);
+  }
+}