You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/08 14:20:35 UTC
[pulsar] 01/02: Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 1b099d250164907d423dcf8517506c5611dabf9f
Author: Qiang Zhao <74...@users.noreply.github.com>
AuthorDate: Wed Mar 16 21:43:13 2022 +0800
Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)
### Motivation
The original ledger creation design was lazy, meaning that the ledger was created when a new write operation was requested.
https://github.com/apache/pulsar/blob/ad2cc2d38280b7dd0f056ee981ec8d3b157e3526/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L778-L786
However, the current ``rollCurrentLedgerIfFull`` logic always creates a new ledger right after closing the full one, even if ``ManagedLedgerImpl#pendingAddEntries`` is empty.
https://github.com/apache/pulsar/blob/ad2cc2d38280b7dd0f056ee981ec8d3b157e3526/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L1669-L1692
Therefore, ``rollCurrentLedgerIfFull`` needs to be changed to follow lazy ledger creation, in line with the original design.
### Modifications
- Remove the ``createLedgerAfterClosed`` invocation from the ``rollCurrentLedgerIfFull`` method.
(cherry picked from commit 30f7f004a2397a6eeb88e74df44fdb16a9b3403e)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 1 -
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 12 +++----
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 37 +++++++++++--------
.../broker/service/BrokerEntryMetadataE2ETest.java | 7 ++--
.../service/CurrentLedgerRolloverIfFullTest.java | 41 ++++++++++++++++++++--
.../MLTransactionMetadataStoreTest.java | 6 ++--
6 files changed, 75 insertions(+), 29 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 87f97ca8329..631d96101bf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1689,7 +1689,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
ledgerClosed(lh);
- createLedgerAfterClosed();
}
}, System.nanoTime());
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index d64cac6bbd3..c3aec218785 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2235,15 +2235,13 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
ledger.addEntry("fourth".getBytes(Encoding));
Position last = ledger.addEntry("last-expired".getBytes(Encoding));
- // roll a new ledger
- int numLedgersBefore = ledger.getLedgersInfo().size();
ledger.getConfig().setMaxEntriesPerLedger(1);
- Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
- stateUpdater.setAccessible(true);
- stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
+ // roll a new ledger
ledger.rollCurrentLedgerIfFull();
- Awaitility.await().atMost(20, TimeUnit.SECONDS)
- .until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertEquals(ledger.getLedgersInfo().size(), 1);
+ Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+ });
// the algorithm looks for "expired" messages
// starting from the first, then it moves to the last message
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 8f6fdfe6640..e0029fc56b2 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -31,6 +31,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
@@ -67,6 +68,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
@@ -1811,7 +1813,8 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ledger.addEntry("data".getBytes());
Awaitility.await().untilAsserted(() -> {
- assertEquals(ledger.getLedgersInfoAsList().size(), 2);
+ assertEquals(ledger.getLedgersInfoAsList().size(), 1);
+ assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
});
}
@@ -1958,7 +1961,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(0);
- config.setMaxEntriesPerLedger(1);
+ config.setMaxEntriesPerLedger(2);
config.setRetentionTime(1, TimeUnit.SECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
@@ -1968,19 +1971,25 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
- // let current ledger close
- Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
- stateUpdater.setAccessible(true);
- stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
+ long preLedgerId = ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId();
+ ml.pendingAddEntries.add(OpAddEntry.
+ createNoRetainBuffer(ml, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null));
ml.rollCurrentLedgerIfFull();
+ AtomicLong currentLedgerId = new AtomicLong(-1);
+ // create a new ledger
+ Awaitility.await().untilAsserted(() -> {
+ currentLedgerId.set(ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId());
+ assertNotEquals(preLedgerId, currentLedgerId.get());
+ });
// let retention expire
Thread.sleep(1500);
// delete the expired ledger
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
// the closed and expired ledger should be deleted
- assertTrue(ml.getLedgersInfoAsList().size() <= 1);
- assertEquals(ml.getTotalSize(), 0);
+ assertEquals(ml.getLedgersInfoAsList().size(), 1);
+ assertEquals(currentLedgerId.get(),
+ ml.getLedgersInfoAsList().get(ml.getLedgersInfoAsList().size() - 1).getLedgerId());
ml.close();
}
@@ -2245,10 +2254,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
- Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(managedLedger.getLedgersInfo().size(), 2);
+ assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+ });
assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
- assertEquals(0, managedLedger.getLedgersInfoAsList().get(2).getEntries());
log.info("### ledgers {}", managedLedger.getLedgersInfo());
long firstLedger = managedLedger.getLedgersInfo().firstKey();
@@ -3142,12 +3153,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
// all the messages have benn acknowledged
// and all the ledgers have been removed except the last ledger
- Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
- stateUpdater.setAccessible(true);
- stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
- Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
+ Awaitility.await().untilAsserted(() ->
+ Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger));
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 49b4742b71d..3356f0c5178 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -391,10 +391,13 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().atMost(Duration.ofSeconds(3))
- .until(() -> managedLedger.getLedgersInfo().size() > 1);
+ .untilAsserted(() -> {
+ Assert.assertEquals(managedLedger.getLedgersInfo().size(), 1);
+ Assert.assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+ });
final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList();
- Assert.assertEquals(ledgerInfoList.size(), 2);
+ Assert.assertEquals(ledgerInfoList.size(), 1);
Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
cursor.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index b05abf3be52..f1d7a609a79 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -20,10 +20,14 @@ package org.apache.pulsar.broker.service;
import java.lang.reflect.Field;
import java.time.Duration;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBufAllocator;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
+import org.apache.bookkeeper.mledger.impl.OpAddEntry;
+import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -89,8 +93,10 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
consumer.acknowledge(msg);
}
+ MLDataFormats.ManagedLedgerInfo.LedgerInfo lastLh =
+ managedLedger.getLedgersInfoAsList().get(managedLedger.getLedgersInfoAsList().size() - 1);
// all the messages have been acknowledged
- // and all the ledgers have been removed except the the last ledger
+ // and all the ledgers have been removed except the last ledger
Awaitility.await()
.pollInterval(Duration.ofMillis(500L))
.untilAsserted(() -> {
@@ -104,12 +110,41 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
- // the last ledger will be closed and removed and we have one ledger for empty
+ // If there are no pending write messages, the last ledger will be closed and still held.
Awaitility.await()
.pollInterval(Duration.ofMillis(1000L))
.untilAsserted(() -> {
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
- Assert.assertEquals(managedLedger.getTotalSize(), 0);
+ Assert.assertEquals(lastLh.getLedgerId(),
+ managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
+ });
+ producer.send(new byte[1024 * 1024]);
+ Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
+ Assert.assertNotNull(msg);
+ consumer.acknowledge(msg);
+ // Assert that we got a new ledger and all but the current ledger are deleted
+ Awaitility.await()
+ .untilAsserted(()-> {
+ Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+ Assert.assertNotEquals(lastLh.getLedgerId(),
+ managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
+ });
+ long lastLhIdAfterRolloverAndSendAgain = managedLedger.getLedgersInfoAsList().get(0).getLedgerId();
+
+ // Mock pendingAddEntries
+ OpAddEntry op = OpAddEntry.
+ createNoRetainBuffer(managedLedger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
+ Field pendingAddEntries = managedLedger.getClass().getDeclaredField("pendingAddEntries");
+ pendingAddEntries.setAccessible(true);
+ ConcurrentLinkedQueue<OpAddEntry> queue = (ConcurrentLinkedQueue<OpAddEntry>) pendingAddEntries.get(managedLedger);
+ queue.add(op);
+ // When ml has pending write messages, ml will create a new ledger and close and delete the previous ledger
+ Awaitility.await()
+ .untilAsserted(()-> {
+ managedLedger.rollCurrentLedgerIfFull();
+ Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
+ Assert.assertNotEquals(managedLedger.getLedgersInfoAsList().get(0).getLedgerId(),
+ lastLhIdAfterRolloverAndSendAgain);
});
}
}
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index a06bf9e6dea..2aa678059d4 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -47,6 +47,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.ClosedLedger;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -168,8 +169,9 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
- Awaitility.await().until(() -> {
- return !managedLedger.ledgerExists(position.getLedgerId());
+ Awaitility.await().untilAsserted(() -> {
+ Assert.assertTrue(managedLedger.ledgerExists(position.getLedgerId()));
+ Assert.assertEquals(managedLedger.getState(), ClosedLedger);
});
}
mlTransactionLog.closeAsync().get();