You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by ay...@apache.org on 2022/02/25 17:26:44 UTC

[bookkeeper] branch master updated: ISSUE #2898: DistributedLogManager can skip over a segment on read.

This is an automated email from the ASF dual-hosted git repository.

ayegorov pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bookkeeper.git


The following commit(s) were added to refs/heads/master by this push:
     new 407b09e  ISSUE #2898: DistributedLogManager can skip over a segment on read.
407b09e is described below

commit 407b09e3cf06254038f00c7616ee06631359cbcc
Author: Andrey Yegorov <86...@users.noreply.github.com>
AuthorDate: Fri Feb 25 09:26:37 2022 -0800

    ISSUE #2898: DistributedLogManager can skip over a segment on read.
    
    Descriptions of the changes in this PR:
    ### Motivation
    
    The DistributedLogManager (DLM) test suite was flaky.
    Reproducing and troubleshooting the failures showed that the DLM can skip over a data segment on read.
    
    ### Changes
    
    Added a test and a fix: do not move to the next log segment if a previously enqueued task has already moved past the current segment reader.
    
    Master Issue: #2898
    
    
    
    Reviewers: Enrico Olivelli <eo...@gmail.com>, Nicolò Boschi <bo...@gmail.com>
    
    This closes #3064 from dlg99/fix/dlm-issue2898, closes #2898
---
 .../distributedlog/ReadAheadEntryReader.java       | 10 +++--
 .../TestBKDistributedLogManager.java               | 52 ++++++++++++++++++++++
 2 files changed, 59 insertions(+), 3 deletions(-)

diff --git a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
index 915d81a..0269370 100644
--- a/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
+++ b/stream/distributedlog/core/src/main/java/org/apache/distributedlog/ReadAheadEntryReader.java
@@ -608,7 +608,7 @@ class ReadAheadEntryReader implements
 
         if (cause instanceof EndOfLogSegmentException) {
             // we reach end of the log segment
-            moveToNextLogSegment();
+            moveToNextLogSegment(currentSegmentReader);
             return;
         }
         if (cause instanceof IOException) {
@@ -906,11 +906,15 @@ class ReadAheadEntryReader implements
         return true;
     }
 
-    void moveToNextLogSegment() {
+    void moveToNextLogSegment(final SegmentReader prevSegmentReader) {
         orderedSubmit(new CloseableRunnable() {
             @Override
             public void safeRun() {
-                unsafeMoveToNextLogSegment();
+                // Do not move forward if previous enqueued runnable
+                // already moved the segment forward.
+                if (prevSegmentReader == currentSegmentReader) {
+                    unsafeMoveToNextLogSegment();
+                }
             }
         });
     }
diff --git a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
index 548c4ca..43d1d84 100644
--- a/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
+++ b/stream/distributedlog/core/src/test/java/org/apache/distributedlog/TestBKDistributedLogManager.java
@@ -105,6 +105,56 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         setupCluster(numBookies);
     }
 
+    /**
+     * Test that DLM can reliably read data.
+     * Write multiple segments, read back multiple times.
+     * Make sure all entries from all segments are read back.
+     */
+    @Test(timeout = 120000)
+    public void testReadMultipleSegments() throws Exception {
+        String name = "distrlog-testReadMultipleSegments";
+
+        final int numSegments = 10;
+        final int numEntriesPerSegment = 2;
+        final int numReadIterations = 10;
+
+        BKDistributedLogManager dlm = createNewDLM(conf, name);
+        long txid = 1;
+        for (long i = 0; i < numSegments; i++) {
+            BKSyncLogWriter out = dlm.startLogSegmentNonPartitioned();
+            for (long j = 1; j <= numEntriesPerSegment; j++) {
+                LogRecord op = DLMTestUtil.getLogRecordInstance(txid++);
+                out.write(op);
+            }
+            out.closeAndComplete();
+            out = dlm.startLogSegmentNonPartitioned();
+            out.closeAndComplete();
+        }
+        assertEquals(txid - 1, dlm.getLastTxId());
+        dlm.close();
+
+        for (int runId = 0; runId < numReadIterations; runId++) {
+            dlm = createNewDLM(conf, name);
+            assertEquals(txid - 1, dlm.getLastTxId());
+
+            long numLogRecs = 0;
+            LogReader reader = dlm.getInputStream(1);
+            LogRecord record = reader.readNext(false);
+            while (null != record) {
+                numLogRecs++;
+                DLMTestUtil.verifyLogRecord(record);
+                assertEquals("(skipped txs in the middle) Failed at iteration " + runId,
+                        numLogRecs, record.getTransactionId());
+                record = reader.readNext(false);
+            }
+            reader.close();
+            dlm.close();
+
+            assertEquals("(missed txs at the end) Failed at iteration " + runId,
+                    txid - 1, numLogRecs);
+        }
+    }
+
     private void testNonPartitionedWritesInternal(String name, DistributedLogConfiguration conf) throws Exception {
         BKDistributedLogManager dlm = createNewDLM(conf, name);
 
@@ -305,6 +355,8 @@ public class TestBKDistributedLogManager extends TestDistributedLogBase {
         AsyncLogWriter writer1 = Utils.ioResult(manager.openAsyncLogWriter());
         Utils.ioResult(writer1.write(DLMTestUtil.getLogRecordInstance(1L)));
         Assert.assertEquals(1L, writer1.getLastTxId());
+        // some flaky ZK errors, Issue 3063
+        Thread.sleep(1000);
         AsyncLogWriter writer2 = Utils.ioResult(manager.openAsyncLogWriter());
         Utils.ioResult(writer2.write(DLMTestUtil.getLogRecordInstance(2L)));
         Assert.assertEquals(2L, writer2.getLastTxId());