You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/06/14 17:44:56 UTC

[pulsar] branch master updated: [pulsar-ml] handle race condition between timeout-task and add-call complete (#4455)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d72b986  [pulsar-ml] handle race condition between timeout-task and add-call complete (#4455)
d72b986 is described below

commit d72b986d311d8b528619f4a4dcddf757b3975dde
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Jun 14 10:44:51 2019 -0700

    [pulsar-ml] handle race condition between timeout-task and add-call complete (#4455)
    
    * [pulsar-ml] handle race condition between timeout-task and add-call complete
    
    * rename variable with simple name
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java |  8 ++--
 .../apache/bookkeeper/mledger/impl/OpAddEntry.java | 54 +++++++++++-----------
 2 files changed, 32 insertions(+), 30 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index fe49985..3bfe7ab 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -227,6 +227,9 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
             .newUpdater(ManagedLedgerImpl.class, "readOpCount");
     private volatile long readOpCount = 0;
+    protected static final AtomicLongFieldUpdater<ManagedLedgerImpl> ADD_OP_COUNT_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ManagedLedgerImpl.class, "addOpCount");
+    private volatile long addOpCount = 0;
 
     private final long backloggedCursorThresholdEntries;
 
@@ -3086,12 +3089,11 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         OpAddEntry opAddEntry = pendingAddEntries.peek();
         if (opAddEntry != null) {
             boolean isTimedOut = opAddEntry.lastInitTime != -1
-                    && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec
-                    && opAddEntry.completed == FALSE;
+                    && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - opAddEntry.lastInitTime) >= timeoutSec;
             if (isTimedOut) {
                 log.error("Failed to add entry for ledger {} in time-out {} sec",
                         (opAddEntry.ledger != null ? opAddEntry.ledger.getId() : -1), timeoutSec);
-                opAddEntry.handleAddFailure(opAddEntry.ledger);
+                opAddEntry.handleAddTimeoutFailure(opAddEntry.ledger, opAddEntry.addOpCount);
             }
         }
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
index c86f7b9..e741d44 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java
@@ -24,9 +24,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 
-import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.client.AsyncCallback.AddCallback;
 import org.apache.bookkeeper.client.AsyncCallback.CloseCallback;
@@ -38,8 +37,6 @@ import org.apache.bookkeeper.mledger.util.SafeRun;
 import org.apache.bookkeeper.util.SafeRunnable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 
 /**
  * Handles the life-cycle of an addEntry() operation.
@@ -52,14 +49,14 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
 
     @SuppressWarnings("unused")
     private volatile AddEntryCallback callback;
-    private Object ctx;
+    Object ctx;
+    volatile long addOpCount;
+    private static final AtomicLongFieldUpdater<OpAddEntry> ADD_OP_COUNT_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(OpAddEntry.class, "addOpCount");
     private boolean closeWhenDone;
     private long startTime;
     volatile long lastInitTime;
-    private static final AtomicIntegerFieldUpdater<OpAddEntry> COMPLETED_UPDATER =
-        AtomicIntegerFieldUpdater.newUpdater(OpAddEntry.class, "completed");
     @SuppressWarnings("unused")
-    volatile int completed = FALSE;
     ByteBuf data;
     private int dataLength;
 
@@ -74,10 +71,10 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
         op.dataLength = data.readableBytes();
         op.callback = callback;
         op.ctx = ctx;
+        op.addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);
         op.closeWhenDone = false;
         op.entryId = -1;
         op.startTime = System.nanoTime();
-        op.completed = FALSE;
         ml.mbean.addAddEntrySample(op.dataLength);
         if (log.isDebugEnabled()) {
             log.debug("Created new OpAddEntry {}", op);
@@ -97,14 +94,12 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
         ByteBuf duplicateBuffer = data.retainedDuplicate();
 
         // internally asyncAddEntry() will take the ownership of the buffer and release it at the end
+        addOpCount = ManagedLedgerImpl.ADD_OP_COUNT_UPDATER.incrementAndGet(ml);;
         lastInitTime = System.nanoTime();
-        ledger.asyncAddEntry(duplicateBuffer, this, ctx);
+        ledger.asyncAddEntry(duplicateBuffer, this, addOpCount);
     }
 
     public void failed(ManagedLedgerException e) {
-        if (!checkAndCompleteTimeoutTask()) {
-            return;
-        }
         AddEntryCallback cb = callbackUpdater.getAndSet(this, null);
         data.release();
         if (cb != null) {
@@ -120,7 +115,11 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
         }
         checkArgument(ledger.getId() == lh.getId(), "ledgerId %s doesn't match with acked ledgerId %s", ledger.getId(),
                 lh.getId());
-        checkArgument(this.ctx == ctx);
+        
+        if (!checkAndCompleteOp(ctx)) {
+            // The callback may already have been completed by another thread (e.g. the timeout task), so do nothing.
+            return;
+        }
 
         this.entryId = entryId;
         if (log.isDebugEnabled()) {
@@ -131,9 +130,6 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
         if (rc != BKException.Code.OK) {
             handleAddFailure(lh);
         } else {
-            if(!checkAndCompleteTimeoutTask()) {
-                return;
-            }
             // Trigger addComplete callback in a thread hashed on the managed ledger name
             ml.getExecutor().executeOrdered(ml.getName(), this);
         }
@@ -205,19 +201,23 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
     }
 
     /**
-     * It cancels timeout task and checks if add-entry operation is not completed yet.
+     * Checks if the add-operation is already completed and, if not, marks it as completed.
      * 
      * @return true if task is not already completed else returns false.
      */
-    private boolean checkAndCompleteTimeoutTask() {
-        if (!COMPLETED_UPDATER.compareAndSet(this, FALSE, TRUE)) {
-            if (log.isDebugEnabled()) {
-                log.debug("Add-entry already completed for {}-{}", this.ledger != null ? this.ledger.getId() : -1,
-                        this.entryId);
-            }
-            return false;
+    private boolean checkAndCompleteOp(Object ctx) {
+        long addOpCount = (ctx instanceof Long) ? (long) ctx : -1; // op-count the caller observed; -1 if ctx is not a Long
+        if (addOpCount != -1 && ADD_OP_COUNT_UPDATER.compareAndSet(this, addOpCount, -1)) { // expect the caller's value, not the field itself, so only one completer wins
+            return true;
+        }
+        log.info("Add-entry already completed for {}-{}", ledger != null ? ledger.getId() : -1, entryId);
+        return false;
+    }
+
+    void handleAddTimeoutFailure(final LedgerHandle ledger, Object ctx) {
+        if (checkAndCompleteOp(ctx)) {
+            this.handleAddFailure(ledger);
         }
-        return true;
     }
 
     /**
@@ -257,8 +257,8 @@ class OpAddEntry extends SafeRunnable implements AddCallback, CloseCallback {
         dataLength = -1;
         callback = null;
         ctx = null;
+        addOpCount = -1;
         closeWhenDone = false;
-        completed = FALSE;
         entryId = -1;
         startTime = -1;
         lastInitTime = -1;