You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2022/02/25 17:26:44 UTC
[bookkeeper] branch master updated: ISSUE #2898: DistributedLogManager can skip over a segment on read.
This is an automated email from the ASF dual-hosted git repository.
ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git
The following commit(s) were added to refs/heads/master by this push:
new 407b09e ISSUE #2898: DistributedLogManager can skip over a segment on read.
407b09e is described below
commit 407b09e3cf06254038f00c7616ee06631359cbcc
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Feb 25 09:26:37 2022 -0800
ISSUE #2898: DistributedLogManager can skip over a segment on read.
Descriptions of the changes in this PR:
### Motivation
The DLM test suite was flaky.
Reproduction and troubleshooting showed that DLM can skip over a data segment on read.
### Changes
Added a test and a fix: do not move to the next log segment if a previously enqueued runnable has already moved the reader forward.
Master Issue: #2898
Reviewers: Enrico Olivelli <eo...@gmail.com>, Nicolò Boschi <bo...@gmail.com>
This closes #3064 from dlg99/fix/dlm-issue2898, closes #2898
---
.../distributedlog/ReadAheadEntryReader.java | 10 +++--
.../TestBKDistributedLogManager.java | 52 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 3 deletions(-)
diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index 915d81a..0269370 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -608,7 +608,7 @@ class ReadAheadEntryReader implements
if (cause instanceof EndOfLogSegmentException) {
// we reach end of the log segment
- moveToNextLogSegment();
+ moveToNextLogSegment(currentSegmentReader);
return;
}
if (cause instanceof IOException) {
@@ -906,11 +906,15 @@ class ReadAheadEntryReader implements
return true;
}
- void moveToNextLogSegment() {
+ void moveToNextLogSegment(final SegmentReader prevSegmentReader) {
orderedSubmit(new CloseableRunnable() {
@Override
public void safeRun() {
- unsafeMoveToNextLogSegment();
+ // Do not move forward if previous enqueued runnable
+ // already moved the segment forward.
+ if (prevSegmentReader == currentSegmentReader) {
+ unsafeMoveToNextLogSegment();
+ }
}
});
}
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index 548c4ca..43d1d84 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -105,6 +105,56 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
setupCluster(numBookies);
}
+ /**
+ * Test that DLM can reliably read data.
+ * Write multiple segments, read back multiple times.
+ * Make sure all entries from all segments are read back.
+ */
+ @Test(timeout = 120000)
+ public void testReadMultipleSegments() throws Exception {
+ String name = "distrlog-testReadMultipleSegments";
+
+ final int numSegments = 10;
+ final int numEntriesPerSegment = 2;
+ final int numReadIterations = 10;
+
+ BKDistributedLogManager dlm = createNewDLM(conf, name);
+ long txid = 1;
+ for (long i = 0; i < numSegments; i++) {
+ BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned();
+ for (long j = 1; j <= numEntriesPerSegment; j++) {
+ LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
+ out.write(op);
+ }
+ out.closeAndComplete();
+ out = dlm.startLogSegmentNonPartitioned();
+ out.closeAndComplete();
+ }
+ assertEquals(txid - 1, dlm.getLastTxId());
+ dlm.close();
+
+ for (int runId = 0; runId < numReadIterations; runId++) {
+ dlm = createNewDLM(conf, name);
+ assertEquals(txid - 1, dlm.getLastTxId());
+
+ long numLogRecs = 0;
+ LogReader reader = dlm.getInputStream(1);
+ LogRecord record = reader.readNext(false);
+ while (null != record) {
+ numLogRecs++;
+ DLMTestUtil.verifyLogRecord(record);
+ assertEquals("(skipped txs in the middle) Failed at iteration " + runId,
+ numLogRecs, record.getTransactionId());
+ record = reader.readNext(false);
+ }
+ reader.close();
+ dlm.close();
+
+ assertEquals("(missed txs at the end) Failed at iteration " + runId,
+ txid - 1, numLogRecs);
+ }
+ }
+
private void testNonPartitionedWritesInternal(String name, DistributedLogConfiguration conf) throws Exception {
BKDistributedLogManager dlm = createNewDLM(conf, name);
@@ -305,6 +355,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
AsyncLogWriter writer1 = Utils.ioResult(manager.openAsyncLogWriter());
Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(1L)));
Assert.assertEquals(1L, writer1.getLastTxId());
+ // some flaky ZK errors, Issue 3063
+ Thread.sleep(1000);
AsyncLogWriter writer2 = Utils.ioResult(manager.openAsyncLogWriter());
Utils.ioResult(writer2.write(DLMTestUtil.getLogRecordInstance(2L)));
Assert.assertEquals(2L, writer2.getLastTxId());