You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bb...@apache.org on 2022/04/26 21:43:04 UTC

[hbase] branch master updated: HBASE-26581 Add metrics for failed replication edits (#4347)

This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new ee0c9212588 HBASE-26581 Add metrics for failed replication edits (#4347)
ee0c9212588 is described below

commit ee0c92125885d92c6538e0eae8018c9d9787dc10
Author: Bri Augenreich <bb...@vt.edu>
AuthorDate: Tue Apr 26 17:42:54 2022 -0400

    HBASE-26581 Add metrics for failed replication edits (#4347)
    
    Co-authored-by: Briana Augenreich <ba...@hubspot.com>
    Signed-off-by: Andrew Purtell <ap...@apache.org>
    Signed-off-by: Bryan Beaudreault <bb...@apache.org>
---
 .../MetricsReplicationGlobalSourceSourceImpl.java  |  7 +++
 .../regionserver/MetricsReplicationSinkSource.java |  3 +
 .../MetricsReplicationSinkSourceImpl.java          | 13 +++++
 .../MetricsReplicationSourceSource.java            |  2 +
 .../MetricsReplicationSourceSourceImpl.java        | 10 ++++
 .../replication/regionserver/MetricsSink.java      | 15 +++++
 .../replication/regionserver/MetricsSource.java    |  9 +++
 .../replication/regionserver/ReplicationSink.java  |  3 +
 .../regionserver/ReplicationSourceShipper.java     |  1 +
 .../regionserver/TestReplicationSink.java          | 66 +++++++++++++++++++++-
 10 files changed, 127 insertions(+), 2 deletions(-)

diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
index 547617a1669..a838195b445 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationGlobalSourceSourceImpl.java
@@ -36,6 +36,7 @@ public class MetricsReplicationGlobalSourceSourceImpl
   private final MutableFastCounter logReadInEditsCounter;
   private final MutableFastCounter walEditsFilteredCounter;
   private final MutableFastCounter shippedBatchesCounter;
+  private final MutableFastCounter failedBatchesCounter;
   private final MutableFastCounter shippedOpsCounter;
   private final MutableFastCounter shippedBytesCounter;
   private final MutableFastCounter logReadInBytesCounter;
@@ -62,6 +63,8 @@ public class MetricsReplicationGlobalSourceSourceImpl
 
     shippedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BATCHES, 0L);
 
+    failedBatchesCounter = rms.getMetricsRegistry().getCounter(SOURCE_FAILED_BATCHES, 0L);
+
     shippedOpsCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_OPS, 0L);
 
     shippedBytesCounter = rms.getMetricsRegistry().getCounter(SOURCE_SHIPPED_BYTES, 0L);
@@ -119,6 +122,10 @@ public class MetricsReplicationGlobalSourceSourceImpl
     shippedBatchesCounter.incr(batches);
   }
 
+  @Override public void incrFailedBatches() {
+    failedBatchesCounter.incr();
+  }
+
   @Override public void incrOpsShipped(long ops) {
     shippedOpsCounter.incr(ops);
   }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
index 2498e3426a5..fe11c1049ce 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSource.java
@@ -24,13 +24,16 @@ import org.apache.yetus.audience.InterfaceAudience;
 public interface MetricsReplicationSinkSource {
   public static final String SINK_AGE_OF_LAST_APPLIED_OP = "sink.ageOfLastAppliedOp";
   public static final String SINK_APPLIED_BATCHES = "sink.appliedBatches";
+  public static final String SINK_FAILED_BATCHES = "sink.failedBatches";
   public static final String SINK_APPLIED_OPS = "sink.appliedOps";
   public static final String SINK_APPLIED_HFILES = "sink.appliedHFiles";
 
   void setLastAppliedOpAge(long age);
   void incrAppliedBatches(long batches);
   void incrAppliedOps(long batchsize);
+  void incrFailedBatches();
   long getLastAppliedOpAge();
   void incrAppliedHFiles(long hfileSize);
   long getSinkAppliedOps();
+  long getFailedBatches();
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
index ce45af5ccec..86bc60577a6 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSinkSourceImpl.java
@@ -27,12 +27,14 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
 
   private final MutableHistogram ageHist;
   private final MutableFastCounter batchesCounter;
+  private final MutableFastCounter failedBatchesCounter;
   private final MutableFastCounter opsCounter;
   private final MutableFastCounter hfilesCounter;
 
   public MetricsReplicationSinkSourceImpl(MetricsReplicationSourceImpl rms) {
     ageHist = rms.getMetricsRegistry().newTimeHistogram(SINK_AGE_OF_LAST_APPLIED_OP);
     batchesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_BATCHES, 0L);
+    failedBatchesCounter = rms.getMetricsRegistry().getCounter(SINK_FAILED_BATCHES, 0L);
     opsCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_OPS, 0L);
     hfilesCounter = rms.getMetricsRegistry().getCounter(SINK_APPLIED_HFILES, 0L);
   }
@@ -49,6 +51,16 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
     opsCounter.incr(batchsize);
   }
 
+  @Override
+  public void incrFailedBatches(){
+    failedBatchesCounter.incr();
+  }
+
+  @Override
+  public long getFailedBatches() {
+    return failedBatchesCounter.value();
+  }
+
   @Override
   public long getLastAppliedOpAge() {
     return ageHist.getMax();
@@ -62,4 +74,5 @@ public class MetricsReplicationSinkSourceImpl implements MetricsReplicationSinkS
   @Override public long getSinkAppliedOps() {
     return opsCounter.value();
   }
+
 }
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
index 42e28f5d0f3..e9ce8c66247 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSource.java
@@ -27,6 +27,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   public static final String SOURCE_SIZE_OF_LOG_QUEUE = "source.sizeOfLogQueue";
   public static final String SOURCE_AGE_OF_LAST_SHIPPED_OP = "source.ageOfLastShippedOp";
   public static final String SOURCE_SHIPPED_BATCHES = "source.shippedBatches";
+  public static final String SOURCE_FAILED_BATCHES = "source.failedBatches";
 
   public static final String SOURCE_SHIPPED_BYTES = "source.shippedBytes";
   public static final String SOURCE_SHIPPED_OPS = "source.shippedOps";
@@ -57,6 +58,7 @@ public interface MetricsReplicationSourceSource extends BaseSource {
   void decrSizeOfLogQueue(int size);
   void incrLogEditsFiltered(long size);
   void incrBatchesShipped(int batches);
+  void incrFailedBatches();
   void incrOpsShipped(long ops);
   void incrShippedBytes(long size);
   void incrLogReadInBytes(long size);
diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
index faf14f79cfb..049e849b52c 100644
--- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
+++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsReplicationSourceSourceImpl.java
@@ -34,6 +34,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final String logEditsFilteredKey;
   private final String shippedBatchesKey;
   private final String shippedOpsKey;
+  private final String failedBatchesKey;
   private String keyPrefix;
 
   private final String shippedBytesKey;
@@ -48,6 +49,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
   private final MutableFastCounter logReadInEditsCounter;
   private final MutableFastCounter walEditsFilteredCounter;
   private final MutableFastCounter shippedBatchesCounter;
+  private final MutableFastCounter failedBatchesCounter;
   private final MutableFastCounter shippedOpsCounter;
   private final MutableFastCounter shippedBytesCounter;
   private final MutableFastCounter logReadInBytesCounter;
@@ -85,6 +87,9 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     shippedBatchesKey = this.keyPrefix + "shippedBatches";
     shippedBatchesCounter = rms.getMetricsRegistry().getCounter(shippedBatchesKey, 0L);
 
+    failedBatchesKey = this.keyPrefix + "failedBatches";
+    failedBatchesCounter = rms.getMetricsRegistry().getCounter(failedBatchesKey, 0L);
+
     shippedOpsKey = this.keyPrefix + "shippedOps";
     shippedOpsCounter = rms.getMetricsRegistry().getCounter(shippedOpsKey, 0L);
 
@@ -158,6 +163,10 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     shippedBatchesCounter.incr(batches);
   }
 
+  @Override public void incrFailedBatches() {
+    failedBatchesCounter.incr();
+  }
+
   @Override public void incrOpsShipped(long ops) {
     shippedOpsCounter.incr(ops);
   }
@@ -176,6 +185,7 @@ public class MetricsReplicationSourceSourceImpl implements MetricsReplicationSou
     rms.removeMetric(sizeOfLogQueueKey);
 
     rms.removeMetric(shippedBatchesKey);
+    rms.removeMetric(failedBatchesKey);
     rms.removeMetric(shippedOpsKey);
     rms.removeMetric(shippedBytesKey);
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
index dede79d138c..18cb147af46 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSink.java
@@ -84,6 +84,21 @@ public class MetricsSink {
     mss.incrAppliedHFiles(hfileSize);
   }
 
+  /**
+   * Convenience method to update metrics when a batch of operations has failed.
+   */
+  public void incrementFailedBatches(){
+    mss.incrFailedBatches();
+  }
+
+  /**
+   * Get the count of the failed batches.
+   * @return failedBatches
+   */
+  protected long getFailedBatches() {
+    return mss.getFailedBatches();
+  }
+
   /**
    * Get the Age of Last Applied Op
    * @return ageOfLastAppliedOp
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
index 3ab08065ca7..18f4354d3c5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/MetricsSource.java
@@ -230,6 +230,15 @@ public class MetricsSource implements BaseSource {
     globalSourceSource.incrShippedBytes(sizeInBytes);
   }
 
+  /**
+   * Convenience method to update metrics when a batch of operations has failed.
+   */
+  public void incrementFailedBatches(){
+    singleSourceSource.incrFailedBatches();
+    globalSourceSource.incrFailedBatches();
+  }
+
+
   /**
    * Gets the number of edits not eligible for replication this source queue logs so far.
    * @return logEditsFiltered non-replicable edits filtered from this queue logs.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
index d1ee0220a9d..ffec5c1eebd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
@@ -193,6 +193,7 @@ public class ReplicationSink {
             for (int i = 0; i < count; i++) {
               // Throw index out of bounds if our cell count is off
               if (!cells.advance()) {
+                this.metrics.incrementFailedBatches();
                 throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
               }
             }
@@ -205,6 +206,7 @@ public class ReplicationSink {
         for (int i = 0; i < count; i++) {
           // Throw index out of bounds if our cell count is off
           if (!cells.advance()) {
+            this.metrics.incrementFailedBatches();
             throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
           }
           Cell cell = cells.current();
@@ -281,6 +283,7 @@ public class ReplicationSink {
       this.totalReplicatedEdits.addAndGet(totalReplicated);
     } catch (IOException ex) {
       LOG.error("Unable to accept edit because:", ex);
+      this.metrics.incrementFailedBatches();
       throw ex;
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 33869dbf7c7..52032097b67 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -230,6 +230,7 @@ public class ReplicationSourceShipper extends Thread {
         }
         break;
       } catch (Exception ex) {
+        source.getSourceMetrics().incrementFailedBatches();
         LOG.warn("{} threw unknown exception:",
           source.getReplicationEndpoint().getClass().getName(), ex);
         if (sleepForRetries("ReplicationEndpoint threw exception", sleepForRetries, sleepMultiplier,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
index 10a5affcbce..26d33678979 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSink.java
@@ -19,6 +19,8 @@ package org.apache.hadoop.hbase.replication.regionserver;
 
 import static org.junit.Assert.assertEquals;
 
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -69,15 +71,14 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
-
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.UUID;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey;
 
+
 @Category({ReplicationTests.class, LargeTests.class})
 public class TestReplicationSink {
 
@@ -427,6 +428,67 @@ public class TestReplicationSink {
     // Clean up the created hfiles or it will mess up subsequent tests
   }
 
+  /**
+   * Test failure metrics produced for failed replication edits
+   */
+  @Test
+  public void testFailedReplicationSinkMetrics() throws IOException {
+    long initialFailedBatches = SINK.getSinkMetrics().getFailedBatches();
+    long errorCount = 0L;
+    List<WALEntry> entries = new ArrayList<>(BATCH_SIZE);
+    List<Cell> cells = new ArrayList<>();
+    for(int i = 0; i < BATCH_SIZE; i++) {
+      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+    }
+    cells.clear(); // cause IndexOutOfBoundsException
+    try {
+      SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+      Assert.fail("Should re-throw ArrayIndexOutOfBoundsException.");
+    } catch (ArrayIndexOutOfBoundsException e) {
+      errorCount++;
+      assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
+    }
+
+    entries.clear();
+    cells.clear();
+    TableName notExistTable = TableName.valueOf("notExistTable");  // cause TableNotFoundException
+    for (int i = 0; i < BATCH_SIZE; i++) {
+      entries.add(createEntry(notExistTable, i, KeyValue.Type.Put, cells));
+    }
+    try {
+      SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+        replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+      Assert.fail("Should re-throw TableNotFoundException.");
+    } catch (TableNotFoundException e) {
+      errorCount++;
+      assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
+    }
+
+    entries.clear();
+    cells.clear();
+    for(int i = 0; i < BATCH_SIZE; i++) {
+      entries.add(createEntry(TABLE_NAME1, i, KeyValue.Type.Put, cells));
+    }
+    // cause IOException in batch()
+    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration())) {
+      try (Admin admin = conn.getAdmin()) {
+        admin.disableTable(TABLE_NAME1);
+        try {
+          SINK.replicateEntries(entries, CellUtil.createCellScanner(cells.iterator()),
+            replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+          Assert.fail("Should re-throw IOException.");
+        } catch (IOException e) {
+          errorCount++;
+          assertEquals(initialFailedBatches + errorCount, SINK.getSinkMetrics().getFailedBatches());
+        } finally {
+          admin.enableTable(TABLE_NAME1);
+        }
+      }
+    }
+  }
+
+
   private WALEntry createEntry(TableName table, int row,  KeyValue.Type type, List<Cell> cells) {
     byte[] fam = table.equals(TABLE_NAME1) ? FAM_NAME1 : FAM_NAME2;
     byte[] rowBytes = Bytes.toBytes(row);