You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/14 17:44:56 UTC
[pulsar] branch master updated: [pulsar-ml] handle race condition between timeout-task and add-call complete (#4455)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d72b986 [pulsar-ml] handle race condition between timeout-task and add-call complete (#4455)
d72b986 is described below
commit d72b986d311d8b528619f4a4dcddf757b3975dde
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Jun 14 10:44:51 2019 -0700
[pulsar-ml] handle race condition between timeout-task and add-call complete (#4455)
* [pulsar-ml] handle race condition between timeout-task and add-call complete
* rename variable with simple name
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 8 ++--
.../apache/bookkeeper/mledger/impl/OpAddEntry.java | 54 +++++++++++-----------
2 files changed, 32 insertions(+), 30 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index fe49985..3bfe7ab 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -227,6 +227,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;
+ protected static final AtomicLongFieldUpdater<ManagedLedgerImpl> ADD_OP_COUNT_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(ManagedLedgerImpl.class, "addOpCount");
+ private volatile long addOpCount = 0;
private final long backloggedCursorThresholdEntries;
@@ -3086,12 +3089,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
OpAddEntry opAddEntry = pendingAddEntries.peek();
if (opAddEntry != null) {
boolean isTimedOut = opAddEntry.lastInitTime != -1
- && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
- && opAddEntry.completed == FALSE;
+ && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec;
if (isTimedOut) {
log.error("Failed to add entry for ledger {} in time-out {} sec",
(opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
- opAddEntry.handleAddFailure(opAddEntry.ledger);
+ opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger, opAddEntry.addOpCount);
}
}
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index c86f7b9..e741d44 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -24,9 +24,8 @@ import io.netty.buffer.ByteBuf;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
-import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -38,8 +37,6 @@ import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.bookkeeper.util.SafeRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
/**
* Handles the life-cycle of an addEntry() operation.
@@ -52,14 +49,14 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
@SuppressWarnings("unused")
private volatile AddEntryCallback callback;
- private Object ctx;
+ Object ctx;
+ volatile long addOpCount;
+ private static final AtomicLongFieldUpdater<OpAddEntry> ADD_OP_COUNT_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(OpAddEntry.class, "addOpCount");
private boolean closeWhenDone;
private long startTime;
volatile long lastInitTime;
- private static final AtomicIntegerFieldUpdater<OpAddEntry> COMPLETED_UPDATER =
- AtomicIntegerFieldUpdater.newUpdater(OpAddEntry.class, "completed");
@SuppressWarnings("unused")
- volatile int completed = FALSE;
ByteBuf data;
private int dataLength;
@@ -74,10 +71,10 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
op.dataLength = data.readableBytes();
op.callback = callback;
op.ctx = ctx;
+ op.addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
op.closeWhenDone = false;
op.entryId = -1;
op.startTime = System.nanoTime();
- op.completed = FALSE;
ml.mbean.addAddEntrySample(op.dataLength);
if (log.isDebugEnabled()) {
log.debug("Created new OpAddEntry {}", op);
@@ -97,14 +94,12 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
ByteBuf duplicateBuffer = data.retainedDuplicate();
// internally asyncAddEntry() will take the ownership of the buffer and release it at the end
+ addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);;
lastInitTime = System.nanoTime();
- ledger.asyncAddEntry(duplicateBuffer, this, ctx);
+ ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
}
public void failed(ManagedLedgerException e) {
- if (!checkAndCompleteTimeoutTask()) {
- return;
- }
AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
data.release();
if (cb != null) {
@@ -120,7 +115,11 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
}
checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
lh.getId());
- checkArgument(this.ctx == ctx);
+
+ if (!checkAndCompleteOp(ctx)) {
+ // means callback might have been completed by different thread (timeout task thread).. so do nothing
+ return;
+ }
this.entryId = entryId;
if (log.isDebugEnabled()) {
@@ -131,9 +130,6 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
if (rc != BKException.Code.OK) {
handleAddFailure(lh);
} else {
- if(!checkAndCompleteTimeoutTask()) {
- return;
- }
// Trigger addComplete callback in a thread hashed on the managed ledger name
ml.getExecutor().executeOrdered(ml.getName(), this);
}
@@ -205,19 +201,23 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
}
/**
- * It cancels timeout task and checks if add-entry operation is not completed yet.
+ * Checks if add-operation is completed
*
* @return true if task is not already completed else returns false.
*/
- private boolean checkAndCompleteTimeoutTask() {
- if (!COMPLETED_UPDATER.compareAndSet(this, FALSE, TRUE)) {
- if (log.isDebugEnabled()) {
- log.debug("Add-entry already completed for {}-{}", this.ledger != null ? this.ledger.getId() : -1,
- this.entryId);
- }
- return false;
+ private boolean checkAndCompleteOp(Object ctx) {
+ long addOpCount = (ctx != null && ctx instanceof Long) ? (long) ctx : -1;
+ if (addOpCount != -1 && ADD_OP_COUNT_UPDATER.compareAndSet(this, this.addOpCount, -1)) {
+ return true;
+ }
+ log.info("Add-entry already completed for {}-{}", ledger != null ? ledger.getId() : -1, entryId);
+ return false;
+ }
+
+ void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {
+ if (checkAndCompleteOp(ctx)) {
+ this.handleAddFailure(ledger);
}
- return true;
}
/**
@@ -257,8 +257,8 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
dataLength = -1;
callback = null;
ctx = null;
+ addOpCount = -1;
closeWhenDone = false;
- completed = FALSE;
entryId = -1;
startTime = -1;
lastInitTime = -1;