You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/03/12 02:41:07 UTC

[pulsar] 14/14: Cancel offload tasks when managed ledger closed. (#14545)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c8a0dff7e1b51630840a301a35b83c62d188f983
Author: Jiwei Guo <te...@apache.org>
AuthorDate: Fri Mar 11 10:06:33 2022 +0800

    Cancel offload tasks when managed ledger closed. (#14545)
    
    ### Motivation
    When an offloader is configured, closing a ledger triggers an offload of that ledger. If many ledgers are still waiting to be offloaded when the topic is unloaded, the offloader keeps offloading them anyway, because the offload tasks run on the shared executor pool in ManagedLedgerFactoryImpl and are not cancelled when the managed ledger closes. In the log below, the topic is unloaded at 15:29:59, yet an offload for one of its ledgers still starts at 15:31:05.
    
    ```
    15:29:59.180 [pulsar-web-41-3] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Unloading topic persistent://public/default/UpdateNodeCharts
    15:29:59.201 [pulsar-web-41-3] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Closing managed ledger
    15:29:59.216 [main-EventThread] INFO  org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [public/default/persistent/UpdateNodeCharts] [cloud-nodes-service] Updating cursor info ledgerId=-1 mark-delete=789182:82011
    15:29:59.219 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/UpdateNodeCharts][cloud-nodes-service] Closed cursor at md-position=789182:82011
    15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/UpdateNodeCharts] Topic closed
    15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Successfully unloaded topic persistent://public/default/UpdateNodeCharts
    15:31:05.432 [offloader-OrderedScheduler-1-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Preparing metadata to offload ledger 422142 with uuid 030267e2-a2f9-40a3-848b-482f9b007c00
    15:31:05.432 [offloader-OrderedScheduler-1-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Found previous offload attempt for ledger 422142, uuid 030267e2-a2f9-40a3-848b-482f9b007c00, cleaning up
    15:31:05.432 [offloader-OrderedScheduler-1-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Cleanup offload for ledgerId 422142 uuid 3725b3c1-1dbc-481f-a1dd-8aaffb75e603 because of the reason Previous failed offload.
    ```
    
    ### Modifications
    
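    - In `offloadLoop`, check the managed ledger state first; if it is `Closed`, complete the promise exceptionally with `ManagedLedgerAlreadyClosedException` and do not offload anything.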
    
    (cherry picked from commit e0687e37e137f55c6cffa263d8ac8af9169dad92)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 40 ++++++++++++++++++++++
 2 files changed, 47 insertions(+), 2 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a13cf68..5334544 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2360,13 +2360,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                     + ", total size = {}, already offloaded = {}, to offload = {}",
                             name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
                             sizeSummed, alreadyOffloadedSize, toOffloadSize);
+                    offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
                 } else {
                     // offloadLoop will complete immediately with an empty list to offload
                     log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
                             name, sizeSummed, alreadyOffloadedSize, threshold);
+                    unlockingPromise.complete(PositionImpl.LATEST);
                 }
-
-                offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
             }
         }
     }
@@ -2929,6 +2929,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
             PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
+        if (getState() == State.Closed) {
+            promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
+                    String.format("managed ledger [%s] has already closed", name)));
+            return;
+        }
         LedgerInfo info = ledgersToOffload.poll();
         if (info == null) {
             if (firstError.isPresent()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 982b914..c6008c76 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -54,6 +54,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -121,6 +122,7 @@ import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
 import org.apache.pulsar.metadata.api.MetadataStoreException;
 import org.apache.pulsar.metadata.api.Stat;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
@@ -3449,4 +3451,42 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         factory.shutdown();
     }
 
+    @Test
+    public void testOffloadTaskCancelled() throws Exception {
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+
+        OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
+        offloadPolicies.setManagedLedgerOffloadDriver("mock");
+        offloadPolicies.setManagedLedgerOffloadThresholdInBytes(0L);
+        LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+        Mockito.when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+        Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(offloadPolicies.getManagedLedgerOffloadDriver());
+        config.setLedgerOffloader(ledgerOffloader);
+
+        CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+        readHandle.complete(mock(ReadHandle.class));
+
+        CompletableFuture<Void> offloadFuture = new CompletableFuture<>();
+        offloadFuture.complete(null);
+        Mockito.when(ledgerOffloader.offload(any(ReadHandle.class), any(UUID.class), any(Map.class))).thenReturn(offloadFuture);
+
+        final ManagedLedgerImpl ledgerInit = (ManagedLedgerImpl) factory.open("test-offload-task-close", config);
+        final ManagedLedgerImpl ledger = spy(ledgerInit);
+        long ledgerId = 3L;
+        doReturn(readHandle).when(ledger).getLedgerHandle(ledgerId);
+        doReturn(ManagedLedgerImpl.State.Closed).when(ledger).getState();
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+        ledger.close();
+
+        Awaitility.await().untilAsserted(() -> {
+            CompletableFuture<LedgerInfo> ledgerInfo = ledger.getLedgerInfo(ledgerId);
+            Assert.assertFalse(ledgerInfo.get(100, TimeUnit.MILLISECONDS).getOffloadContext().getComplete());
+        });
+    }
+
 }