You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ji...@apache.org on 2022/07/27 09:55:38 UTC
[pulsar] branch branch-2.7 updated: cancel offload tasks when managed ledger closed (#16808)
This is an automated email from the ASF dual-hosted git repository.
jianghaiting pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new b81b180b021 cancel offload tasks when managed ledger closed (#16808)
b81b180b021 is described below
commit b81b180b021e159591e97eadfd5c7b0a9207ff05
Author: JiangHaiting <ji...@apache.org>
AuthorDate: Wed Jul 27 17:55:31 2022 +0800
cancel offload tasks when managed ledger closed (#16808)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 9 +++--
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 42 +++++++++++++++++++++-
2 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b66b9a8aa8d..babe6320ae0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2065,13 +2065,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
+ ", total size = {}, already offloaded = {}, to offload = {}",
name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
sizeSummed, alreadyOffloadedSize, toOffloadSize);
+ offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
} else {
// offloadLoop will complete immediately with an empty list to offload
log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
name, sizeSummed, alreadyOffloadedSize, threshold);
+ unlockingPromise.complete(PositionImpl.latest);
}
-
- offloadLoop(unlockingPromise, toOffload, PositionImpl.latest, Optional.empty());
}
}
}
@@ -2605,6 +2605,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
+ if (getState().equals(State.Closed.toString())) {
+ promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
+ String.format("managed ledger [%s] has already closed", name)));
+ return;
+ }
LedgerInfo info = ledgersToOffload.poll();
if (info == null) {
if (firstError.isPresent()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 2f001a26203..d6fb1bee585 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -50,6 +50,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -104,12 +105,12 @@ import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo.LedgerInfo;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
-import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.policies.data.EnsemblePlacementPolicyConfig;
+import org.apache.pulsar.common.policies.data.OffloadPolicies;
import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.impl.zookeeper.ZKMetadataStore;
import org.apache.zookeeper.CreateMode;
@@ -118,6 +119,7 @@ import org.apache.zookeeper.MockZooKeeper;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -3027,4 +3029,42 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
Awaitility.await()
.until(() -> ledgerId2 == lh.getId());
}
+
+ @Test
+ public void testOffloadTaskCancelled() throws Exception { // offload must not be marked complete once the ledger reports Closed
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(bkc, zkc);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2); // presumably forces a ledger rollover during the three addEntry calls below -- TODO confirm
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+ // Mocked offloader whose policies use the "mock" driver and a zero-byte offload threshold.
+ OffloadPolicies offloadPolicies = new OffloadPolicies();
+ offloadPolicies.setManagedLedgerOffloadDriver("mock");
+ offloadPolicies.setManagedLedgerOffloadThresholdInBytes(0L); // NOTE(review): presumably makes any rolled-over ledger eligible for automatic offload -- confirm
+ LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+ Mockito.when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+ Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(offloadPolicies.getManagedLedgerOffloadDriver());
+ config.setLedgerOffloader(ledgerOffloader);
+ // Already-completed futures: the stubbed read handle and the stubbed offload call never block.
+ CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+ readHandle.complete(mock(ReadHandle.class));
+
+ CompletableFuture<Void> offloadFuture = new CompletableFuture<>();
+ offloadFuture.complete(null);
+ Mockito.when(ledgerOffloader.offload(any(ReadHandle.class), any(UUID.class), any(Map.class))).thenReturn(offloadFuture);
+ // Spy on the real managed ledger so getLedgerHandle and getState can be stubbed.
+ final ManagedLedgerImpl ledgerInit = (ManagedLedgerImpl) factory.open("test-offload-task-close", config);
+ final ManagedLedgerImpl ledger = spy(ledgerInit);
+ long ledgerId = 3L; // NOTE(review): assumes the ledger under test is assigned id 3 by the mocked BookKeeper -- confirm
+ doReturn(readHandle).when(ledger).getLedgerHandle(ledgerId);
+ doReturn(ManagedLedgerImpl.State.Closed.toString()).when(ledger).getState(); // offloadLoop compares getState() with Closed and fails the promise before polling a ledger
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+ ledger.close();
+ // The offload context recorded for ledgerId must not be flagged complete.
+ Awaitility.await().untilAsserted(() -> {
+ LedgerInfo ledgerInfo = ledger.getLedgersInfo().get(ledgerId);
+ Assert.assertFalse(ledgerInfo.getOffloadContext().getComplete());
+ });
+ }
}