Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/08 14:20:34 UTC

[pulsar] branch branch-2.10 updated (6bf823eb412 -> 23b41cb6ee9)

This is an automated email from the ASF dual-hosted git repository.

nicoloboschi pushed a change to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 6bf823eb412 [Python Client] Fixed reserved keys is not removed when encode (#15844) (#15947)
     new 1b099d25016 Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)
     new 23b41cb6ee9 Fix cherry-pick issue of #15751: fix PulsarAdminToolTest#topics test

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  1 -
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 12 +++----
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 37 +++++++++++--------
 .../broker/service/BrokerEntryMetadataE2ETest.java |  7 ++--
 .../service/CurrentLedgerRolloverIfFullTest.java   | 41 ++++++++++++++++++++--
 .../pulsar/admin/cli/PulsarAdminToolTest.java      |  4 ---
 .../MLTransactionMetadataStoreTest.java            |  6 ++--
 7 files changed, 75 insertions(+), 33 deletions(-)


[pulsar] 01/02: Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)

Posted by ni...@apache.org.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 1b099d250164907d423dcf8517506c5611dabf9f
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Mar 16 21:43:13 2022 +0800

    Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)
    
    ### Motivation
    
    Ledger creation was originally designed to be lazy: a ledger is created only when a new write operation is requested.
    
    https://github.com/apache/pulsar/blob/ad2cc2d38280b7dd0f056ee981ec8d3b157e3526/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L778-L786
    
    However, ``rollCurrentLedgerIfFull`` currently creates a new ledger unconditionally, even when ``ManagedLedgerImpl#pendingAddEntries`` is empty.
    
    https://github.com/apache/pulsar/blob/ad2cc2d38280b7dd0f056ee981ec8d3b157e3526/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1669-L1692
    
    Therefore, ``rollCurrentLedgerIfFull`` should follow the original lazy-creation design.
    
    ### Modifications
    
    - Remove the ``createLedgerAfterClosed`` call from the ``rollCurrentLedgerIfFull`` method.
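
The intended behavior can be illustrated with a minimal sketch, assuming a toy in-memory model rather than Pulsar's actual ``ManagedLedgerImpl``. The class ``LazyLedgerSketch``, its ``State`` constants, and the ``queuePending`` helper are hypothetical names introduced only for illustration. Only ``rollCurrentLedgerIfFull`` and ``pendingAddEntries`` echo names from this commit. The sketch shows a full ledger being closed without a successor being created, unless writes are already pending.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model (assumption): not Pulsar code, only mirrors the lazy-creation idea.
class LazyLedgerSketch {
    enum State { LEDGER_OPENED, CLOSED_LEDGER }

    private final int maxEntriesPerLedger;
    private final List<Long> ledgerIds = new ArrayList<>();
    private final Queue<String> pendingAddEntries = new ArrayDeque<>();
    private long nextLedgerId = 0;
    private int entriesInCurrentLedger = 0;
    private State state;

    LazyLedgerSketch(int maxEntriesPerLedger) {
        this.maxEntriesPerLedger = maxEntriesPerLedger;
        openNewLedger();
    }

    private void openNewLedger() {
        ledgerIds.add(nextLedgerId++);
        entriesInCurrentLedger = 0;
        state = State.LEDGER_OPENED;
    }

    // Lazy creation: a write opens a new ledger only if none is currently open.
    void addEntry(String data) {
        if (state == State.CLOSED_LEDGER) {
            openNewLedger();
        }
        entriesInCurrentLedger++;
    }

    // Hypothetical helper: simulates a write that is queued but not yet persisted.
    void queuePending(String data) {
        pendingAddEntries.add(data);
    }

    // Closes the current ledger if it is full. A successor is created only
    // when writes are already waiting; otherwise the ledger stays closed.
    void rollCurrentLedgerIfFull() {
        if (state != State.LEDGER_OPENED || entriesInCurrentLedger < maxEntriesPerLedger) {
            return;
        }
        state = State.CLOSED_LEDGER;
        if (!pendingAddEntries.isEmpty()) {
            openNewLedger();
            while (pendingAddEntries.poll() != null) {
                entriesInCurrentLedger++;
            }
        }
    }

    int ledgerCount() {
        return ledgerIds.size();
    }

    State state() {
        return state;
    }

    public static void main(String[] args) {
        LazyLedgerSketch ml = new LazyLedgerSketch(1);
        ml.addEntry("first");
        ml.rollCurrentLedgerIfFull();
        // No pending writes: the full ledger is closed and no new one is created.
        System.out.println(ml.ledgerCount() + " " + ml.state());
        ml.addEntry("second");
        // The next write lazily opens a new ledger.
        System.out.println(ml.ledgerCount() + " " + ml.state());
    }
}
```

In this model the ledger count stays unchanged after a rollover with no pending writes, which matches the updated test assertions that expect the ``ClosedLedger`` state rather than an extra empty ledger.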
    
    (cherry picked from commit 30f7f004a2397a6eeb88e74df44fdb16a9b3403e)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  1 -
 .../bookkeeper/mledger/impl/ManagedCursorTest.java | 12 +++----
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 37 +++++++++++--------
 .../broker/service/BrokerEntryMetadataE2ETest.java |  7 ++--
 .../service/CurrentLedgerRolloverIfFullTest.java   | 41 ++++++++++++++++++++--
 .../MLTransactionMetadataStoreTest.java            |  6 ++--
 6 files changed, 75 insertions(+), 29 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 87f97ca8329..631d96101bf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1689,7 +1689,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     }
 
                     ledgerClosed(lh);
-                    createLedgerAfterClosed();
                 }
             }, System.nanoTime());
         }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d64cac6bbd3..c3aec218785 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2235,15 +2235,13 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
         ledger.addEntry("fourth".getBytes(Encoding));
         Position last = ledger.addEntry("last-expired".getBytes(Encoding));
 
-        // roll a new ledger
-        int numLedgersBefore = ledger.getLedgersInfo().size();
         ledger.getConfig().setMaxEntriesPerLedger(1);
-        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
-        stateUpdater.setAccessible(true);
-        stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
+        // roll a new ledger
         ledger.rollCurrentLedgerIfFull();
-        Awaitility.await().atMost(20, TimeUnit.SECONDS)
-                .until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
+        Awaitility.await().untilAsserted(() -> {
+            Assert.assertEquals(ledger.getLedgersInfo().size(), 1);
+            Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+        });
 
         // the algorithm looks for "expired" messages
         // starting from the first, then it moves to the last message
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8f6fdfe6640..e0029fc56b2 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertNull;
 import static org.testng.Assert.assertSame;
@@ -67,6 +68,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import lombok.Cleanup;
@@ -1811,7 +1813,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.addEntry("data".getBytes());
         
         Awaitility.await().untilAsserted(() -> {
-            assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+            assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+            assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
         });   
     }
 
@@ -1958,7 +1961,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
         ManagedLedgerConfig config = new ManagedLedgerConfig();
         config.setRetentionSizeInMB(0);
-        config.setMaxEntriesPerLedger(1);
+        config.setMaxEntriesPerLedger(2);
         config.setRetentionTime(1, TimeUnit.SECONDS);
         config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
 
@@ -1968,19 +1971,25 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
         c1.skipEntries(1, IndividualDeletedEntries.Exclude);
         c2.skipEntries(1, IndividualDeletedEntries.Exclude);
-        // let current ledger close
-        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
-        stateUpdater.setAccessible(true);
-        stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
+        long preLedgerId = ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId();
+        ml.pendingAddEntries.add(OpAddEntry.
+                        createNoRetainBuffer(ml, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null));
         ml.rollCurrentLedgerIfFull();
+        AtomicLong currentLedgerId = new AtomicLong(-1);
+        // create a new ledger
+        Awaitility.await().untilAsserted(() -> {
+            currentLedgerId.set(ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId());
+            assertNotEquals(preLedgerId, currentLedgerId.get());
+        });
         // let retention expire
         Thread.sleep(1500);
         // delete the expired ledger
         ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
 
         // the closed and expired ledger should be deleted
-        assertTrue(ml.getLedgersInfoAsList().size() <= 1);
-        assertEquals(ml.getTotalSize(), 0);
+        assertEquals(ml.getLedgersInfoAsList().size(), 1);
+        assertEquals(currentLedgerId.get(),
+                ml.getLedgersInfoAsList().get(ml.getLedgersInfoAsList().size() - 1).getLedgerId());
         ml.close();
     }
 
@@ -2245,10 +2254,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         stateUpdater.setAccessible(true);
         stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
-        Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(managedLedger.getLedgersInfo().size(), 2);
+            assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+        });
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
         assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
-        assertEquals(0, managedLedger.getLedgersInfoAsList().get(2).getEntries());
         log.info("### ledgers {}", managedLedger.getLedgersInfo());
 
         long firstLedger = managedLedger.getLedgersInfo().firstKey();
@@ -3142,12 +3153,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         // all the messages have benn acknowledged
         // and all the ledgers have been removed except the last ledger
-        Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
-        stateUpdater.setAccessible(true);
-        stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
         ledger.rollCurrentLedgerIfFull();
         Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
-        Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
+        Awaitility.await().untilAsserted(() ->
+                Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger));
     }
 
     @Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 49b4742b71d..3356f0c5178 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -391,10 +391,13 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
         managedLedger.rollCurrentLedgerIfFull();
 
         Awaitility.await().atMost(Duration.ofSeconds(3))
-                .until(() -> managedLedger.getLedgersInfo().size() > 1);
+                .untilAsserted(() -> {
+                    Assert.assertEquals(managedLedger.getLedgersInfo().size(), 1);
+                    Assert.assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+                });
 
         final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList();
-        Assert.assertEquals(ledgerInfoList.size(), 2);
+        Assert.assertEquals(ledgerInfoList.size(), 1);
         Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
 
         cursor.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index b05abf3be52..f1d7a609a79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -20,10 +20,14 @@ package org.apache.pulsar.broker.service;
 
 import java.lang.reflect.Field;
 import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBufAllocator;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.client.api.Consumer;
 import org.apache.pulsar.client.api.Message;
@@ -89,8 +93,10 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
             consumer.acknowledge(msg);
         }
 
+        MLDataFormats.ManagedLedgerInfo.LedgerInfo lastLh =
+                managedLedger.getLedgersInfoAsList().get(managedLedger.getLedgersInfoAsList().size() - 1);
         // all the messages have been acknowledged
-        // and all the ledgers have been removed except the the last ledger
+        // and all the ledgers have been removed except the last ledger
         Awaitility.await()
                 .pollInterval(Duration.ofMillis(500L))
                 .untilAsserted(() -> {
@@ -104,12 +110,41 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
         stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
         managedLedger.rollCurrentLedgerIfFull();
 
-        // the last ledger will be closed and removed and we have one ledger for empty
+        // If there are no pending write messages, the last ledger will be closed and still held.
         Awaitility.await()
                 .pollInterval(Duration.ofMillis(1000L))
                 .untilAsserted(() -> {
                     Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
-                    Assert.assertEquals(managedLedger.getTotalSize(), 0);
+                    Assert.assertEquals(lastLh.getLedgerId(),
+                            managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
+                });
+        producer.send(new byte[1024 * 1024]);
+        Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+        Assert.assertNotNull(msg);
+        consumer.acknowledge(msg);
+        // Assert that we got a new ledger and all but the current ledger are deleted
+        Awaitility.await()
+                .untilAsserted(()-> {
+                    Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+                    Assert.assertNotEquals(lastLh.getLedgerId(),
+                            managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
+                });
+        long lastLhIdAfterRolloverAndSendAgain = managedLedger.getLedgersInfoAsList().get(0).getLedgerId();
+
+        // Mock pendingAddEntries
+        OpAddEntry op = OpAddEntry.
+                createNoRetainBuffer(managedLedger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
+        Field pendingAddEntries = managedLedger.getClass().getDeclaredField("pendingAddEntries");
+        pendingAddEntries.setAccessible(true);
+        ConcurrentLinkedQueue<OpAddEntry> queue = (ConcurrentLinkedQueue<OpAddEntry>) pendingAddEntries.get(managedLedger);
+        queue.add(op);
+        // When ml has pending write messages, ml will create a new ledger and close and delete the previous ledger
+        Awaitility.await()
+                .untilAsserted(()-> {
+                    managedLedger.rollCurrentLedgerIfFull();
+                    Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+                    Assert.assertNotEquals(managedLedger.getLedgersInfoAsList().get(0).getLedgerId(),
+                            lastLhIdAfterRolloverAndSendAgain);
                 });
     }
 }
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index a06bf9e6dea..2aa678059d4 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.ClosedLedger;
 import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
@@ -168,8 +169,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
             stateUpdater.setAccessible(true);
             stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
             managedLedger.rollCurrentLedgerIfFull();
-            Awaitility.await().until(() -> {
-                return !managedLedger.ledgerExists(position.getLedgerId());
+            Awaitility.await().untilAsserted(() -> {
+                Assert.assertTrue(managedLedger.ledgerExists(position.getLedgerId()));
+                Assert.assertEquals(managedLedger.getState(), ClosedLedger);
             });
         }
         mlTransactionLog.closeAsync().get();


[pulsar] 02/02: Fix cherry-pick issue of #15751: fix PulsarAdminToolTest#topics test

Posted by ni...@apache.org.

nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 23b41cb6ee9a8c5fb35cda5540c12a435ab8090f
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Jun 8 16:20:14 2022 +0200

    Fix cherry-pick issue of #15751: fix PulsarAdminToolTest#topics test
---
 .../test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java    | 4 ----
 1 file changed, 4 deletions(-)

diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index ca129e54971..8e7e339b536 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1418,10 +1418,6 @@ public class PulsarAdminToolTest {
         props.put("a", "b");
         verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, false, props);
 
-        cmdTopics = new CmdTopics(() -> admin);
-        cmdTopics.run(split("create-subscription persistent://myprop/clust/ns1/ds1 -s sub1 --messageId earliest -r"));
-        verify(mockTopics).createSubscription("persistent://myprop/clust/ns1/ds1", "sub1", MessageId.earliest, true, null);
-
         cmdTopics = new CmdTopics(() -> admin);
         cmdTopics.run(split("update-subscription-properties persistent://myprop/clust/ns1/ds1 -s sub1 --clear"));
         verify(mockTopics).updateSubscriptionProperties("persistent://myprop/clust/ns1/ds1", "sub1", new HashMap<>());