You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/22 20:32:39 UTC

[pulsar] branch master updated: Fix producer stucks on creating ledger timeout (#7319)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new a34f693  Fix producer stucks on creating ledger timeout (#7319)
a34f693 is described below

commit a34f6939f0aa639c37192ea0c9bc7b927a245664
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jun 22 13:32:28 2020 -0700

    Fix producer stucks on creating ledger timeout (#7319)
    
    * Fix producer stucks on creating ledger timeout
    
    *Motivation*
    
    The `ledgerCreated` flag is passed as ctx to the createLedger callback.
    The callback already has the logic to handle the `ledgerCreated` flag. But the timeout task sets the flag to `true`
    before invoking the callback when the timeout fires. This triggers the following race condition:
    
    a) The createComplete callback is triggered when the timeout fires, but the pending add requests are not errored out.
    b) If the real createComplete callback eventually fires, it sees that the `ledgerCreated` flag is already set to true,
    so `checkAndCompleteLedgerOpTask` returns true and createComplete exits too early without processing the
    pending add requests.
    
    This race condition only happens when ledger creation times out.
    
    ```
        public synchronized void createComplete(int rc, final LedgerHandle lh, Object ctx) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId() : -1);
            }
    
            if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
                return;
            }
    ```
    
    *Modification*
    
    The timeout logic shouldn't modify the `ledgerCreated` context. It should let the callback itself process
    the `ledgerCreated` context.
    
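    * Change to use CAS (`compareAndSet(false, true)`) so that only the first completion, whether the timeout or the real callback, claims the flag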
---
 .../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java |  8 ++++----
 .../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 15 +++++++++------
 2 files changed, 13 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 48b864d..27d8848 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3177,8 +3177,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
         scheduledExecutor.schedule(() -> {
             if (!ledgerCreated.get()) {
-                ledgerCreated.set(true);
-                cb.createComplete(BKException.Code.TimeoutException, null, null);
+                cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
             }
         }, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
     }
@@ -3194,14 +3193,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
         if (ctx instanceof AtomicBoolean) {
             // ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
-            if (((AtomicBoolean) (ctx)).get()) {
+            if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) {
+                return false;
+            } else {
                 if (rc == BKException.Code.OK) {
                     log.warn("[{}]-{} ledger creation timed-out, deleting ledger", this.name, lh.getId());
                     asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES);
                 }
                 return true;
             }
-            ((AtomicBoolean) ctx).set(true);
         }
         return false;
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 427a773..e507c99 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -95,6 +95,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
 import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats;
 import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -2326,16 +2327,18 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
         AtomicInteger response = new AtomicInteger(0);
         CountDownLatch latch = new CountDownLatch(1);
-        ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
-            @Override
-            public void createComplete(int rc, LedgerHandle lh, Object ctx) {
-                response.set(rc);
-                latch.countDown();
-            }
+        AtomicReference<Object> ctxHolder = new AtomicReference<>();
+        ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {
+            response.set(rc);
+            latch.countDown();
+            ctxHolder.set(ctx);
         }, Collections.emptyMap());
 
         latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
         assertEquals(response.get(), BKException.Code.TimeoutException);
+        assertTrue(ctxHolder.get() instanceof AtomicBoolean);
+        AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get();
+        assertFalse(ledgerCreated.get());
 
         ledger.close();
     }