You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2023/04/07 07:07:27 UTC

[hbase] branch branch-2.5 updated: HBASE-27778 Incorrect ReplicationSourceWALReader. totalBufferUsed may cause replication hang up (#5162)

This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 045aa9f7809 HBASE-27778 Incorrect ReplicationSourceWALReader. totalBufferUsed may cause replication hang up (#5162)
045aa9f7809 is described below

commit 045aa9f78093ab3b4336bdf6a981ec763b123fea
Author: chenglei <ch...@apache.org>
AuthorDate: Fri Apr 7 15:07:20 2023 +0800

    HBASE-27778 Incorrect ReplicationSourceWALReader. totalBufferUsed may cause replication hang up (#5162)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../regionserver/ReplicationSourceWALReader.java   | 47 +++++++++---
 .../replication/regionserver/WALEntryBatch.java    | 13 +++-
 .../regionserver/TestBasicWALEntryStream.java      | 83 ++++++++++++++++++++++
 3 files changed, 131 insertions(+), 12 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 28768679fb6..256f9a78624 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -143,16 +143,28 @@ class ReplicationSourceWALReader extends Thread {
             entryStream.reset(); // reuse stream
             continue;
           }
-          // if we have already switched a file, skip reading and put it directly to the ship queue
-          if (!batch.isEndOfFile()) {
-            readWALEntries(entryStream, batch);
-            currentPosition = entryStream.getPosition();
+          boolean successAddToQueue = false;
+          try {
+            // if we have already switched a file, skip reading and put it directly to the ship
+            // queue
+            if (!batch.isEndOfFile()) {
+              readWALEntries(entryStream, batch);
+              currentPosition = entryStream.getPosition();
+            }
+            // need to propagate the batch even if it has no entries since it may carry the last
+            // sequence id information for serial replication.
+            LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+            entryBatchQueue.put(batch);
+            successAddToQueue = true;
+            sleepMultiplier = 1;
+          } finally {
+            if (!successAddToQueue) {
+              // batch was not put into ReplicationSourceWALReader#entryBatchQueue, so we must
+              // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size that was
+              // acquired in ReplicationSourceWALReader.acquireBufferQuota.
+              this.releaseBufferQuota(batch);
+            }
           }
-          // need to propagate the batch even it has no entries since it may carry the last
-          // sequence id information for serial replication.
-          LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
-          entryBatchQueue.put(batch);
-          sleepMultiplier = 1;
         }
       } catch (WALEntryFilterRetryableException | IOException e) { // stream related
         if (!handleEofException(e, batch)) {
@@ -182,7 +194,7 @@ class ReplicationSourceWALReader extends Thread {
     long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
     batch.addEntry(entry, entrySize);
     updateBatchStats(batch, entry, entrySize);
-    boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+    boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad);
 
     // Stop if too many entries or too big
     return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
@@ -455,13 +467,26 @@ class ReplicationSourceWALReader extends Thread {
    * @param size delta size for grown buffer
    * @return true if we should clear buffer and push all
    */
-  private boolean acquireBufferQuota(long size) {
+  private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
     long newBufferUsed = totalBufferUsed.addAndGet(size);
     // Record the new buffer usage
     this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    walEntryBatch.incrementUsedBufferSize(size);
     return newBufferUsed >= totalBufferQuota;
   }
 
+  /**
+   * Releases the buffer quota that was acquired for the given {@link WALEntryBatch} by
+   * {@link ReplicationSourceWALReader#acquireBufferQuota}.
+   */
+  private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
+    long usedBufferSize = walEntryBatch.getUsedBufferSize();
+    if (usedBufferSize > 0) {
+      long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize);
+      this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+    }
+  }
+
   /** Returns whether the reader thread is running */
   public boolean isReaderRunning() {
     return isReaderRunning && !isInterrupted();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index b5ef0f92bcc..06c97db785d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -52,6 +52,9 @@ class WALEntryBatch {
   private Map<String, Long> lastSeqIds = new HashMap<>();
   // indicate that this is the end of the current file
   private boolean endOfFile;
+  // indicate the buffer size used, which is added to
+  // ReplicationSourceWALReader.totalBufferUsed
+  private long usedBufferSize = 0;
 
   /**
    * @param lastWalPath Path of the WAL the last entry in this batch was read from
@@ -153,11 +156,19 @@ class WALEntryBatch {
     lastSeqIds.put(region, sequenceId);
   }
 
+  public void incrementUsedBufferSize(long increment) {
+    usedBufferSize += increment;
+  }
+
+  public long getUsedBufferSize() {
+    return this.usedBufferSize;
+  }
+
   @Override
   public String toString() {
     return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath
       + ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles="
       + nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile="
-      + endOfFile + "]";
+      + endOfFile + ",usedBufferSize=" + usedBufferSize + "]";
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index eda89b232c3..a997cd0a9d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -280,6 +280,13 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     when(source.isRecovered()).thenReturn(recovered);
     MetricsReplicationGlobalSourceSource globalMetrics =
       Mockito.mock(MetricsReplicationGlobalSourceSource.class);
+    final AtomicLong bufferUsedCounter = new AtomicLong(0);
+    Mockito.doAnswer((invocationOnMock) -> {
+      bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
+      return null;
+    }).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
+    when(globalMetrics.getWALReaderEditsBufferBytes())
+      .then(invocationOnMock -> bufferUsedCounter.get());
     when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
     return source;
   }
@@ -764,4 +771,80 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
     Waiter.waitFor(localConf, 10000,
       (Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
   }
+
+  /**
+   * This test is for HBASE-27778: when {@link WALEntryFilter#filter} throws an exception for some
+   * entries in a {@link WALEntryBatch}, {@link ReplicationSourceWALReader#totalBufferUsed} should
+   * be decreased because the {@link WALEntryBatch} is not put into
+   * {@link ReplicationSourceWALReader#entryBatchQueue}.
+   */
+  @Test
+  public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
+    appendEntriesToLogAndSync(3);
+    // get ending position
+    long position;
+    try (WALEntryStream entryStream =
+      new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
+      entryStream.next();
+      entryStream.next();
+      entryStream.next();
+      position = entryStream.getPosition();
+    }
+
+    Path walPath = getQueue().peek();
+    int maxThrowExceptionCount = 3;
+
+    ReplicationSource source = mockReplicationSource(false, CONF);
+    when(source.isPeerEnabled()).thenReturn(true);
+    PartialWALEntryFailingWALEntryFilter walEntryFilter =
+      new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, 3);
+    ReplicationSourceWALReader reader =
+      new ReplicationSourceWALReader(fs, CONF, logQueue, 0, walEntryFilter, source, fakeWalGroupId);
+    reader.start();
+    WALEntryBatch entryBatch = reader.take();
+
+    assertNotNull(entryBatch);
+    assertEquals(3, entryBatch.getWalEntries().size());
+    long sum = entryBatch.getWalEntries().stream()
+      .mapToLong(ReplicationSourceWALReader::getEntrySizeExcludeBulkLoad).sum();
+    assertEquals(position, entryBatch.getLastWalPosition());
+    assertEquals(walPath, entryBatch.getLastWalPath());
+    assertEquals(3, entryBatch.getNbRowKeys());
+    assertEquals(sum, source.getSourceManager().getTotalBufferUsed().get());
+    assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
+    assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount());
+    assertNull(reader.poll(10));
+  }
+
+  private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
+    private int filteredWALEntryCount = -1;
+    private int walEntryCount = 0;
+    private int throwExceptionCount = -1;
+    private int maxThrowExceptionCount;
+
+    public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
+      this.maxThrowExceptionCount = throwExceptionLimit;
+      this.walEntryCount = walEntryCount;
+    }
+
+    @Override
+    public Entry filter(Entry entry) {
+      filteredWALEntryCount++;
+      if (filteredWALEntryCount < walEntryCount - 1) {
+        return entry;
+      }
+
+      filteredWALEntryCount = -1;
+      throwExceptionCount++;
+      if (throwExceptionCount <= maxThrowExceptionCount - 1) {
+        throw new WALEntryFilterRetryableException("failing filter");
+      }
+      return entry;
+    }
+
+    public int getThrowExceptionCount() {
+      return throwExceptionCount;
+    }
+  }
+
 }