You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/12/09 13:33:27 UTC
[pulsar] branch branch-2.7 updated: Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 76330a1 Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
76330a1 is described below
commit 76330a1dfe5274312411adf8f330ace810cf1a2b
Author: Masahiro Sakamoto <ma...@yahoo-corp.jp>
AuthorDate: Tue Nov 2 02:13:47 2021 +0900
Cancel scheduled tasks when deleting ManagedLedgerImpl (#12565)
(cherry picked from commit a95a3824e0b1c207ea844fbce724734764f0be62)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 20 +++++++------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 33 ++++++++++++++++++++++
2 files changed, 45 insertions(+), 8 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 44e9d67..5a5ce1e 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1209,14 +1209,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
factory.close(this);
STATE_UPDATER.set(this, State.Closed);
-
- if (this.timeoutTask != null) {
- this.timeoutTask.cancel(false);
- }
-
- if (this.checkLedgerRollTask != null) {
- this.checkLedgerRollTask.cancel(false);
- }
+ cancelScheduledTasks();
LedgerHandle lh = currentLedger;
@@ -2341,6 +2334,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// Delete the managed ledger without closing, since we are not interested in gracefully closing cursors and
// ledgers
STATE_UPDATER.set(this, State.Fenced);
+ cancelScheduledTasks();
List<ManagedCursor> cursors = Lists.newArrayList(this.cursors);
if (cursors.isEmpty()) {
@@ -3603,4 +3597,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
+ private void cancelScheduledTasks() { // Cancels the two scheduled task handles held by this instance; this patch calls it from both the close path and the delete path.
+ if (this.timeoutTask != null) { // the handle may be unset, so guard before cancelling
+ this.timeoutTask.cancel(false); // false = do not interrupt the task if it is currently running
+ }
+
+ if (this.checkLedgerRollTask != null) { // same null guard for the second handle
+ this.checkLedgerRollTask.cancel(false); // false = do not interrupt the task if it is currently running
+ }
+ }
+
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 56f6ca3..9bcf7cc 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -3017,4 +3018,36 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
managedLedgerB.close();
}
+
+ @Test // Verifies that both close() and delete() leave the ledger's two scheduled task handles cancelled.
+ public void testCancellationOfScheduledTasks() throws Exception {
+ Field timeoutTaskField = ManagedLedgerImpl.class.getDeclaredField("timeoutTask"); // reflective handle to the timeoutTask field
+ timeoutTaskField.setAccessible(true); // allow reading the field from the test regardless of its access modifier
+ Field checkLedgerRollTaskField = ManagedLedgerImpl.class.getDeclaredField("checkLedgerRollTask"); // reflective handle to the checkLedgerRollTask field
+ checkLedgerRollTaskField.setAccessible(true);
+ // Case 1: close() must cancel both tasks.
+ ManagedLedgerImpl ledger1 = (ManagedLedgerImpl) factory.open("my_test_ledger_1");
+ ledger1.addEntry("dummy-entry-1".getBytes(Encoding)); // NOTE(review): presumably exercises the ledger before inspecting its tasks; where the tasks get scheduled is not visible here
+ ScheduledFuture<?> timeoutTask1 = (ScheduledFuture<?>) timeoutTaskField.get(ledger1); // capture the handle now so it can still be checked after close()
+ assertNotNull(timeoutTask1);
+ assertFalse(timeoutTask1.isDone()); // precondition: task is still pending before close()
+ ScheduledFuture<?> checkLedgerRollTask1 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger1);
+ assertNotNull(checkLedgerRollTask1);
+ assertFalse(checkLedgerRollTask1.isDone()); // precondition: task is still pending before close()
+ ledger1.close();
+ assertTrue(timeoutTask1.isCancelled()); // both handles captured earlier must now report cancelled
+ assertTrue(checkLedgerRollTask1.isCancelled());
+ // Case 2: delete() must cancel both tasks as well.
+ ManagedLedgerImpl ledger2 = (ManagedLedgerImpl) factory.open("my_test_ledger_2");
+ ledger2.addEntry("dummy-entry-2".getBytes(Encoding));
+ ScheduledFuture<?> timeoutTask2 = (ScheduledFuture<?>) timeoutTaskField.get(ledger2); // capture the handle now so it can still be checked after delete()
+ assertNotNull(timeoutTask2);
+ assertFalse(timeoutTask2.isDone()); // precondition: task is still pending before delete()
+ ScheduledFuture<?> checkLedgerRollTask2 = (ScheduledFuture<?>) checkLedgerRollTaskField.get(ledger2);
+ assertNotNull(checkLedgerRollTask2);
+ assertFalse(checkLedgerRollTask2.isDone()); // precondition: task is still pending before delete()
+ ledger2.delete();
+ assertTrue(timeoutTask2.isCancelled()); // both handles captured earlier must now report cancelled
+ assertTrue(checkLedgerRollTask2.isCancelled());
+ }
}