You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/04/26 21:47:10 UTC
[hbase] branch branch-2.5 updated: HBASE-26581 Add metrics for failed replication edits (#4347)
This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 3439c5bd901 HBASE-26581 Add metrics for failed replication edits (#4347)
3439c5bd901 is described below
commit 3439c5bd901c41ef916f708d71fae52652f5ac8a
Author: Bri Augenreich <bb...@vt.edu>
AuthorDate: Tue Apr 26 17:42:54 2022 -0400
HBASE-26581 Add metrics for failed replication edits (#4347)
Co-authored-by: Briana Augenreich <ba...@hubspot.com>
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
.../regionserver/MetricsReplicationSinkSource.java | 3 +
.../MetricsReplicationSourceSource.java | 2 +
.../MetricsReplicationGlobalSourceSourceImpl.java | 7 +++
.../MetricsReplicationSinkSourceImpl.java | 13 +++++
.../MetricsReplicationSourceSourceImpl.java | 10 ++++
.../replication/regionserver/MetricsSink.java | 15 +++++
.../replication/regionserver/MetricsSource.java | 9 +++
.../replication/regionserver/ReplicationSink.java | 3 +
.../regionserver/ReplicationSourceShipper.java | 1 +
.../regionserver/TestReplicationSink.java | 66 +++++++++++++++++++++-
10 files changed, 127 insertions(+), 2 deletions(-)
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
index 2498e3426a5..fe11c1049ce 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
@@ -24,13 +24,16 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface MetricsReplicationSinkSource {
public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
+ public static final String SINK_FAILED_BATCHES = "sink.failedBatches";
public static final String SINK_APPLIED_OPS = "sink.appliedOps";
public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
void setLastAppliedOpAge(long age);
void incrAppliedBatches(long batches);
void incrAppliedOps(long batchsize);
+ void incrFailedBatches();
long getLastAppliedOpAge();
void incrAppliedHFiles(long hfileSize);
long getSinkAppliedOps();
+ long getFailedBatches();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index a6cf79b710f..d37dc133e2c 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -27,6 +27,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
+ public static final String SOURCE_FAILED_BATCHES = "source.failedBatches";
@Deprecated
/** @deprecated Use SOURCE_SHIPPED_BYTES instead */
@@ -60,6 +61,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void decrSizeOfLogQueue(int size);
void incrLogEditsFiltered(long size);
void incrBatchesShipped(int batches);
+ void incrFailedBatches();
void incrOpsShipped(long ops);
void incrShippedBytes(long size);
void incrLogReadInBytes(long size);
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 5eb5deb03f6..cc97d7491a9 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -36,6 +36,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
+ private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter;
@@ -68,6 +69,8 @@ public class MetricsReplicationGlobalSourceSourceImpl
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
+ failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L);
+
shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
shippedKBsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_KBS, 0L);
@@ -127,6 +130,10 @@ public class MetricsReplicationGlobalSourceSourceImpl
shippedBatchesCounter.incr(batches);
}
+ @Override public void incrFailedBatches() {
+ failedBatchesCounter.incr();
+ }
+
@Override public void incrOpsShipped(long ops) {
shippedOpsCounter.incr(ops);
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
index ce45af5ccec..86bc60577a6 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
@@ -27,12 +27,14 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
private final MutableHistogram ageHist;
private final MutableFastCounter batchesCounter;
+ private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter opsCounter;
private final MutableFastCounter hfilesCounter;
public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP);
batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L);
+ failedBatchesCounter = rms.getMetricsRegistry().getCounter(SINK_FAILED_BATCHES, 0L);
opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L);
hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L);
}
@@ -49,6 +51,16 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
opsCounter.incr(batchsize);
}
+ @Override
+ public void incrFailedBatches(){
+ failedBatchesCounter.incr();
+ }
+
+ @Override
+ public long getFailedBatches() {
+ return failedBatchesCounter.value();
+ }
+
@Override
public long getLastAppliedOpAge() {
return ageHist.getMax();
@@ -62,4 +74,5 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
@Override public long getSinkAppliedOps() {
return opsCounter.value();
}
+
}
diff --git a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index 28499120119..bf1392ce9b7 100644
--- a/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop2-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -34,6 +34,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
+ private final String failedBatchesKey;
private String keyPrefix;
/**
@@ -53,6 +54,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
+ private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedKBsCounter;
private final MutableFastCounter shippedBytesCounter;
@@ -91,6 +93,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
+ failedBatchesKey = this.keyPrefix + "failedBatches";
+ failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L);
+
shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
@@ -167,6 +172,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
shippedBatchesCounter.incr(batches);
}
+ @Override public void incrFailedBatches() {
+ failedBatchesCounter.incr();
+ }
+
@Override public void incrOpsShipped(long ops) {
shippedOpsCounter.incr(ops);
}
@@ -187,6 +196,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(sizeOfLogQueueKey);
rms.removeMetric(shippedBatchesKey);
+ rms.removeMetric(failedBatchesKey);
rms.removeMetric(shippedOpsKey);
rms.removeMetric(shippedKBsKey);
rms.removeMetric(shippedBytesKey);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index ce785bbffd5..8f07c08c4eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -84,6 +84,21 @@ public class MetricsSink {
mss.incrAppliedHFiles(hfileSize);
}
+ /**
+ * Convenience method to update metrics when a batch of operations has failed.
+ */
+ public void incrementFailedBatches(){
+ mss.incrFailedBatches();
+ }
+
+ /**
+ * Get the count of the failed batches.
+ * @return failedBatches
+ */
+ protected long getFailedBatches() {
+ return mss.getFailedBatches();
+ }
+
/**
* Get the Age of Last Applied Op
* @return ageOfLastAppliedOp
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 3b97e1ed3ab..3510254959b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -230,6 +230,15 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrShippedBytes(sizeInBytes);
}
+ /**
+ * Convenience method to update metrics when a batch of operations has failed.
+ */
+ public void incrementFailedBatches(){
+ singleSourceSource.incrFailedBatches();
+ globalSourceSource.incrFailedBatches();
+ }
+
+
/**
* Gets the number of edits not eligible for replication this source queue logs so far.
* @return logEditsFiltered non-replicable edits filtered from this queue logs.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index b947c806373..34fbb55b1b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -190,6 +190,7 @@ public class ReplicationSink {
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
+ this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
}
@@ -202,6 +203,7 @@ public class ReplicationSink {
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
+ this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
@@ -276,6 +278,7 @@ public class ReplicationSink {
this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex);
+ this.metrics.incrementFailedBatches();
throw ex;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index c40cf7e00a4..95719273828 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -222,6 +222,7 @@ public class ReplicationSourceShipper extends Thread {
}
break;
} catch (Exception ex) {
+ source.getSourceMetrics().incrementFailedBatches();
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepMultiplier)) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index 02eb3d92d05..a7625e6029c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -69,15 +71,14 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
+
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {
@@ -427,6 +428,67 @@ public class TestReplicationSink {
// Clean up the created hfiles or it will mess up subsequent tests
}
+ /**
+ * Test failure metrics produced for failed replication edits
+ */
+ @Test
+ public void testFailedReplicationSinkMetrics() throws IOException {
+ long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches();
+ long errorCount = 0L;
+ List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
+ List<Cell> cells = new ArrayList<>();
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+ }
+ cells.clear(); // cause IndexOutOfBoundsException
+ try {
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ Assert.fail("Should re-throw ArrayIndexOutOfBoundsException.");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ errorCount++;
+ assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
+ }
+
+ entries.clear();
+ cells.clear();
+ TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
+ }
+ try {
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ Assert.fail("Should re-throw TableNotFoundException.");
+ } catch (TableNotFoundException e) {
+ errorCount++;
+ assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
+ }
+
+ entries.clear();
+ cells.clear();
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+ }
+ // cause IOException in batch()
+ try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
+ try (Admin admin = conn.getAdmin()) {
+ admin.disableTable(TABLE_NAME1);
+ try {
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ Assert.fail("Should re-throw IOException.");
+ } catch (IOException e) {
+ errorCount++;
+ assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
+ } finally {
+ admin.enableTable(TABLE_NAME1);
+ }
+ }
+ }
+ }
+
+
private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);