You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/07/27 09:55:38 UTC

[pulsar] branch branch-2.7 updated: cancel offload tasks when managed ledger closed (#16808)

This is an automated email from the ASF dual-hosted git repository.

jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new b81b180b021 cancel offload tasks when managed ledger closed (#16808)
b81b180b021 is described below

commit b81b180b021e159591e97eadfd5c7b0a9207ff05
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Jul 27 17:55:31 2022 +0800

    cancel offload tasks when managed ledger closed (#16808)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  9 +++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 42 +++++++++++++++++++++-
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b66b9a8aa8d..babe6320ae0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2065,13 +2065,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                                     + ", total size = {}, already offloaded = {}, to offload = {}",
                             name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
                             sizeSummed, alreadyOffloadedSize, toOffloadSize);
+                    offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
                 } else {
                     // offloadLoop will complete immediately with an empty list to offload
                     log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
                             name, sizeSummed, alreadyOffloadedSize, threshold);
+                    unlockingPromise.complete(PositionImpl.latest);
                 }
-
-                offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
             }
         }
     }
@@ -2605,6 +2605,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
             PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
+        if (getState().equals(State.Closed.toString())) {
+            promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
+                    String.format("managed ledger [%s] has already closed", name)));
+            return;
+        }
         LedgerInfo info = ledgersToOffload.poll();
         if (info == null) {
             if (firstError.isPresent()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 2f001a26203..d6fb1bee585 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -50,6 +50,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
@@ -104,12 +105,12 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
 import org.apache.bookkeeper.mledger.util.Futures;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.commons.lang3.mutable.MutableObject;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
 import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
 import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
 import org.apache.zookeeper.CreateMode;
@@ -118,6 +119,7 @@ import org.apache.zookeeper.MockZooKeeper;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.ZooKeeper;
 import org.awaitility.Awaitility;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -3027,4 +3029,42 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         Awaitility.await()
                 .until(() -> ledgerId2 == lh.getId());
     }
+
+    @Test
+    public void testOffloadTaskCancelled() throws Exception {
+        ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMaxEntriesPerLedger(2);
+        config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+
+        OffloadPolicies offloadPolicies = new OffloadPolicies();
+        offloadPolicies.setManagedLedgerOffloadDriver("mock");
+        offloadPolicies.setManagedLedgerOffloadThresholdInBytes(0L);
+        LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+        Mockito.when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+        Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(offloadPolicies.getManagedLedgerOffloadDriver());
+        config.setLedgerOffloader(ledgerOffloader);
+
+        CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+        readHandle.complete(mock(ReadHandle.class));
+
+        CompletableFuture<Void> offloadFuture = new CompletableFuture<>();
+        offloadFuture.complete(null);
+        Mockito.when(ledgerOffloader.offload(any(ReadHandle.class), any(UUID.class), any(Map.class))).thenReturn(offloadFuture);
+
+        final ManagedLedgerImpl ledgerInit = (ManagedLedgerImpl) factory.open("test-offload-task-close", config);
+        final ManagedLedgerImpl ledger = spy(ledgerInit);
+        long ledgerId = 3L;
+        doReturn(readHandle).when(ledger).getLedgerHandle(ledgerId);
+        doReturn(ManagedLedgerImpl.State.Closed.toString()).when(ledger).getState();
+        ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+        ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+        ledger.close();
+
+        Awaitility.await().untilAsserted(() -> {
+            LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+            Assert.assertFalse(ledgerInfo.getOffloadContext().getComplete());
+        });
+    }
 }