You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2023/04/07 07:07:27 UTC
[hbase] branch branch-2.5 updated: HBASE-27778 Incorrect ReplicationSourceWALReader. totalBufferUsed may cause replication hang up (#5162)
This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
new 045aa9f7809 HBASE-27778 Incorrect ReplicationSourceWALReader. totalBufferUsed may cause replication hang up (#5162)
045aa9f7809 is described below
commit 045aa9f78093ab3b4336bdf6a981ec763b123fea
Author: chenglei <ch...@apache.org>
AuthorDate: Fri Apr 7 15:07:20 2023 +0800
HBASE-27778 Incorrect ReplicationSourceWALReader. totalBufferUsed may cause replication hang up (#5162)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../regionserver/ReplicationSourceWALReader.java | 47 +++++++++---
.../replication/regionserver/WALEntryBatch.java | 13 +++-
.../regionserver/TestBasicWALEntryStream.java | 83 ++++++++++++++++++++++
3 files changed, 131 insertions(+), 12 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index 28768679fb6..256f9a78624 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -143,16 +143,28 @@ class ReplicationSourceWALReader extends Thread {
entryStream.reset(); // reuse stream
continue;
}
- // if we have already switched a file, skip reading and put it directly to the ship queue
- if (!batch.isEndOfFile()) {
- readWALEntries(entryStream, batch);
- currentPosition = entryStream.getPosition();
+ boolean successAddToQueue = false;
+ try {
+ // if we have already switched a file, skip reading and put it directly to the ship
+ // queue
+ if (!batch.isEndOfFile()) {
+ readWALEntries(entryStream, batch);
+ currentPosition = entryStream.getPosition();
+ }
+ // need to propagate the batch even it has no entries since it may carry the last
+ // sequence id information for serial replication.
+ LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
+ entryBatchQueue.put(batch);
+ successAddToQueue = true;
+ sleepMultiplier = 1;
+ } finally {
+ if (!successAddToQueue) {
+ // batch is not put to ReplicationSourceWALReader#entryBatchQueue, so we should
+ // decrease ReplicationSourceWALReader.totalBufferUsed by the byte size which was
+ // acquired in ReplicationSourceWALReader.acquireBufferQuota.
+ this.releaseBufferQuota(batch);
+ }
}
- // need to propagate the batch even it has no entries since it may carry the last
- // sequence id information for serial replication.
- LOG.debug("Read {} WAL entries eligible for replication", batch.getNbEntries());
- entryBatchQueue.put(batch);
- sleepMultiplier = 1;
}
} catch (WALEntryFilterRetryableException | IOException e) { // stream related
if (!handleEofException(e, batch)) {
@@ -182,7 +194,7 @@ class ReplicationSourceWALReader extends Thread {
long entrySizeExcludeBulkLoad = getEntrySizeExcludeBulkLoad(entry);
batch.addEntry(entry, entrySize);
updateBatchStats(batch, entry, entrySize);
- boolean totalBufferTooLarge = acquireBufferQuota(entrySizeExcludeBulkLoad);
+ boolean totalBufferTooLarge = acquireBufferQuota(batch, entrySizeExcludeBulkLoad);
// Stop if too many entries or too big
return totalBufferTooLarge || batch.getHeapSize() >= replicationBatchSizeCapacity
@@ -455,13 +467,26 @@ class ReplicationSourceWALReader extends Thread {
* @param size delta size for grown buffer
* @return true if we should clear buffer and push all
*/
- private boolean acquireBufferQuota(long size) {
+ private boolean acquireBufferQuota(WALEntryBatch walEntryBatch, long size) {
long newBufferUsed = totalBufferUsed.addAndGet(size);
// Record the new buffer usage
this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ // Remember on the batch how much quota it has acquired, so that releaseBufferQuota can
+ // subtract exactly that amount again if the batch never reaches the ship queue.
+ walEntryBatch.incrementUsedBufferSize(size);
return newBufferUsed >= totalBufferQuota;
}
+ /**
+ * Releases the buffer quota that the given {@link WALEntryBatch} acquired through
+ * {@link ReplicationSourceWALReader#acquireBufferQuota}: the batch's used buffer size is
+ * subtracted from {@code totalBufferUsed} and the new total is reported to the global metrics.
+ * Does nothing when the batch's used buffer size is not positive.
+ * @param walEntryBatch the batch whose acquired quota should be given back
+ */
+ private void releaseBufferQuota(WALEntryBatch walEntryBatch) {
+ long usedBufferSize = walEntryBatch.getUsedBufferSize();
+ if (usedBufferSize > 0) {
+ long newBufferUsed = totalBufferUsed.addAndGet(-usedBufferSize);
+ this.source.getSourceManager().getGlobalMetrics().setWALReaderEditsBufferBytes(newBufferUsed);
+ }
+ }
+
/** Returns whether the reader thread is running */
public boolean isReaderRunning() {
return isReaderRunning && !isInterrupted();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
index b5ef0f92bcc..06c97db785d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALEntryBatch.java
@@ -52,6 +52,9 @@ class WALEntryBatch {
private Map<String, Long> lastSeqIds = new HashMap<>();
// indicate that this is the end of the current file
private boolean endOfFile;
+ // indicate the buffer size used, which is added to
+ // ReplicationSourceWALReader.totalBufferUsed
+ private long usedBufferSize = 0;
/**
* @param lastWalPath Path of the WAL the last entry in this batch was read from
@@ -153,11 +156,19 @@ class WALEntryBatch {
lastSeqIds.put(region, sequenceId);
}
+ /** Adds {@code increment} to the buffer size recorded as used by this batch. */
+ public void incrementUsedBufferSize(long increment) {
+ usedBufferSize += increment;
+ }
+
+ /** Returns the buffer size accumulated so far through {@link #incrementUsedBufferSize}. */
+ public long getUsedBufferSize() {
+ return this.usedBufferSize;
+ }
+
+ /**
+ * Returns a debug string with the batch's entries, last WAL path and position, row key and
+ * HFile counts, heap size, last sequence ids, end-of-file flag and used buffer size.
+ */
@Override
public String toString() {
return "WALEntryBatch [walEntries=" + walEntriesWithSize + ", lastWalPath=" + lastWalPath
+ ", lastWalPosition=" + lastWalPosition + ", nbRowKeys=" + nbRowKeys + ", nbHFiles="
+ nbHFiles + ", heapSize=" + heapSize + ", lastSeqIds=" + lastSeqIds + ", endOfFile="
- + endOfFile + "]";
+ + endOfFile + ", usedBufferSize=" + usedBufferSize + "]";
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
index eda89b232c3..a997cd0a9d5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestBasicWALEntryStream.java
@@ -280,6 +280,13 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
when(source.isRecovered()).thenReturn(recovered);
MetricsReplicationGlobalSourceSource globalMetrics =
Mockito.mock(MetricsReplicationGlobalSourceSource.class);
+ final AtomicLong bufferUsedCounter = new AtomicLong(0);
+ Mockito.doAnswer((invocationOnMock) -> {
+ bufferUsedCounter.set(invocationOnMock.getArgument(0, Long.class));
+ return null;
+ }).when(globalMetrics).setWALReaderEditsBufferBytes(Mockito.anyLong());
+ when(globalMetrics.getWALReaderEditsBufferBytes())
+ .then(invocationOnMock -> bufferUsedCounter.get());
when(mockSourceManager.getGlobalMetrics()).thenReturn(globalMetrics);
return source;
}
@@ -764,4 +771,80 @@ public abstract class TestBasicWALEntryStream extends WALEntryStreamTestBase {
Waiter.waitFor(localConf, 10000,
(Waiter.Predicate<Exception>) () -> logQueue.getQueueSize(fakeWalGroupId) == 1);
}
+
+ /**
+ * This test is for HBASE-27778: when {@link WALEntryFilter#filter} throws an exception for some
+ * entries in a {@link WALEntryBatch}, {@link ReplicationSourceWALReader#totalBufferUsed} should
+ * be decreased, because the {@link WALEntryBatch} is not put to
+ * {@link ReplicationSourceWALReader#entryBatchQueue}.
+ */
+ @Test
+ public void testReplicationSourceWALReaderWithPartialWALEntryFailingFilter() throws Exception {
+ appendEntriesToLogAndSync(3);
+ // get ending position
+ long position;
+ try (WALEntryStream entryStream =
+ new WALEntryStream(logQueue, CONF, 0, log, null, new MetricsSource("1"), fakeWalGroupId)) {
+ entryStream.next();
+ entryStream.next();
+ entryStream.next();
+ position = entryStream.getPosition();
+ }
+
+ Path walPath = getQueue().peek();
+ int maxThrowExceptionCount = 3;
+
+ ReplicationSource source = mockReplicationSource(false, CONF);
+ when(source.isPeerEnabled()).thenReturn(true);
+ // the filter passes the first two entries and throws on the third one, for the first
+ // maxThrowExceptionCount passes over the 3 entries; afterwards all entries pass
+ PartialWALEntryFailingWALEntryFilter walEntryFilter =
+ new PartialWALEntryFailingWALEntryFilter(maxThrowExceptionCount, 3);
+ ReplicationSourceWALReader reader =
+ new ReplicationSourceWALReader(fs, CONF, logQueue, 0, walEntryFilter, source, fakeWalGroupId);
+ reader.start();
+ WALEntryBatch entryBatch = reader.take();
+
+ assertNotNull(entryBatch);
+ assertEquals(3, entryBatch.getWalEntries().size());
+ long sum = entryBatch.getWalEntries().stream()
+ .mapToLong(ReplicationSourceWALReader::getEntrySizeExcludeBulkLoad).sum();
+ assertEquals(position, entryBatch.getLastWalPosition());
+ assertEquals(walPath, entryBatch.getLastWalPath());
+ assertEquals(3, entryBatch.getNbRowKeys());
+ // the buffer usage must equal the size of the queued batch only, i.e. nothing may be left
+ // over from the earlier attempts that failed in the filter
+ assertEquals(sum, source.getSourceManager().getTotalBufferUsed().get());
+ assertEquals(sum, source.getSourceManager().getGlobalMetrics().getWALReaderEditsBufferBytes());
+ assertEquals(maxThrowExceptionCount, walEntryFilter.getThrowExceptionCount());
+ assertNull(reader.poll(10));
+ }
+
+ /**
+ * Test filter that returns the first {@code walEntryCount - 1} entries of each pass unchanged
+ * and, on the {@code walEntryCount}-th entry, throws {@link WALEntryFilterRetryableException}
+ * for the first {@code maxThrowExceptionCount} passes; on later passes that entry is returned
+ * as well.
+ */
+ private static class PartialWALEntryFailingWALEntryFilter implements WALEntryFilter {
+ // zero-based index of the entry being filtered in the current pass; -1 before the first call
+ private int filteredWALEntryCount = -1;
+ // number of entries that make up one pass
+ private int walEntryCount = 0;
+ // zero-based index of the most recent pass that reached its last entry; -1 before that
+ private int throwExceptionCount = -1;
+ // number of passes whose last entry fails with an exception
+ private int maxThrowExceptionCount;
+
+ public PartialWALEntryFailingWALEntryFilter(int throwExceptionLimit, int walEntryCount) {
+ this.maxThrowExceptionCount = throwExceptionLimit;
+ this.walEntryCount = walEntryCount;
+ }
+
+ @Override
+ public Entry filter(Entry entry) {
+ filteredWALEntryCount++;
+ // every entry except the last one of a pass goes through untouched
+ if (filteredWALEntryCount < walEntryCount - 1) {
+ return entry;
+ }
+
+ // last entry of the pass: start counting a new pass and decide whether to fail
+ filteredWALEntryCount = -1;
+ throwExceptionCount++;
+ if (throwExceptionCount <= maxThrowExceptionCount - 1) {
+ throw new WALEntryFilterRetryableException("failing filter");
+ }
+ return entry;
+ }
+
+ // Note: the counter is also incremented on passes that do not throw, so it equals
+ // maxThrowExceptionCount right after the first pass that succeeds.
+ public int getThrowExceptionCount() {
+ return throwExceptionCount;
+ }
+ }
+
}