You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/06/07 05:04:31 UTC

[pulsar] 01/17: [Transaction] Fix cursor readPosition is bigger than maxPosition in OpReadEntry (#14667)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit eed7825d8ee76cb77ab492ac30b981df1ce918ad
Author: congbo <39...@users.noreply.github.com>
AuthorDate: Mon Mar 14 16:51:54 2022 +0800

    [Transaction] Fix cursor readPosition is bigger than maxPosition in OpReadEntry (#14667)
    
    ### Motivation
    Fix cursor read op dead loop.
    ### Modifications
    1. in OpReadEntry we can't use cursor readPosition, because it is not the OpReadEntry readPosition, it is cursor readPosition when one ledger is empty the OpReadEntry readPosition is not equals cursor readPosition, we should use OpReadEntry readPosition to judge the OpReadEntry can be finished.
    2. when readPosition is bigger than maxPosition in OpReadEntry, we should complete this OpReadEntry
    ### Verifying this change
    add test
    
    (cherry picked from commit a242f03f69791fcfa1934b6cab20689393228381)
---
 .../bookkeeper/mledger/impl/OpReadEntry.java       |  3 +-
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 43 +++++++++++++++++++++-
 2 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index 210ad31063a..178ee2dedab 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -134,8 +134,9 @@ class OpReadEntry implements ReadEntriesCallback {
     }
 
     void checkReadCompletion() {
+        // op readPosition is smaller or equals maxPosition then can read again
         if (entries.size() < count && cursor.hasMoreEntries()
-                && ((PositionImpl) cursor.getReadPosition()).compareTo(maxPosition) < 0) {
+                && maxPosition.compareTo(readPosition) > 0) {
             // We still have more entries to read from the next ledger, schedule a new async operation
             if (nextReadPosition.getLedgerId() != readPosition.getLedgerId()) {
                 cursor.ledger.startReadOperationOnLedger(nextReadPosition, OpReadEntry.this);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index a2127cb7fd1..d64cac6bbd3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -30,7 +30,6 @@ import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.fail;
-
 import com.google.common.base.Charsets;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
@@ -59,7 +58,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import lombok.Cleanup;
@@ -3713,5 +3711,46 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         assertNotEquals(cursor.getCursorLedger(), initialLedgerId);
     }
 
+    @Test
+    public void testReadEmptyEntryList() throws Exception {
+        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
+        managedLedgerConfig.setMaxEntriesPerLedger(1);
+        managedLedgerConfig.setMetadataMaxEntriesPerLedger(1);
+        managedLedgerConfig.setMinimumRolloverTime(0, TimeUnit.MILLISECONDS);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory
+                .open("testReadEmptyEntryList", managedLedgerConfig);
+        ManagedCursorImpl cursor = (ManagedCursorImpl) ledger.openCursor("test");
+
+        PositionImpl lastPosition = (PositionImpl) ledger.addEntry("test".getBytes(Encoding));
+        ledger.rollCurrentLedgerIfFull();
+
+        AtomicBoolean flag = new AtomicBoolean();
+        flag.set(false);
+        ReadEntriesCallback callback = new ReadEntriesCallback() {
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                if (entries.size() == 0) {
+                    flag.set(true);
+                }
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+
+            }
+        };
+
+        // op readPosition is bigger than maxReadPosition
+        OpReadEntry opReadEntry = OpReadEntry.create(cursor, ledger.lastConfirmedEntry, 10, callback,
+                null, PositionImpl.get(lastPosition.getLedgerId(), -1));
+        Field field = ManagedCursorImpl.class.getDeclaredField("readPosition");
+        field.setAccessible(true);
+        field.set(cursor, PositionImpl.EARLIEST);
+        ledger.asyncReadEntries(opReadEntry);
+
+        // when readPosition is bigger than maxReadPosition, should complete the opReadEntry
+        Awaitility.await().untilAsserted(() -> assertTrue(flag.get()));
+    }
+
     private static final Logger log = LoggerFactory.getLogger(ManagedCursorTest.class);
 }