You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 13:28:31 UTC

[pulsar] branch branch-2.8 updated: Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch branch-2.8
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.8 by this push:
     new 935f8ba  Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
935f8ba is described below

commit 935f8bacae6667c7f7fcd068b1bc6a3dbea244e0
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Tue Nov 2 02:13:47 2021 +0900

    Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
    
    (cherry picked from commit a95a3824e0b1c207ea844fbce724734764f0be62)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 +++++++------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 33 ++++++++++++++++++++++
 2 files changed, 45 insertions(+), 8 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4cbb75f..02ea420 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1331,14 +1331,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
         factory.close(this);
         STATE_UPDATER.set(this, State.Closed);
-
-        if (this.timeoutTask != null) {
-            this.timeoutTask.cancel(false);
-        }
-
-        if (this.checkLedgerRollTask != null) {
-            this.checkLedgerRollTask.cancel(false);
-        }
+        cancelScheduledTasks();
 
         LedgerHandle lh = currentLedger;
 
@@ -2605,6 +2598,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         // Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
         // ledgers
         STATE_UPDATER.set(this, State.Fenced);
+        cancelScheduledTasks();
 
         List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
         if (cursors.isEmpty()) {
@@ -4010,4 +4004,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         }
     }
 
+    private void cancelScheduledTasks() {
+        if (this.timeoutTask != null) {
+            this.timeoutTask.cancel(false);
+        }
+
+        if (this.checkLedgerRollTask != null) {
+            this.checkLedgerRollTask.cancel(false);
+        }
+    }
+
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 46029eb..e378438 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -61,6 +61,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -3307,4 +3308,36 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         managedLedgerB.close();
 
     }
+
+    @Test
+    public void testCancellationOfScheduledTasks() throws Exception {
+        Field timeoutTaskField = ManagedLedgerImpl.class.getDeclaredField("timeoutTask");
+        timeoutTaskField.setAccessible(true);
+        Field checkLedgerRollTaskField = ManagedLedgerImpl.class.getDeclaredField("checkLedgerRollTask");
+        checkLedgerRollTaskField.setAccessible(true);
+
+        ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger_1");
+        ledger1.addEntry("dummy-entry-1".getBytes(Encoding));
+        ScheduledFuture<?> timeoutTask1 = (ScheduledFuture<?>) timeoutTaskField.get(ledger1);
+        assertNotNull(timeoutTask1);
+        assertFalse(timeoutTask1.isDone());
+        ScheduledFuture<?> checkLedgerRollTask1 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger1);
+        assertNotNull(checkLedgerRollTask1);
+        assertFalse(checkLedgerRollTask1.isDone());
+        ledger1.close();
+        assertTrue(timeoutTask1.isCancelled());
+        assertTrue(checkLedgerRollTask1.isCancelled());
+
+        ManagedLedgerImpl ledger2 = (ManagedLedgerImpl) factory.open("my_test_ledger_2");
+        ledger2.addEntry("dummy-entry-2".getBytes(Encoding));
+        ScheduledFuture<?> timeoutTask2 = (ScheduledFuture<?>) timeoutTaskField.get(ledger2);
+        assertNotNull(timeoutTask2);
+        assertFalse(timeoutTask2.isDone());
+        ScheduledFuture<?> checkLedgerRollTask2 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger2);
+        assertNotNull(checkLedgerRollTask2);
+        assertFalse(checkLedgerRollTask2.isDone());
+        ledger2.delete();
+        assertTrue(timeoutTask2.isCancelled());
+        assertTrue(checkLedgerRollTask2.isCancelled());
+    }
 }