You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/11 02:31:50 UTC
[pulsar] branch master updated: [fix][broker]Fix mutex never released when trimming (#17911)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new c9651ecb23b [fix][broker]Fix mutex never released when trimming (#17911)
c9651ecb23b is described below
commit c9651ecb23b07ec8a9a00a001da222b479fb78c3
Author: feynmanlin <31...@qq.com>
AuthorDate: Tue Oct 11 10:31:41 2022 +0800
[fix][broker]Fix mutex never released when trimming (#17911)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 +++++++++--
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 34 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ad999472bbc..729cec2f31c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2585,7 +2585,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return;
}
- advanceCursorsIfNecessary(ledgersToDelete);
+ try {
+ advanceCursorsIfNecessary(ledgersToDelete);
+ } catch (LedgerNotExistException e) {
+ log.info("First non deleted Ledger is not found, stop trimming");
+ metadataMutex.unlock();
+ trimmerMutex.unlock();
+ return;
+ }
PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
// Update metadata
@@ -2658,7 +2665,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
* This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
* entries and the stats are reported correctly.
*/
- private void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) {
+ @VisibleForTesting
+ void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) throws LedgerNotExistException {
if (ledgersToDelete.isEmpty()) {
return;
}
@@ -2666,7 +2674,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
// calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return
// incorrect results
- long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
+ Long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
+ if (firstNonDeletedLedger == null) {
+ throw new LedgerNotExistException("First non deleted Ledger is not found");
+ }
PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
cursors.forEach(cursor -> {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a07a84f70bd..0c3930f20e6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,9 +22,11 @@ import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -3519,6 +3521,38 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testLockReleaseWhenTrimLedger() throws Exception {
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(1);
+
+ ManagedLedgerImpl ledger = spy((ManagedLedgerImpl)factory.open("testLockReleaseWhenTrimLedger", config));
+ doThrow(new ManagedLedgerException.LedgerNotExistException("First non deleted Ledger is not found"))
+ .when(ledger).advanceCursorsIfNecessary(any());
+ final int entries = 10;
+ ManagedCursor cursor = ledger.openCursor("test-cursor" + UUID.randomUUID());
+ for (int i = 0; i < entries; i++) {
+ ledger.addEntry(String.valueOf(i).getBytes(Encoding));
+ }
+ List<Entry> entryList = cursor.readEntries(entries);
+ assertEquals(entryList.size(), entries);
+ assertEquals(ledger.ledgers.size(), entries);
+ assertEquals(ledger.ledgerCache.size(), entries - 1);
+ cursor.clearBacklog();
+ ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+ ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+ // Cleanup fails because ManagedLedgerNotFoundException is thrown
+ Awaitility.await().untilAsserted(() -> {
+ assertEquals(ledger.ledgers.size(), entries);
+ assertEquals(ledger.ledgerCache.size(), entries - 1);
+ });
+ // The lock is released even if an ManagedLedgerNotFoundException occurs, so it can be called repeatedly
+ Awaitility.await().untilAsserted(() ->
+ verify(ledger, atLeast(2)).advanceCursorsIfNecessary(any()));
+ cursor.close();
+ ledger.close();
+ }
+
@Test
public void testInvalidateReadHandleWhenConsumed() throws Exception {
ManagedLedgerConfig config = new ManagedLedgerConfig();