You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2020/06/23 07:27:18 UTC
[pulsar] branch branch-2.6 updated: Fix producer stucks on creating
ledger timeout (#7319)
This is an automated email from the ASF dual-hosted git repository.
zhaijia pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.6 by this push:
new 848e53d Fix producer stucks on creating ledger timeout (#7319)
848e53d is described below
commit 848e53d1c29f18967cc7732ae2114fcd3cc563f8
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jun 22 13:32:28 2020 -0700
Fix producer stucks on creating ledger timeout (#7319)
* Fix producer stucks on creating ledger timeout
*Motivation*
The `ledgerCreated` flag is passed as ctx to the createLedger callback.
The callback already has the logic for handling the `ledgerCreated` flag. But the timeout task also sets the flag to `true`
when the timeout happens. This triggers the following race condition:
a) The createComplete callback is triggered when the timeout fires, but the pending add requests are not errored out.
b) If the createComplete callback is eventually invoked, it will see that the `ledgerCreated` flag is set to true,
so `checkAndCompleteLedgerOpTask` returns true and the callback exits too early without processing the
pending add requests.
This race condition only happens when ledger creation times out.
```
public synchronized void createComplete(int rc, final LedgerHandle lh, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId() : -1);
}
if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
return;
}
```
*Modification*
The timeout logic shouldn't modify the `ledgerCreated` context. It should let the callback itself process
the `ledgerCreated` context.
* Change to use CAS
(cherry picked from commit a34f6939f0aa639c37192ea0c9bc7b927a245664)
---
.../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 ++++----
.../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 15 +++++++++------
2 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 451ce30..5fd9b24 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3132,8 +3132,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
scheduledExecutor.schedule(() -> {
if (!ledgerCreated.get()) {
- ledgerCreated.set(true);
- cb.createComplete(BKException.Code.TimeoutException, null, null);
+ cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
}
}, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
}
@@ -3149,14 +3148,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
if (ctx instanceof AtomicBoolean) {
// ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
- if (((AtomicBoolean) (ctx)).get()) {
+ if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) {
+ return false;
+ } else {
if (rc == BKException.Code.OK) {
log.warn("[{}]-{} ledger creation timed-out, deleting ledger", this.name, lh.getId());
asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES);
}
return true;
}
- ((AtomicBoolean) ctx).set(true);
}
return false;
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 06fd37d..b5b702f 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -95,6 +95,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -2324,16 +2325,18 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
AtomicInteger response = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
- ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
- @Override
- public void createComplete(int rc, LedgerHandle lh, Object ctx) {
- response.set(rc);
- latch.countDown();
- }
+ AtomicReference<Object> ctxHolder = new AtomicReference<>();
+ ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {
+ response.set(rc);
+ latch.countDown();
+ ctxHolder.set(ctx);
}, Collections.emptyMap());
latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
assertEquals(response.get(), BKException.Code.TimeoutException);
+ assertTrue(ctxHolder.get() instanceof AtomicBoolean);
+ AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get();
+ assertFalse(ledgerCreated.get());
ledger.close();
}