You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/04/26 21:43:04 UTC
[hbase] branch master updated: HBASE-26581 Add metrics for failed replication edits (#4347)
This is an automated email from the ASF dual-hosted git repository.
bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new ee0c9212588 HBASE-26581 Add metrics for failed replication edits (#4347)
ee0c9212588 is described below
commit ee0c92125885d92c6538e0eae8018c9d9787dc10
Author: Bri Augenreich <bb...@vt.edu>
AuthorDate: Tue Apr 26 17:42:54 2022 -0400
HBASE-26581 Add metrics for failed replication edits (#4347)
Co-authored-by: Briana Augenreich <ba...@hubspot.com>
Signed-off-by: Andrew Purtell <ap...@apache.org>
Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
.../MetricsReplicationGlobalSourceSourceImpl.java | 7 +++
.../regionserver/MetricsReplicationSinkSource.java | 3 +
.../MetricsReplicationSinkSourceImpl.java | 13 +++++
.../MetricsReplicationSourceSource.java | 2 +
.../MetricsReplicationSourceSourceImpl.java | 10 ++++
.../replication/regionserver/MetricsSink.java | 15 +++++
.../replication/regionserver/MetricsSource.java | 9 +++
.../replication/regionserver/ReplicationSink.java | 3 +
.../regionserver/ReplicationSourceShipper.java | 1 +
.../regionserver/TestReplicationSink.java | 66 +++++++++++++++++++++-
10 files changed, 127 insertions(+), 2 deletions(-)
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 547617a1669..a838195b445 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -36,6 +36,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
+ private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter;
private final MutableFastCounter logReadInBytesCounter;
@@ -62,6 +63,8 @@ public class MetricsReplicationGlobalSourceSourceImpl
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
+ failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L);
+
shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
@@ -119,6 +122,10 @@ public class MetricsReplicationGlobalSourceSourceImpl
shippedBatchesCounter.incr(batches);
}
+ /**
+ * Increments the global failed-batches counter (registered under SOURCE_FAILED_BATCHES) by one.
+ */
+ @Override public void incrFailedBatches() {
+ failedBatchesCounter.incr();
+ }
+
@Override public void incrOpsShipped(long ops) {
shippedOpsCounter.incr(ops);
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
index 2498e3426a5..fe11c1049ce 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
@@ -24,13 +24,16 @@ import org.apache.yetus.audience.InterfaceAudience;
public interface MetricsReplicationSinkSource {
public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
+ /** Metric name for the count of replication batches the sink failed to apply. */
+ public static final String SINK_FAILED_BATCHES = "sink.failedBatches";
public static final String SINK_APPLIED_OPS = "sink.appliedOps";
public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
void setLastAppliedOpAge(long age);
void incrAppliedBatches(long batches);
void incrAppliedOps(long batchsize);
+ /** Increments the failed-batches counter by one (one call per failed batch). */
+ void incrFailedBatches();
long getLastAppliedOpAge();
void incrAppliedHFiles(long hfileSize);
long getSinkAppliedOps();
+ /** @return the current value of the failed-batches counter */
+ long getFailedBatches();
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
index ce45af5ccec..86bc60577a6 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
@@ -27,12 +27,14 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
private final MutableHistogram ageHist;
private final MutableFastCounter batchesCounter;
+ private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter opsCounter;
private final MutableFastCounter hfilesCounter;
public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP);
batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L);
+ failedBatchesCounter = rms.getMetricsRegistry().getCounter(SINK_FAILED_BATCHES, 0L);
opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L);
hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L);
}
@@ -49,6 +51,16 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
opsCounter.incr(batchsize);
}
+ /**
+ * Increments the sink's failed-batches counter (SINK_FAILED_BATCHES) by one.
+ */
+ @Override
+ public void incrFailedBatches(){
+ failedBatchesCounter.incr();
+ }
+
+ /**
+ * @return the current value of the sink's failed-batches counter
+ */
+ @Override
+ public long getFailedBatches() {
+ return failedBatchesCounter.value();
+ }
+
@Override
public long getLastAppliedOpAge() {
return ageHist.getMax();
@@ -62,4 +74,5 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
@Override public long getSinkAppliedOps() {
return opsCounter.value();
}
+
}
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 42e28f5d0f3..e9ce8c66247 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -27,6 +27,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
+ public static final String SOURCE_FAILED_BATCHES = "source.failedBatches";
public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes";
public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
@@ -57,6 +58,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
void decrSizeOfLogQueue(int size);
void incrLogEditsFiltered(long size);
void incrBatchesShipped(int batches);
+ void incrFailedBatches();
void incrOpsShipped(long ops);
void incrShippedBytes(long size);
void incrLogReadInBytes(long size);
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index faf14f79cfb..049e849b52c 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -34,6 +34,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final String logEditsFilteredKey;
private final String shippedBatchesKey;
private final String shippedOpsKey;
+ private final String failedBatchesKey;
private String keyPrefix;
private final String shippedBytesKey;
@@ -48,6 +49,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
private final MutableFastCounter logReadInEditsCounter;
private final MutableFastCounter walEditsFilteredCounter;
private final MutableFastCounter shippedBatchesCounter;
+ private final MutableFastCounter failedBatchesCounter;
private final MutableFastCounter shippedOpsCounter;
private final MutableFastCounter shippedBytesCounter;
private final MutableFastCounter logReadInBytesCounter;
@@ -85,6 +87,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
shippedBatchesKey = this.keyPrefix + "shippedBatches";
shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
+ failedBatchesKey = this.keyPrefix + "failedBatches";
+ failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L);
+
shippedOpsKey = this.keyPrefix + "shippedOps";
shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
@@ -158,6 +163,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
shippedBatchesCounter.incr(batches);
}
+ /**
+ * Increments this source's failed-batches counter (key: keyPrefix + "failedBatches") by one.
+ */
+ @Override public void incrFailedBatches() {
+ failedBatchesCounter.incr();
+ }
+
@Override public void incrOpsShipped(long ops) {
shippedOpsCounter.incr(ops);
}
@@ -176,6 +185,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
rms.removeMetric(sizeOfLogQueueKey);
rms.removeMetric(shippedBatchesKey);
+ rms.removeMetric(failedBatchesKey);
rms.removeMetric(shippedOpsKey);
rms.removeMetric(shippedBytesKey);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index dede79d138c..18cb147af46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -84,6 +84,21 @@ public class MetricsSink {
mss.incrAppliedHFiles(hfileSize);
}
+ /**
+ * Convenience method to update metrics when a batch of operations has failed.
+ * Delegates to the sink metrics source's failed-batches counter.
+ */
+ public void incrementFailedBatches(){
+ mss.incrFailedBatches();
+ }
+
+ /**
+ * Get the count of the failed batches.
+ * Protected rather than private so same-package tests (e.g. TestReplicationSink) can read it.
+ * @return failedBatches
+ */
+ protected long getFailedBatches() {
+ return mss.getFailedBatches();
+ }
+
/**
* Get the Age of Last Applied Op
* @return ageOfLastAppliedOp
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 3ab08065ca7..18f4354d3c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -230,6 +230,15 @@ public class MetricsSource implements BaseSource {
globalSourceSource.incrShippedBytes(sizeInBytes);
}
+ /**
+ * Convenience method to update metrics when a batch of operations has failed.
+ * Increments both this source's own counter and the global source counter.
+ */
+ public void incrementFailedBatches(){
+ singleSourceSource.incrFailedBatches();
+ globalSourceSource.incrFailedBatches();
+ }
+
+
/**
* Gets the number of edits not eligible for replication this source queue logs so far.
* @return logEditsFiltered non-replicable edits filtered from this queue logs.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index d1ee0220a9d..ffec5c1eebd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -193,6 +193,7 @@ public class ReplicationSink {
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
+ this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
}
@@ -205,6 +206,7 @@ public class ReplicationSink {
for (int i = 0; i < count; i++) {
// Throw index out of bounds if our cell count is off
if (!cells.advance()) {
+ this.metrics.incrementFailedBatches();
throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
}
Cell cell = cells.current();
@@ -281,6 +283,7 @@ public class ReplicationSink {
this.totalReplicatedEdits.addAndGet(totalReplicated);
} catch (IOException ex) {
LOG.error("Unable to accept edit because:", ex);
+ this.metrics.incrementFailedBatches();
throw ex;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 33869dbf7c7..52032097b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -230,6 +230,7 @@ public class ReplicationSourceShipper extends Thread {
}
break;
} catch (Exception ex) {
+ source.getSourceMetrics().incrementFailedBatches();
LOG.warn("{} threw unknown exception:",
source.getReplicationEndpoint().getClass().getName(), ex);
if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index 10a5affcbce..26d33678979 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -69,15 +71,14 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
-
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
+
@Category({ReplicationTests.class, LargeTests.class})
public class TestReplicationSink {
@@ -427,6 +428,67 @@ public class TestReplicationSink {
// Clean up the created hfiles or it will mess up subsequent tests
}
+ /**
+ * Test failure metrics produced for failed replication edits.
+ * Drives three failing calls to ReplicationSink#replicateEntries (cell count mismatch,
+ * missing table, disabled table) and checks the sink's failedBatches counter grows by
+ * exactly one per failed call.
+ */
+ @Test
+ public void testFailedReplicationSinkMetrics() throws IOException {
+ // SINK may already have recorded failures in other tests, so compare against a baseline.
+ long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches();
+ long errorCount = 0L;
+ List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
+ List<Cell> cells = new ArrayList<>();
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+ }
+ // Emptying the cell list leaves the scanner with no cells while the entries still expect
+ // associated cells, so cells.advance() fails inside replicateEntries.
+ cells.clear(); // cause ArrayIndexOutOfBoundsException
+ try {
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ Assert.fail("Should re-throw ArrayIndexOutOfBoundsException.");
+ } catch (ArrayIndexOutOfBoundsException e) {
+ errorCount++;
+ assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
+ }
+
+ entries.clear();
+ cells.clear();
+ TableName notExistTable = TableName.valueOf("notExistTable"); // cause TableNotFoundException
+ for (int i = 0; i < BATCH_SIZE; i++) {
+ entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
+ }
+ try {
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ Assert.fail("Should re-throw TableNotFoundException.");
+ } catch (TableNotFoundException e) {
+ errorCount++;
+ assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
+ }
+
+ entries.clear();
+ cells.clear();
+ for(int i = 0; i < BATCH_SIZE; i++) {
+ entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+ }
+ // cause IOException in batch()
+ // The target table is disabled for this call only; the finally block re-enables it so
+ // other tests that write to TABLE_NAME1 still see it enabled.
+ try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
+ try (Admin admin = conn.getAdmin()) {
+ admin.disableTable(TABLE_NAME1);
+ try {
+ SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ Assert.fail("Should re-throw IOException.");
+ } catch (IOException e) {
+ errorCount++;
+ assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
+ } finally {
+ admin.enableTable(TABLE_NAME1);
+ }
+ }
+ }
+ }
+
+
private WALEntry createEntry(TableName table, int row, KeyValue.Type type, List<Cell> cells) {
byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
byte[] rowBytes = Bytes.toBytes(row);