You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ni...@apache.org on 2022/06/08 14:23:19 UTC
[pulsar] branch branch-2.10 updated: Revert "Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)"
This is an automated email from the ASF dual-hosted git repository.
nicoloboschi pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new f69c30f3f34 Revert "Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)"
f69c30f3f34 is described below
commit f69c30f3f3433cf7d0b0399f5665c600ba560e22
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Wed Jun 8 16:23:04 2022 +0200
Revert "Change ``rollCurrentLedgerIfFull`` logic to follow lazy creation of ledger (#14672)"
This reverts commit 1b099d250164907d423dcf8517506c5611dabf9f.
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 1 +
.../bookkeeper/mledger/impl/ManagedCursorTest.java | 12 ++++---
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 37 ++++++++-----------
.../broker/service/BrokerEntryMetadataE2ETest.java | 7 ++--
.../service/CurrentLedgerRolloverIfFullTest.java | 41 ++--------------------
.../MLTransactionMetadataStoreTest.java | 6 ++--
6 files changed, 29 insertions(+), 75 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 631d96101bf..87f97ca8329 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1689,6 +1689,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
ledgerClosed(lh);
+ createLedgerAfterClosed();
}
}, System.nanoTime());
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
index c3aec218785..d64cac6bbd3 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java
@@ -2235,13 +2235,15 @@ public class ManagedCursorTest extends MockedBookKeeperTestCase {
ledger.addEntry("fourth".getBytes(Encoding));
Position last = ledger.addEntry("last-expired".getBytes(Encoding));
- ledger.getConfig().setMaxEntriesPerLedger(1);
// roll a new ledger
+ int numLedgersBefore = ledger.getLedgersInfo().size();
+ ledger.getConfig().setMaxEntriesPerLedger(1);
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
- Awaitility.await().untilAsserted(() -> {
- Assert.assertEquals(ledger.getLedgersInfo().size(), 1);
- Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
- });
+ Awaitility.await().atMost(20, TimeUnit.SECONDS)
+ .until(() -> ledger.getLedgersInfo().size() > numLedgersBefore);
// the algorithm looks for "expired" messages
// starting from the first, then it moves to the last message
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index e0029fc56b2..8f6fdfe6640 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -31,7 +31,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertSame;
@@ -68,7 +67,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import lombok.Cleanup;
@@ -1813,8 +1811,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ledger.addEntry("data".getBytes());
Awaitility.await().untilAsserted(() -> {
- assertEquals(ledger.getLedgersInfoAsList().size(), 1);
- assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger);
+ assertEquals(ledger.getLedgersInfoAsList().size(), 2);
});
}
@@ -1961,7 +1958,7 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
ManagedLedgerConfig config = new ManagedLedgerConfig();
config.setRetentionSizeInMB(0);
- config.setMaxEntriesPerLedger(2);
+ config.setMaxEntriesPerLedger(1);
config.setRetentionTime(1, TimeUnit.SECONDS);
config.setMaximumRolloverTime(1, TimeUnit.SECONDS);
@@ -1971,25 +1968,19 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ml.addEntry("iamaverylongmessagethatshouldnotberetained".getBytes());
c1.skipEntries(1, IndividualDeletedEntries.Exclude);
c2.skipEntries(1, IndividualDeletedEntries.Exclude);
- long preLedgerId = ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId();
- ml.pendingAddEntries.add(OpAddEntry.
- createNoRetainBuffer(ml, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null));
+ // let current ledger close
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened);
ml.rollCurrentLedgerIfFull();
- AtomicLong currentLedgerId = new AtomicLong(-1);
- // create a new ledger
- Awaitility.await().untilAsserted(() -> {
- currentLedgerId.set(ml.getLedgersInfoAsList().get(ml.ledgers.size() -1).getLedgerId());
- assertNotEquals(preLedgerId, currentLedgerId.get());
- });
// let retention expire
Thread.sleep(1500);
// delete the expired ledger
ml.internalTrimConsumedLedgers(CompletableFuture.completedFuture(null));
// the closed and expired ledger should be deleted
- assertEquals(ml.getLedgersInfoAsList().size(), 1);
- assertEquals(currentLedgerId.get(),
- ml.getLedgersInfoAsList().get(ml.getLedgersInfoAsList().size() - 1).getLedgerId());
+ assertTrue(ml.getLedgersInfoAsList().size() <= 1);
+ assertEquals(ml.getTotalSize(), 0);
ml.close();
}
@@ -2254,12 +2245,10 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
- Awaitility.await().untilAsserted(() -> {
- assertEquals(managedLedger.getLedgersInfo().size(), 2);
- assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
- });
+ Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3));
assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries());
assertEquals(5, managedLedger.getLedgersInfoAsList().get(1).getEntries());
+ assertEquals(0, managedLedger.getLedgersInfoAsList().get(2).getEntries());
log.info("### ledgers {}", managedLedger.getLedgersInfo());
long firstLedger = managedLedger.getLedgersInfo().firstKey();
@@ -3153,10 +3142,12 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
// all the messages have benn acknowledged
// and all the ledgers have been removed except the last ledger
+ Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state");
+ stateUpdater.setAccessible(true);
+ stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened);
ledger.rollCurrentLedgerIfFull();
Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1));
- Awaitility.await().untilAsserted(() ->
- Assert.assertEquals(ledger.getState(), ManagedLedgerImpl.State.ClosedLedger));
+ Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0));
}
@Test
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
index 3356f0c5178..49b4742b71d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerEntryMetadataE2ETest.java
@@ -391,13 +391,10 @@ public class BrokerEntryMetadataE2ETest extends BrokerTestBase {
managedLedger.rollCurrentLedgerIfFull();
Awaitility.await().atMost(Duration.ofSeconds(3))
- .untilAsserted(() -> {
- Assert.assertEquals(managedLedger.getLedgersInfo().size(), 1);
- Assert.assertEquals(managedLedger.getState(), ManagedLedgerImpl.State.ClosedLedger);
- });
+ .until(() -> managedLedger.getLedgersInfo().size() > 1);
final List<LedgerInfo> ledgerInfoList = managedLedger.getLedgersInfoAsList();
- Assert.assertEquals(ledgerInfoList.size(), 1);
+ Assert.assertEquals(ledgerInfoList.size(), 2);
Assert.assertEquals(ledgerInfoList.get(0).getSize(), managedLedger.getTotalSize());
cursor.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
index f1d7a609a79..b05abf3be52 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java
@@ -20,14 +20,10 @@ package org.apache.pulsar.broker.service;
import java.lang.reflect.Field;
import java.time.Duration;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
-import io.netty.buffer.ByteBufAllocator;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
-import org.apache.bookkeeper.mledger.impl.OpAddEntry;
-import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
@@ -93,10 +89,8 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
consumer.acknowledge(msg);
}
- MLDataFormats.ManagedLedgerInfo.LedgerInfo lastLh =
- managedLedger.getLedgersInfoAsList().get(managedLedger.getLedgersInfoAsList().size() - 1);
// all the messages have been acknowledged
- // and all the ledgers have been removed except the last ledger
+ // and all the ledgers have been removed except the last ledger
Awaitility.await()
.pollInterval(Duration.ofMillis(500L))
.untilAsserted(() -> {
@@ -110,41 +104,12 @@ public class CurrentLedgerRolloverIfFullTest extends BrokerTestBase {
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
- // If there are no pending write messages, the last ledger will be closed and still held.
+ // the last ledger will be closed and removed, leaving a single empty ledger
Awaitility.await()
.pollInterval(Duration.ofMillis(1000L))
.untilAsserted(() -> {
Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
- Assert.assertEquals(lastLh.getLedgerId(),
- managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
- });
- producer.send(new byte[1024 * 1024]);
- Message<byte[]> msg = consumer.receive(2, TimeUnit.SECONDS);
- Assert.assertNotNull(msg);
- consumer.acknowledge(msg);
- // Assert that we got a new ledger and all but the current ledger are deleted
- Awaitility.await()
- .untilAsserted(()-> {
- Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
- Assert.assertNotEquals(lastLh.getLedgerId(),
- managedLedger.getLedgersInfoAsList().get(0).getLedgerId());
- });
- long lastLhIdAfterRolloverAndSendAgain = managedLedger.getLedgersInfoAsList().get(0).getLedgerId();
-
- // Mock pendingAddEntries
- OpAddEntry op = OpAddEntry.
- createNoRetainBuffer(managedLedger, ByteBufAllocator.DEFAULT.buffer(128).retain(), null, null);
- Field pendingAddEntries = managedLedger.getClass().getDeclaredField("pendingAddEntries");
- pendingAddEntries.setAccessible(true);
- ConcurrentLinkedQueue<OpAddEntry> queue = (ConcurrentLinkedQueue<OpAddEntry>) pendingAddEntries.get(managedLedger);
- queue.add(op);
- // When ml has pending write messages, ml will create a new ledger and close and delete the previous ledger
- Awaitility.await()
- .untilAsserted(()-> {
- managedLedger.rollCurrentLedgerIfFull();
- Assert.assertEquals(managedLedger.getLedgersInfoAsList().size(), 1);
- Assert.assertNotEquals(managedLedger.getLedgersInfoAsList().get(0).getLedgerId(),
- lastLhIdAfterRolloverAndSendAgain);
+ Assert.assertEquals(managedLedger.getTotalSize(), 0);
});
}
}
diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
index 2aa678059d4..a06bf9e6dea 100644
--- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
+++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java
@@ -47,7 +47,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
-import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.ClosedLedger;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State.WriteFailed;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -169,9 +168,8 @@ public class MLTransactionMetadataStoreTest extends MockedBookKeeperTestCase {
stateUpdater.setAccessible(true);
stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened);
managedLedger.rollCurrentLedgerIfFull();
- Awaitility.await().untilAsserted(() -> {
- Assert.assertTrue(managedLedger.ledgerExists(position.getLedgerId()));
- Assert.assertEquals(managedLedger.getState(), ClosedLedger);
+ Awaitility.await().until(() -> {
+ return !managedLedger.ledgerExists(position.getLedgerId());
});
}
mlTransactionLog.closeAsync().get();