You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/08 14:20:35 UTC

[pulsar] 01/02: Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1b099d250164907d423dcf8517506c5611dabf9f
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Mar 16 21:43:13 2022 +0800

    Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)
    
    ### Motivation
    
    The original ledger creation design was lazy, meaning that a new ledger was created only when a write operation was requested.
    
    https://github.com/apache/pulsar/blob/ad2cc2d38280b7dd0f056ee981ec8d3b157e3526/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L778-L786
    
    However, the current ``rollCurrentLedgerIfFull`` logic creates a new ledger unconditionally, even if ``ManagedLedgerImpl#pendingAddEntries`` is empty.
    
    https://github.com/apache/pulsar/blob/ad2cc2d38280b7dd0f056ee981ec8d3b157e3526/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1669-L1692
    
    Therefore, we need to change ``rollCurrentLedgerIfFull`` to create ledgers lazily, in accordance with the original design.
    
    ### Modifications
    
    - Remove the ``createLedgerAfterClosed`` invocation from the ``rollCurrentLedgerIfFull`` method.
    
    (cherry picked from commit 30f7f004a2397a6eeb88e74df44fdb16a9b3403e)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  1 -
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 12 +++----
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 37 +++++++++++--------
 .../broker/service/BrokerEntryMetadataE2ETest.java |  7 ++--
 .../service/CurrentLedgerRolloverIfFullTest.java   | 41 ++++++++++++++++++++--
 .../MLTransactionMetadataStoreTest.java            |  6 ++--
 6 files changed, 75 insertions(+), 29 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 87f97ca8329..631d96101bf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1689,7 +1689,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     }
 
                     ledgerClosed(lh);
-                    createLedgerAfterClosed();
                 }
             }, System.nanoTime());
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d64cac6bbd3..c3aec218785 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2235,15 +2235,13 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ledger.addEntry("fourth".getBytes(Encoding));
         Position last = ledger.addEntry("last-expired".getBytes(Encoding));
 
-        // roll a new ledger
-        int numLedgersBefore = ledger.getLedgersInfo().size();
         ledger.getConfig().setMaxEntriesPerLedger(1);
-        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
-        stateUpdater.setAccessible(true);
-        stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
+        // roll a new ledger
         ledger.rollCurrentLedgerIfFull();
-        Awaitility.await().atMost(20, TimeUnit.SECONDS)
-                .until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(ledger.getLedgersInfo().size(), 1);
+            Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+        });
 
         // the algorithm looks for "expired" messages
         // starting from the first, then it moves to the last message
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8f6fdfe6640..e0029fc56b2 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
@@ -67,6 +68,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import lombok.Cleanup;
@@ -1811,7 +1813,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.addEntry("data".getBytes());
         
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+            assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+            assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
         });   
     }
 
@@ -1958,7 +1961,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setRetentionSizeInMB(0);
-        config.setMaxEntriesPerLedger(1);
+        config.setMaxEntriesPerLedger(2);
         config.setRetentionTime(1, TimeUnit.SECONDS);
         config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
 
@@ -1968,19 +1971,25 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         c2.skipEntries(1, IndividualDeletedEntries.Exclude);
-        // let current ledger close
-        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
-        stateUpdater.setAccessible(true);
-        stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
+        long preLedgerId = ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId();
+        ml.pendingAddEntries.add(OpAddEntry.
+                        createNoRetainBuffer(ml, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null));
         ml.rollCurrentLedgerIfFull();
+        AtomicLong currentLedgerId = new AtomicLong(-1);
+        // create a new ledger
+        Awaitility.await().untilAsserted(() -> {
+            currentLedgerId.set(ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId());
+            assertNotEquals(preLedgerId, currentLedgerId.get());
+        });
         // let retention expire
         Thread.sleep(1500);
         // delete the expired ledger
         ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
 
         // the closed and expired ledger should be deleted
-        assertTrue(ml.getLedgersInfoAsList().size() <= 1);
-        assertEquals(ml.getTotalSize(), 0);
+        assertEquals(ml.getLedgersInfoAsList().size(), 1);
+        assertEquals(currentLedgerId.get(),
+                ml.getLedgersInfoAsList().get(ml.getLedgersInfoAsList().size() - 1).getLedgerId());
         ml.close();
     }
 
@@ -2245,10 +2254,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         stateUpdater.setAccessible(true);
         stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
-        Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getLedgersInfo().size(), 2);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+        });
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
-        assertEquals(0, managedLedger.getLedgersInfoAsList().get(2).getEntries());
         log.info("### ledgers {}", managedLedger.getLedgersInfo());
 
         long firstLedger = managedLedger.getLedgersInfo().firstKey();
@@ -3142,12 +3153,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // all the messages have benn acknowledged
         // and all the ledgers have been removed except the last ledger
-        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
-        stateUpdater.setAccessible(true);
-        stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
         ledger.rollCurrentLedgerIfFull();
         Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
-        Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 49b4742b71d..3356f0c5178 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -391,10 +391,13 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
         managedLedger.rollCurrentLedgerIfFull();
 
         Awaitility.await().atMost(Duration.ofSeconds(3))
-                .until(() -> managedLedger.getLedgersInfo().size() > 1);
+                .untilAsserted(() -> {
+                    Assert.assertEquals(managedLedger.getLedgersInfo().size(), 1);
+                    Assert.assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+                });
 
         final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList();
-        Assert.assertEquals(ledgerInfoList.size(), 2);
+        Assert.assertEquals(ledgerInfoList.size(), 1);
         Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
 
         cursor.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index b05abf3be52..f1d7a609a79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -20,10 +20,14 @@ package org.apache.pulsar.broker.service;
 
 import java.lang.reflect.Field;
 import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBufAllocator;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -89,8 +93,10 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
             consumer.acknowledge(msg);
         }
 
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo lastLh =
+                managedLedger.getLedgersInfoAsList().get(managedLedger.getLedgersInfoAsList().size() - 1);
         // all the messages have been acknowledged
-        // and all the ledgers have been removed except the the last ledger
+        // and all the ledgers have been removed except the last ledger
         Awaitility.await()
                 .pollInterval(Duration.ofMillis(500L))
                 .untilAsserted(() -> {
@@ -104,12 +110,41 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
         stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
 
-        // the last ledger will be closed and removed and we have one ledger for empty
+        // If there are no pending write messages, the last ledger will be closed and still held.
         Awaitility.await()
                 .pollInterval(Duration.ofMillis(1000L))
                 .untilAsserted(() -> {
                     Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
-                    Assert.assertEquals(managedLedger.getTotalSize(), 0);
+                    Assert.assertEquals(lastLh.getLedgerId(),
+                            managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
+                });
+        producer.send(new byte[1024 * 1024]);
+        Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+        Assert.assertNotNull(msg);
+        consumer.acknowledge(msg);
+        // Assert that we got a new ledger and all but the current ledger are deleted
+        Awaitility.await()
+                .untilAsserted(()-> {
+                    Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+                    Assert.assertNotEquals(lastLh.getLedgerId(),
+                            managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
+                });
+        long lastLhIdAfterRolloverAndSendAgain = managedLedger.getLedgersInfoAsList().get(0).getLedgerId();
+
+        // Mock pendingAddEntries
+        OpAddEntry op = OpAddEntry.
+                createNoRetainBuffer(managedLedger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
+        Field pendingAddEntries = managedLedger.getClass().getDeclaredField("pendingAddEntries");
+        pendingAddEntries.setAccessible(true);
+        ConcurrentLinkedQueue<OpAddEntry> queue = (ConcurrentLinkedQueue<OpAddEntry>) pendingAddEntries.get(managedLedger);
+        queue.add(op);
+        // When ml has pending write messages, ml will create a new ledger and close and delete the previous ledger
+        Awaitility.await()
+                .untilAsserted(()-> {
+                    managedLedger.rollCurrentLedgerIfFull();
+                    Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+                    Assert.assertNotEquals(managedLedger.getLedgersInfoAsList().get(0).getLedgerId(),
+                            lastLhIdAfterRolloverAndSendAgain);
                 });
     }
 }
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index a06bf9e6dea..2aa678059d4 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.ClosedLedger;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -168,8 +169,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
             stateUpdater.setAccessible(true);
             stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
             managedLedger.rollCurrentLedgerIfFull();
-            Awaitility.await().until(() -> {
-                return !managedLedger.ledgerExists(position.getLedgerId());
+            Awaitility.await().untilAsserted(() -> {
+                Assert.assertTrue(managedLedger.ledgerExists(position.getLedgerId()));
+                Assert.assertEquals(managedLedger.getState(), ClosedLedger);
             });
         }
         mlTransactionLog.closeAsync().get();