You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/08 14:23:19 UTC

[pulsar] branch branch-2.10 updated: Revert "Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)"

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new f69c30f3f34 Revert "Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)"
f69c30f3f34 is described below

commit f69c30f3f3433cf7d0b0399f5665c600ba560e22
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Jun 8 16:23:04 2022 +0200

    Revert "Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)"
    
    This reverts commit 1b099d250164907d423dcf8517506c5611dabf9f.
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  1 +
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 12 ++++---
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 37 ++++++++-----------
 .../broker/service/BrokerEntryMetadataE2ETest.java |  7 ++--
 .../service/CurrentLedgerRolloverIfFullTest.java   | 41 ++--------------------
 .../MLTransactionMetadataStoreTest.java            |  6 ++--
 6 files changed, 29 insertions(+), 75 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 631d96101bf..87f97ca8329 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1689,6 +1689,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     }
 
                     ledgerClosed(lh);
+                    createLedgerAfterClosed();
                 }
             }, System.nanoTime());
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index c3aec218785..d64cac6bbd3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2235,13 +2235,15 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ledger.addEntry("fourth".getBytes(Encoding));
         Position last = ledger.addEntry("last-expired".getBytes(Encoding));
 
-        ledger.getConfig().setMaxEntriesPerLedger(1);
         // roll a new ledger
+        int numLedgersBefore = ledger.getLedgersInfo().size();
+        ledger.getConfig().setMaxEntriesPerLedger(1);
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
         ledger.rollCurrentLedgerIfFull();
-        Awaitility.await().untilAsserted(() -> {
-            Assert.assertEquals(ledger.getLedgersInfo().size(), 1);
-            Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
-        });
+        Awaitility.await().atMost(20, TimeUnit.SECONDS)
+                .until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
 
         // the algorithm looks for "expired" messages
         // starting from the first, then it moves to the last message
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e0029fc56b2..8f6fdfe6640 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -31,7 +31,6 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
@@ -68,7 +67,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import lombok.Cleanup;
@@ -1813,8 +1811,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.addEntry("data".getBytes());
         
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(ledger.getLedgersInfoAsList().size(), 1);
-            assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+            assertEquals(ledger.getLedgersInfoAsList().size(), 2);
         });   
     }
 
@@ -1961,7 +1958,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setRetentionSizeInMB(0);
-        config.setMaxEntriesPerLedger(2);
+        config.setMaxEntriesPerLedger(1);
         config.setRetentionTime(1, TimeUnit.SECONDS);
         config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
 
@@ -1971,25 +1968,19 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         c2.skipEntries(1, IndividualDeletedEntries.Exclude);
-        long preLedgerId = ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId();
-        ml.pendingAddEntries.add(OpAddEntry.
-                        createNoRetainBuffer(ml, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null));
+        // let current ledger close
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
         ml.rollCurrentLedgerIfFull();
-        AtomicLong currentLedgerId = new AtomicLong(-1);
-        // create a new ledger
-        Awaitility.await().untilAsserted(() -> {
-            currentLedgerId.set(ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId());
-            assertNotEquals(preLedgerId, currentLedgerId.get());
-        });
         // let retention expire
         Thread.sleep(1500);
         // delete the expired ledger
         ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
 
         // the closed and expired ledger should be deleted
-        assertEquals(ml.getLedgersInfoAsList().size(), 1);
-        assertEquals(currentLedgerId.get(),
-                ml.getLedgersInfoAsList().get(ml.getLedgersInfoAsList().size() - 1).getLedgerId());
+        assertTrue(ml.getLedgersInfoAsList().size() <= 1);
+        assertEquals(ml.getTotalSize(), 0);
         ml.close();
     }
 
@@ -2254,12 +2245,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         stateUpdater.setAccessible(true);
         stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
-        Awaitility.await().untilAsserted(() -> {
-            assertEquals(managedLedger.getLedgersInfo().size(), 2);
-            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
-        });
+        Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
+        assertEquals(0, managedLedger.getLedgersInfoAsList().get(2).getEntries());
         log.info("### ledgers {}", managedLedger.getLedgersInfo());
 
         long firstLedger = managedLedger.getLedgersInfo().firstKey();
@@ -3153,10 +3142,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // all the messages have benn acknowledged
         // and all the ledgers have been removed except the last ledger
+        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+        stateUpdater.setAccessible(true);
+        stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
         ledger.rollCurrentLedgerIfFull();
         Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
-        Awaitility.await().untilAsserted(() ->
-                Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger));
+        Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 3356f0c5178..49b4742b71d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -391,13 +391,10 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
         managedLedger.rollCurrentLedgerIfFull();
 
         Awaitility.await().atMost(Duration.ofSeconds(3))
-                .untilAsserted(() -> {
-                    Assert.assertEquals(managedLedger.getLedgersInfo().size(), 1);
-                    Assert.assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
-                });
+                .until(() -> managedLedger.getLedgersInfo().size() > 1);
 
         final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList();
-        Assert.assertEquals(ledgerInfoList.size(), 1);
+        Assert.assertEquals(ledgerInfoList.size(), 2);
         Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
 
         cursor.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index f1d7a609a79..b05abf3be52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -20,14 +20,10 @@ package org.apache.pulsar.broker.service;
 
 import java.lang.reflect.Field;
 import java.time.Duration;
-import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
-import io.netty.buffer.ByteBufAllocator;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.OpAddEntry;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -93,10 +89,8 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
             consumer.acknowledge(msg);
         }
 
-        MLDataFormats.ManagedLedgerInfo.LedgerInfo lastLh =
-                managedLedger.getLedgersInfoAsList().get(managedLedger.getLedgersInfoAsList().size() - 1);
         // all the messages have been acknowledged
-        // and all the ledgers have been removed except the last ledger
+        // and all the ledgers have been removed except the the last ledger
         Awaitility.await()
                 .pollInterval(Duration.ofMillis(500L))
                 .untilAsserted(() -> {
@@ -110,41 +104,12 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
         stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
 
-        // If there are no pending write messages, the last ledger will be closed and still held.
+        // the last ledger will be closed and removed and we have one ledger for empty
         Awaitility.await()
                 .pollInterval(Duration.ofMillis(1000L))
                 .untilAsserted(() -> {
                     Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
-                    Assert.assertEquals(lastLh.getLedgerId(),
-                            managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
-                });
-        producer.send(new byte[1024 * 1024]);
-        Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
-        Assert.assertNotNull(msg);
-        consumer.acknowledge(msg);
-        // Assert that we got a new ledger and all but the current ledger are deleted
-        Awaitility.await()
-                .untilAsserted(()-> {
-                    Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
-                    Assert.assertNotEquals(lastLh.getLedgerId(),
-                            managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
-                });
-        long lastLhIdAfterRolloverAndSendAgain = managedLedger.getLedgersInfoAsList().get(0).getLedgerId();
-
-        // Mock pendingAddEntries
-        OpAddEntry op = OpAddEntry.
-                createNoRetainBuffer(managedLedger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
-        Field pendingAddEntries = managedLedger.getClass().getDeclaredField("pendingAddEntries");
-        pendingAddEntries.setAccessible(true);
-        ConcurrentLinkedQueue<OpAddEntry> queue = (ConcurrentLinkedQueue<OpAddEntry>) pendingAddEntries.get(managedLedger);
-        queue.add(op);
-        // When ml has pending write messages, ml will create a new ledger and close and delete the previous ledger
-        Awaitility.await()
-                .untilAsserted(()-> {
-                    managedLedger.rollCurrentLedgerIfFull();
-                    Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
-                    Assert.assertNotEquals(managedLedger.getLedgersInfoAsList().get(0).getLedgerId(),
-                            lastLhIdAfterRolloverAndSendAgain);
+                    Assert.assertEquals(managedLedger.getTotalSize(), 0);
                 });
     }
 }
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 2aa678059d4..a06bf9e6dea 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -47,7 +47,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.ClosedLedger;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -169,9 +168,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
             stateUpdater.setAccessible(true);
             stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
             managedLedger.rollCurrentLedgerIfFull();
-            Awaitility.await().untilAsserted(() -> {
-                Assert.assertTrue(managedLedger.ledgerExists(position.getLedgerId()));
-                Assert.assertEquals(managedLedger.getState(), ClosedLedger);
+            Awaitility.await().until(() -> {
+                return !managedLedger.ledgerExists(position.getLedgerId());
             });
         }
         mlTransactionLog.closeAsync().get();