You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2023/11/11 07:23:37 UTC

(pulsar) 02/02: [fix] [ml] Fix orphan scheduled task for ledger create timeout check (#21542)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit cf78b711e4a3c8b6dbc13a3ccc0b8e272f6334a4
Author: fengyubiao <yu...@streamnative.io>
AuthorDate: Sat Nov 11 13:42:27 2023 +0800

    [fix] [ml] Fix orphan scheduled task for ledger create timeout check (#21542)
    
    ### Motivation
    
    When an ML tries to create a new ledger, it will create a delay task to check if the ledger create request is timeout<sup>[1]</sup>.
    
    However, we should cancel this delay task after the request to create new ledgers is finished. Otherwise, these tasks will cost unnecessary CPU resources.
    
    ### Modifications
    
    Cancel the scheduled task after the create ledger request is finished
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 31 ++++++------
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 59 ++++++++++++++++++++--
 2 files changed, 72 insertions(+), 18 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 90be4dbabee..9cec660b917 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -59,7 +59,6 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
@@ -3985,7 +3984,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
      */
     protected void asyncCreateLedger(BookKeeper bookKeeper, ManagedLedgerConfig config, DigestType digestType,
             CreateCallback cb, Map<String, byte[]> metadata) {
-        AtomicBoolean ledgerCreated = new AtomicBoolean(false);
+        CompletableFuture<LedgerHandle> ledgerFutureHook = new CompletableFuture<>();
         Map<String, byte[]> finalMetadata = new HashMap<>();
         finalMetadata.putAll(ledgerMetadata);
         finalMetadata.putAll(metadata);
@@ -3998,33 +3997,39 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 ));
             } catch (EnsemblePlacementPolicyConfig.ParseEnsemblePlacementPolicyConfigException e) {
                 log.error("[{}] Serialize the placement configuration failed", name, e);
-                cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+                cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
                 return;
             }
         }
         createdLedgerCustomMetadata = finalMetadata;
-
         try {
             bookKeeper.asyncCreateLedger(config.getEnsembleSize(), config.getWriteQuorumSize(),
-                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
+                    config.getAckQuorumSize(), digestType, config.getPassword(), cb, ledgerFutureHook, finalMetadata);
         } catch (Throwable cause) {
             log.error("[{}] Encountered unexpected error when creating ledger",
                 name, cause);
-            cb.createComplete(Code.UnexpectedConditionException, null, ledgerCreated);
+            ledgerFutureHook.completeExceptionally(cause);
+            cb.createComplete(Code.UnexpectedConditionException, null, ledgerFutureHook);
             return;
         }
-        scheduledExecutor.schedule(() -> {
-            if (!ledgerCreated.get()) {
+
+        ScheduledFuture timeoutChecker = scheduledExecutor.schedule(() -> {
+            if (!ledgerFutureHook.isDone()
+                    && ledgerFutureHook.completeExceptionally(new TimeoutException(name + " Create ledger timeout"))) {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Timeout creating ledger", name);
                 }
-                cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
+                cb.createComplete(BKException.Code.TimeoutException, null, ledgerFutureHook);
             } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] Ledger already created when timeout task is triggered", name);
                 }
             }
         }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
+
+        ledgerFutureHook.whenComplete((ignore, ex) -> {
+            timeoutChecker.cancel(false);
+        });
     }
 
     public Clock getClock() {
@@ -4033,16 +4038,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     /**
      * check if ledger-op task is already completed by timeout-task. If completed then delete the created ledger
-     *
-     * @param rc
-     * @param lh
-     * @param ctx
      * @return
      */
     protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
-        if (ctx instanceof AtomicBoolean) {
+        if (ctx instanceof CompletableFuture) {
             // ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
-            if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) {
+            if (((CompletableFuture) ctx).complete(lh)) {
                 return false;
             } else {
                 if (rc == BKException.Code.OK) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index f9c52ec60b2..ca4e1d10a6c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -61,11 +61,13 @@ import java.util.NavigableMap;
 import java.util.Optional;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.FutureTask;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -90,6 +92,8 @@ import org.apache.bookkeeper.client.PulsarMockLedgerHandle;
 import org.apache.bookkeeper.client.api.LedgerEntries;
 import org.apache.bookkeeper.client.api.LedgerMetadata;
 import org.apache.bookkeeper.client.api.ReadHandle;
+import org.apache.bookkeeper.common.util.BoundedScheduledExecutorService;
+import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.conf.ClientConfiguration;
 import org.apache.bookkeeper.mledger.AsyncCallbacks;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
@@ -136,6 +140,7 @@ import org.apache.pulsar.metadata.api.Stat;
 import org.apache.pulsar.metadata.api.extended.SessionEvent;
 import org.apache.pulsar.metadata.impl.FaultInjectionMetadataStore;
 import org.awaitility.Awaitility;
+import org.awaitility.reflect.WhiteboxImpl;
 import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.DataProvider;
@@ -3086,9 +3091,9 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
 
         latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
         assertEquals(response.get(), BKException.Code.TimeoutException);
-        assertTrue(ctxHolder.get() instanceof AtomicBoolean);
-        AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get();
-        assertFalse(ledgerCreated.get());
+        assertTrue(ctxHolder.get() instanceof CompletableFuture);
+        CompletableFuture ledgerCreateHook = (CompletableFuture) ctxHolder.get();
+        assertTrue(ledgerCreateHook.isCompletedExceptionally());
 
         ledger.close();
     }
@@ -4098,4 +4103,52 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         Position Position = new PositionImpl(-1L, -1L);
         assertNotNull(ml.newNonDurableCursor(Position));
     }
+
+    /***
+     * When a ML tries to create a ledger, it will create a delay task to check if the ledger create request is timeout.
+     * But we should guarantee that the delay task should be canceled after the ledger create request responded.
+     */
+    @Test
+    public void testNoOrphanScheduledTasksAfterCloseML() throws Exception {
+        String mlName = UUID.randomUUID().toString();
+        ManagedLedgerFactoryImpl factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+        ManagedLedgerConfig config = new ManagedLedgerConfig();
+        config.setMetadataOperationsTimeoutSeconds(3600);
+
+        // Calculate pending task count.
+        long pendingTaskCountBefore = calculatePendingTaskCount(factory.getScheduledExecutor());
+        // Trigger create & close ML 1000 times.
+        for (int i = 0; i < 1000; i++) {
+            ManagedLedger ml = factory.open(mlName, config);
+            ml.close();
+        }
+        // Verify there is no orphan scheduled task.
+        long pendingTaskCountAfter = calculatePendingTaskCount(factory.getScheduledExecutor());
+        // Maybe there are other components also appended scheduled tasks, so leave 100 tasks to avoid flaky.
+        assertTrue(pendingTaskCountAfter - pendingTaskCountBefore < 100);
+    }
+
+    /**
+     * Calculate how many pending tasks in {@link OrderedScheduler}
+     */
+    private long calculatePendingTaskCount(OrderedScheduler orderedScheduler) {
+        ExecutorService[] threads = WhiteboxImpl.getInternalState(orderedScheduler, "threads");
+        long taskCounter = 0;
+        for (ExecutorService thread : threads) {
+            BoundedScheduledExecutorService boundedScheduledExecutorService =
+                    WhiteboxImpl.getInternalState(thread, "delegate");
+            BlockingQueue<Runnable> queue =  WhiteboxImpl.getInternalState(boundedScheduledExecutorService, "queue");
+            for (Runnable r : queue) {
+                if (r instanceof FutureTask) {
+                    FutureTask futureTask = (FutureTask) r;
+                    if (!futureTask.isCancelled() && !futureTask.isDone()) {
+                        taskCounter++;
+                    }
+                } else {
+                    taskCounter++;
+                }
+            }
+        }
+        return taskCounter;
+    }
 }