You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/11/11 07:18:27 UTC
(pulsar) branch branch-3.0 updated: [fix] [ml] Fix orphan scheduled task for ledger create timeout check (#21542)
This is an automated email from the ASF dual-hosted git repository.
technoboy pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 610bedf264c [fix] [ml] Fix orphan scheduled task for ledger create timeout check (#21542)
610bedf264c is described below
commit 610bedf264c225f0ff544856ff48362791d61e0c
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sat Nov 11 13:42:27 2023 +0800
[fix] [ml] Fix orphan scheduled task for ledger create timeout check (#21542)
### Motivation
When a managed ledger (ML) tries to create a new ledger, it schedules a delayed task that checks whether the ledger-creation request has timed out [1].
However, this delayed task should be cancelled as soon as the ledger-creation request finishes. Otherwise, each orphaned task stays in the scheduler's queue until its timeout expires and wastes CPU resources.
### Modifications
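Replace the `AtomicBoolean` completion flag with a `CompletableFuture<LedgerHandle>` hook, and cancel the scheduled timeout-check task as soon as that hook completes, i.e., once the create-ledger request has finished.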
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 31 ++++++------
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 59 ++++++++++++++++++++--
2 files changed, 72 insertions(+), 18 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 1f0cd6c5f14..dbc06c3b2e0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -59,7 +59,6 @@ import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
@@ -4040,7 +4039,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
*/
protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType,
CreateCallback cb, Map<String, byte[]> metadata) {
- AtomicBoolean ledgerCreated = new AtomicBoolean(false);
+ CompletableFuture<LedgerHandle> ledgerFutureHook = new CompletableFuture<>();
Map<String, byte[]> finalMetadata = new HashMap<>();
finalMetadata.putAll(ledgerMetadata);
finalMetadata.putAll(metadata);
@@ -4053,33 +4052,39 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
));
} catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
log.error("[{}] Serialize the placement configuration failed", name, e);
- cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+ cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
return;
}
}
createdLedgerCustomMetadata = finalMetadata;
-
try {
bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
- config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+ config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
} catch (Throwable cause) {
log.error("[{}] Encountered unexpected error when creating ledger",
name, cause);
- cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+ ledgerFutureHook.completeExceptionally(cause);
+ cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
return;
}
- scheduledExecutor.schedule(() -> {
- if (!ledgerCreated.get()) {
+
+ ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
+ if (!ledgerFutureHook.isDone()
+ && ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
if (log.isDebugEnabled()) {
log.debug("[{}] Timeout creating ledger", name);
}
- cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
+ cb.createComplete(BKException.Code.TimeoutException, null, ledgerFutureHook);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger already created when timeout task is triggered", name);
}
}
}, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+
+ ledgerFutureHook.whenComplete((ignore, ex) -> {
+ timeoutChecker.cancel(false);
+ });
}
public Clock getClock() {
@@ -4088,16 +4093,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
/**
* check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
- *
- * @param rc
- * @param lh
- * @param ctx
* @return
*/
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
- if (ctx instanceof AtomicBoolean) {
+ if (ctx instanceof CompletableFuture) {
// ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
- if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) {
+ if (((CompletableFuture) ctx).complete(lh)) {
return false;
} else {
if (rc == BKException.Code.OK) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index f9c52ec60b2..ca4e1d10a6c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -61,11 +61,13 @@ import java.util.NavigableMap;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -90,6 +92,8 @@ import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.LedgerMetadata;
import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.BoundedScheduledExecutorService;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -136,6 +140,7 @@ import org.apache.pulsar.metadata.api.Stat;
import org.apache.pulsar.metadata.api.extended.SessionEvent;
import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
@@ -3086,9 +3091,9 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
assertEquals(response.get(), BKException.Code.TimeoutException);
- assertTrue(ctxHolder.get() instanceof AtomicBoolean);
- AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get();
- assertFalse(ledgerCreated.get());
+ assertTrue(ctxHolder.get() instanceof CompletableFuture);
+ CompletableFuture ledgerCreateHook = (CompletableFuture) ctxHolder.get();
+ assertTrue(ledgerCreateHook.isCompletedExceptionally());
ledger.close();
}
@@ -4098,4 +4103,52 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
Position Position = new PositionImpl(-1L, -1L);
assertNotNull(ml.newNonDurableCursor(Position));
}
+
+ /**
+ * Verifies that the delayed ledger-create timeout check is cancelled once the create-ledger request completes.
+ * The same ML is opened and closed 1000 times, so any timeout check left un-cancelled would pile up in the scheduler.
+ */
+ @Test
+ public void testNoOrphanScheduledTasksAfterCloseML() throws Exception {
+ String mlName = UUID.randomUUID().toString();
+ ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc); // NOTE(review): not shut down here
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMetadataOperationsTimeoutSeconds(3600); // 1 hour: checks cannot expire during the test; only cancellation clears them.
+
+ // Baseline: pending tasks in the factory's scheduler before any ML is opened.
+ long pendingTaskCountBefore = calculatePendingTaskCount(factory.getScheduledExecutor());
+ // Open and close the ML 1000 times; each open presumably creates a ledger and schedules a timeout check.
+ for (int i = 0; i < 1000; i++) {
+ ManagedLedger ml = factory.open(mlName, config);
+ ml.close();
+ }
+ // Verify that no orphaned timeout-check task remains.
+ long pendingTaskCountAfter = calculatePendingTaskCount(factory.getScheduledExecutor());
+ // Other components may also schedule tasks on this executor, so allow fewer than 100 extra to avoid flakiness.
+ assertTrue(pendingTaskCountAfter - pendingTaskCountBefore < 100);
+ }
+
+ /**
+ * Counts tasks queued in every executor of the given {@link OrderedScheduler}, skipping cancelled or done FutureTasks.
+ */
+ private long calculatePendingTaskCount(OrderedScheduler orderedScheduler) {
+ ExecutorService[] threads = WhiteboxImpl.getInternalState(orderedScheduler, "threads"); // reflective read of an internal field
+ long taskCounter = 0;
+ for (ExecutorService thread : threads) {
+ BoundedScheduledExecutorService boundedScheduledExecutorService =
+ WhiteboxImpl.getInternalState(thread, "delegate"); // relies on the per-thread wrapper's internal field name
+ BlockingQueue<Runnable> queue = WhiteboxImpl.getInternalState(boundedScheduledExecutorService, "queue");
+ for (Runnable r : queue) {
+ if (r instanceof FutureTask) {
+ FutureTask futureTask = (FutureTask) r;
+ if (!futureTask.isCancelled() && !futureTask.isDone()) { // cancelled tasks may still sit in the queue
+ taskCounter++;
+ }
+ } else {
+ taskCounter++; // state unknown for non-FutureTask entries, so count them conservatively
+ }
+ }
+ }
+ return taskCounter;
+ }
}