You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/10/11 02:31:50 UTC

[pulsar] branch master updated: [fix][broker]Fix mutex never released when trimming (#17911)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c9651ecb23b [fix][broker]Fix mutex never released when trimming (#17911)
c9651ecb23b is described below

commit c9651ecb23b07ec8a9a00a001da222b479fb78c3
Author: feynmanlin <31...@qq.com>
AuthorDate: Tue Oct 11 10:31:41 2022 +0800

    [fix][broker]Fix mutex never released when trimming (#17911)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 17 +++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 34 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index ad999472bbc..729cec2f31c 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2585,7 +2585,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 return;
             }
 
-            advanceCursorsIfNecessary(ledgersToDelete);
+            try {
+                advanceCursorsIfNecessary(ledgersToDelete);
+            } catch (LedgerNotExistException e) {
+                log.info("First non deleted Ledger is not found, stop trimming");
+                metadataMutex.unlock();
+                trimmerMutex.unlock();
+                return;
+            }
 
             PositionImpl currentLastConfirmedEntry = lastConfirmedEntry;
             // Update metadata
@@ -2658,7 +2665,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
      * This is to make sure that the `consumedEntries` counter is correctly updated with the number of skipped
      * entries and the stats are reported correctly.
      */
-    private void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) {
+    @VisibleForTesting
+    void advanceCursorsIfNecessary(List<LedgerInfo> ledgersToDelete) throws LedgerNotExistException {
         if (ledgersToDelete.isEmpty()) {
             return;
         }
@@ -2666,7 +2674,10 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         // need to move mark delete for non-durable cursors to the first ledger NOT marked for deletion
         // calling getNumberOfEntries latter for a ledger that is already deleted will be problematic and return
         // incorrect results
-        long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
+        Long firstNonDeletedLedger = ledgers.higherKey(ledgersToDelete.get(ledgersToDelete.size() - 1).getLedgerId());
+        if (firstNonDeletedLedger == null) {
+            throw new LedgerNotExistException("First non deleted Ledger is not found");
+        }
         PositionImpl highestPositionToDelete = new PositionImpl(firstNonDeletedLedger, -1);
 
         cursors.forEach(cursor -> {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index a07a84f70bd..0c3930f20e6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -22,9 +22,11 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.atLeast;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -3519,6 +3521,38 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testLockReleaseWhenTrimLedger() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(1);
+
+        ManagedLedgerImpl ledger = spy((ManagedLedgerImpl)factory.open("testLockReleaseWhenTrimLedger", config));
+        doThrow(new ManagedLedgerException.LedgerNotExistException("First non deleted Ledger is not found"))
+                .when(ledger).advanceCursorsIfNecessary(any());
+        final int entries = 10;
+        ManagedCursor cursor = ledger.openCursor("test-cursor" + UUID.randomUUID());
+        for (int i = 0; i < entries; i++) {
+            ledger.addEntry(String.valueOf(i).getBytes(Encoding));
+        }
+        List<Entry> entryList = cursor.readEntries(entries);
+        assertEquals(entryList.size(), entries);
+        assertEquals(ledger.ledgers.size(), entries);
+        assertEquals(ledger.ledgerCache.size(), entries - 1);
+        cursor.clearBacklog();
+        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+        ledger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE);
+        // Cleanup fails because ManagedLedgerNotFoundException is thrown
+        Awaitility.await().untilAsserted(() -> {
+            assertEquals(ledger.ledgers.size(), entries);
+            assertEquals(ledger.ledgerCache.size(), entries - 1);
+        });
+        // The lock is released even if an ManagedLedgerNotFoundException occurs, so it can be called repeatedly
+        Awaitility.await().untilAsserted(() ->
+                verify(ledger, atLeast(2)).advanceCursorsIfNecessary(any()));
+        cursor.close();
+        ledger.close();
+    }
+
     @Test
     public void testInvalidateReadHandleWhenConsumed() throws Exception {
         ManagedLedgerConfig config = new ManagedLedgerConfig();