You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2020/06/22 20:32:39 UTC
[pulsar] branch master updated: Fix producer stucks on creating
ledger timeout (#7319)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new a34f693 Fix producer stucks on creating ledger timeout (#7319)
a34f693 is described below
commit a34f6939f0aa639c37192ea0c9bc7b927a245664
Author: Sijie Guo <si...@apache.org>
AuthorDate: Mon Jun 22 13:32:28 2020 -0700
Fix producer stucks on creating ledger timeout (#7319)
* Fix producer stucks on creating ledger timeout
*Motivation*
The `ledgerCreated` flag is passed as ctx to the createLedger callback.
The callback already has logic for handling the `ledgerCreated` flag. But the timeout task sets the flag to `true`
when the timeout fires, which triggers the following race condition:
a) The createComplete callback is triggered by the timeout, but the pending add requests are not errored out.
b) When the real createComplete callback eventually arrives, it sees that the `ledgerCreated` flag is already set to true,
so `checkAndCompleteLedgerOpTask` returns true and `createComplete` exits too early without processing the
pending add requests.
This race condition only happens when ledger creation times out.
```
public synchronized void createComplete(int rc, final LedgerHandle lh, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId() : -1);
}
if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
return;
}
```
*Modification*
The timeout logic shouldn't modify the `ledgerCreated` context. It should let the callback itself process
the `ledgerCreated` context.
* Change to use compare-and-set (CAS) on the `ledgerCreated` flag
---
.../apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 ++++----
.../apache/bookkeeper/mledger/impl/ManagedLedgerTest.java | 15 +++++++++------
2 files changed, 13 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 48b864d..27d8848 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3177,8 +3177,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
digestType, config.getPassword(), cb, ledgerCreated, finalMetadata);
scheduledExecutor.schedule(() -> {
if (!ledgerCreated.get()) {
- ledgerCreated.set(true);
- cb.createComplete(BKException.Code.TimeoutException, null, null);
+ cb.createComplete(BKException.Code.TimeoutException, null, ledgerCreated);
}
}, config.getMetadataOperationsTimeoutSeconds(), TimeUnit.SECONDS);
}
@@ -3194,14 +3193,15 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object ctx) {
if (ctx instanceof AtomicBoolean) {
// ledger-creation is already timed out and callback is already completed so, delete this ledger and return.
- if (((AtomicBoolean) (ctx)).get()) {
+ if (((AtomicBoolean) (ctx)).compareAndSet(false, true)) {
+ return false;
+ } else {
if (rc == BKException.Code.OK) {
log.warn("[{}]-{} ledger creation timed-out, deleting ledger", this.name, lh.getId());
asyncDeleteLedger(lh.getId(), DEFAULT_LEDGER_DELETE_RETRIES);
}
return true;
}
- ((AtomicBoolean) ctx).set(true);
}
return false;
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 427a773..e507c99 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -95,6 +95,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryConfig;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.VoidCallback;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.State;
import org.apache.bookkeeper.mledger.impl.MetaStore.MetaStoreCallback;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.mledger.proto.MLDataFormats.ManagedLedgerInfo;
@@ -2326,16 +2327,18 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
AtomicInteger response = new AtomicInteger(0);
CountDownLatch latch = new CountDownLatch(1);
- ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
- @Override
- public void createComplete(int rc, LedgerHandle lh, Object ctx) {
- response.set(rc);
- latch.countDown();
- }
+ AtomicReference<Object> ctxHolder = new AtomicReference<>();
+ ledger.asyncCreateLedger(bk, config, null, (rc, lh, ctx) -> {
+ response.set(rc);
+ latch.countDown();
+ ctxHolder.set(ctx);
}, Collections.emptyMap());
latch.await(config.getMetadataOperationsTimeoutSeconds() + 2, TimeUnit.SECONDS);
assertEquals(response.get(), BKException.Code.TimeoutException);
+ assertTrue(ctxHolder.get() instanceof AtomicBoolean);
+ AtomicBoolean ledgerCreated = (AtomicBoolean) ctxHolder.get();
+ assertFalse(ledgerCreated.get());
ledger.close();
}