You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/12 06:29:19 UTC

[pulsar] branch branch-2.10 updated: Fix lost message issue due to ledger rollover. (#14664)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 93e15d9  Fix lost message issue due to ledger rollover. (#14664)
93e15d9 is described below

commit 93e15d93305eebd529bdf72d2d37689515de28d2
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Sat Mar 12 14:24:38 2022 +0800

    Fix lost message issue due to ledger rollover. (#14664)
    
    (cherry picked from commit ad2cc2d38280b7dd0f056ee981ec8d3b157e3526)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  8 ++---
 .../bookkeeper/mledger/impl/ManagedCursorTest.java |  3 ++
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 38 +++++++++++++++++++---
 .../service/CurrentLedgerRolloverIfFullTest.java   |  4 +++
 .../MLTransactionMetadataStoreTest.java            |  5 ++-
 5 files changed, 48 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 5334544..7c351c4 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -774,8 +774,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }
         } else if (state == State.ClosedLedger) {
             // No ledger and no pending operations. Create a new ledger
-            log.info("[{}] Creating a new ledger", name);
             if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) {
+                log.info("[{}] Creating a new ledger", name);
                 this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
                 mbean.startDataLedgerCreateOp();
                 asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap());
@@ -1644,7 +1644,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     synchronized void createLedgerAfterClosed() {
         if (isNeededCreateNewLedgerAfterCloseLedger()) {
-            log.info("[{}] Creating a new ledger", name);
+            log.info("[{}] Creating a new ledger after closed", name);
             STATE_UPDATER.set(this, State.CreatingLedger);
             this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis();
             mbean.startDataLedgerCreateOp();
@@ -1667,8 +1667,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     @Override
     public void rollCurrentLedgerIfFull() {
         log.info("[{}] Start checking if current ledger is full", name);
-        if (currentLedgerEntries > 0 && currentLedgerIsFull()) {
-            STATE_UPDATER.set(this, State.ClosingLedger);
+        if (currentLedgerEntries > 0 && currentLedgerIsFull()
+                && STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) {
             currentLedger.asyncClose(new AsyncCallback.CloseCallback() {
                 @Override
                 public void closeComplete(int rc, LedgerHandle lh, Object o) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index 25057d1..a2127cb 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2240,6 +2240,9 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         // roll a new ledger
         int numLedgersBefore = ledger.getLedgersInfo().size();
         ledger.getConfig().setMaxEntriesPerLedger(1);
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
         ledger.rollCurrentLedgerIfFull();
         Awaitility.await().atMost(20, TimeUnit.SECONDS)
                 .until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index c6008c76..c055658 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -1969,6 +1969,9 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         c2.skipEntries(1, IndividualDeletedEntries.Exclude);
         // let current ledger close
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
         ml.rollCurrentLedgerIfFull();
         // let retention expire
         Thread.sleep(1500);
@@ -2238,6 +2241,9 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         managedCursor.markDelete(positionMarkDelete);
 
         //trigger ledger rollover and wait for the new ledger created
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
         Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
@@ -3096,7 +3102,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             ledger.addEntry(new byte[1024 * 1024]);
         }
 
-        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2);
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2));
         List<Entry> entries = cursor.readEntries(msgNum);
         Assert.assertEquals(msgNum, entries.size());
 
@@ -3107,9 +3113,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // all the messages have benn acknowledged
         // and all the ledgers have been removed except the last ledger
-        Thread.sleep(1000);
-        Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1);
-        Assert.assertEquals(ledger.getTotalSize(), 0);
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
+        ledger.rollCurrentLedgerIfFull();
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
     }
 
     @Test
@@ -3128,6 +3137,26 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
     }
 
     @Test
+    public void testLedgerNotRolloverWithoutOpenState() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+
+        ManagedLedgerImpl ml = spy((ManagedLedgerImpl)factory.open("ledger-not-rollover-without-open-state", config));
+        ml.addEntry("test1".getBytes()).getLedgerId();
+        long ledgerId2 = ml.addEntry("test2".getBytes()).getLedgerId();
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        // Set state to CreatingLedger to avoid rollover
+        stateUpdater.set(ml, ManagedLedgerImpl.State.CreatingLedger);
+        ml.rollCurrentLedgerIfFull();
+        Field currentLedger = ManagedLedgerImpl.class.getDeclaredField("currentLedger");
+        currentLedger.setAccessible(true);
+        LedgerHandle lh = (LedgerHandle) currentLedger.get(ml);
+        Awaitility.await()
+                .until(() -> ledgerId2 == lh.getId());
+    }
+
+    @Test
     public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setRetentionTime(1, TimeUnit.SECONDS);
@@ -3488,5 +3517,4 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
             Assert.assertFalse(ledgerInfo.get(100, TimeUnit.MILLISECONDS).getOffloadContext().getComplete());
         });
     }
-
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index 77ec229..b05abf3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.broker.service;
 
+import java.lang.reflect.Field;
 import java.time.Duration;
 import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
@@ -98,6 +99,9 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
                         });
 
         // trigger a ledger rollover
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
 
         // the last ledger will be closed and removed and we have one ledger for empty
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 7fa3c08..a06bf9e 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -164,8 +164,11 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
         ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog);
         Position position = managedLedger.getLastConfirmedEntry();
         if (isUseManagedLedgerProperties) {
+            Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+            stateUpdater.setAccessible(true);
+            stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
+            managedLedger.rollCurrentLedgerIfFull();
             Awaitility.await().until(() -> {
-                managedLedger.rollCurrentLedgerIfFull();
                 return !managedLedger.ledgerExists(position.getLedgerId());
             });
         }