You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/12 02:41:07 UTC
[pulsar] 14/14: Cancel offload tasks when managed ledger closed. (#14545)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c8a0dff7e1b51630840a301a35b83c62d188f983
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Mar 11 10:06:33 2022 +0800
Cancel offload tasks when managed ledger closed. (#14545)
### Motivation
When an offloader is configured, each time a ledger is closed it triggers an offload of that ledger. If many ledgers are still waiting to be offloaded when the topic is unloaded, the offloader keeps offloading them anyway, because the offload tasks run on the shared executor pool in ManagedLedgerFactoryImpl and are not cancelled when the managed ledger closes.
```
15:29:59.180 [pulsar-web-41-3] INFO org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Unloading topic persistent://public/default/UpdateNodeCharts
15:29:59.201 [pulsar-web-41-3] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Closing managed ledger
15:29:59.216 [main-EventThread] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [public/default/persistent/UpdateNodeCharts] [cloud-nodes-service] Updating cursor info ledgerId=-1 mark-delete=789182:82011
15:29:59.219 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/UpdateNodeCharts][cloud-nodes-service] Closed cursor at md-position=789182:82011
15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/UpdateNodeCharts] Topic closed
15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Successfully unloaded topic persistent://public/default/UpdateNodeCharts
15:31:05.432 [offloader-OrderedScheduler-1-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Preparing metadata to offload ledger 422142 with uuid 030267e2-a2f9-40a3-848b-482f9b007c00
15:31:05.432 [offloader-OrderedScheduler-1-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Found previous offload attempt for ledger 422142, uuid 030267e2-a2f9-40a3-848b-482f9b007c00, cleaning up
15:31:05.432 [offloader-OrderedScheduler-1-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Cleanup offload for ledgerId 422142 uuid 3725b3c1-1dbc-481f-a1dd-8aaffb75e603 because of the reason Previous failed offload.
```
### Modifications
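- In `offloadLoop`, check the managed ledger state first; if it is `Closed`, complete the promise exceptionally with `ManagedLedgerAlreadyClosedException` and do not offload anything.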
(cherry picked from commit e0687e37e137f55c6cffa263d8ac8af9169dad92)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +++--
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 40 ++++++++++++++++++++++
2 files changed, 47 insertions(+), 2 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a13cf68..5334544 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2360,13 +2360,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
+ ", total size = {}, already offloaded = {}, to offload = {}",
name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
sizeSummed, alreadyOffloadedSize, toOffloadSize);
+ offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
} else {
// offloadLoop will complete immediately with an empty list to offload
log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
name, sizeSummed, alreadyOffloadedSize, threshold);
+ unlockingPromise.complete(PositionImpl.LATEST);
}
-
- offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
}
}
}
@@ -2929,6 +2929,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
+ if (getState() == State.Closed) {
+ promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
+ String.format("managed ledger [%s] has already closed", name)));
+ return;
+ }
LedgerInfo info = ledgersToOffload.poll();
if (info == null) {
if (firstError.isPresent()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 982b914..c6008c76 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -54,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -121,6 +122,7 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -3449,4 +3451,42 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
factory.shutdown();
}
+ @Test
+ public void testOffloadTaskCancelled() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+
+ OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
+ offloadPolicies.setManagedLedgerOffloadDriver("mock");
+ offloadPolicies.setManagedLedgerOffloadThresholdInBytes(0L);
+ LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+ Mockito.when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+ Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(offloadPolicies.getManagedLedgerOffloadDriver());
+ config.setLedgerOffloader(ledgerOffloader);
+
+ CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+ readHandle.complete(mock(ReadHandle.class));
+
+ CompletableFuture<Void> offloadFuture = new CompletableFuture<>();
+ offloadFuture.complete(null);
+ Mockito.when(ledgerOffloader.offload(any(ReadHandle.class), any(UUID.class), any(Map.class))).thenReturn(offloadFuture);
+
+ final ManagedLedgerImpl ledgerInit = (ManagedLedgerImpl) factory.open("test-offload-task-close", config);
+ final ManagedLedgerImpl ledger = spy(ledgerInit);
+ long ledgerId = 3L;
+ doReturn(readHandle).when(ledger).getLedgerHandle(ledgerId);
+ doReturn(ManagedLedgerImpl.State.Closed).when(ledger).getState();
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+ ledger.close();
+
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture<LedgerInfo> ledgerInfo = ledger.getLedgerInfo(ledgerId);
+ Assert.assertFalse(ledgerInfo.get(100, TimeUnit.MILLISECONDS).getOffloadContext().getComplete());
+ });
+ }
+
}