You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/06/09 19:47:26 UTC

[pulsar] branch master updated: [pulsar-broker] Fix race condition of read-timeout task in ML (#4437)

This is an automated email from the ASF dual-hosted git repository.

rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 84d2d1e  [pulsar-broker] Fix race condition of read-timeout task in ML (#4437)
84d2d1e is described below

commit 84d2d1e443d427d3a730c0addc67c8e58fc56593
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Jun 9 12:47:21 2019 -0700

    [pulsar-broker] Fix race condition of read-timeout task in ML (#4437)
    
    * [pulsar-broker] Fix race condition of read-timeout task in ML
    
    * fix atomic recycling
    
    * fix test
    
    * add method for reOpCount
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 110 +++++++++++----------
 1 file changed, 60 insertions(+), 50 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6b9f529..fe49985 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -21,7 +21,6 @@ package org.apache.bookkeeper.mledger.impl;
 import static com.google.common.base.Preconditions.checkArgument;
 import static java.lang.Math.min;
 import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
 import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
 
 import com.google.common.collect.BoundType;
@@ -233,6 +232,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     // last read-operation's callback to check read-timeout on it.
     private volatile ReadEntryCallbackWrapper lastReadCallback = null;
+    private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater
+            .newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
 
     /**
      * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
@@ -1558,16 +1559,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
 
     static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback {
 
-        private static final AtomicIntegerFieldUpdater<ReadEntryCallbackWrapper> READ_COMPLETED_UPDATER = AtomicIntegerFieldUpdater
-                .newUpdater(ReadEntryCallbackWrapper.class, "readCompleted");
-        @SuppressWarnings("unused")
-        volatile int readCompleted = FALSE;
         volatile ReadEntryCallback readEntryCallback;
         volatile ReadEntriesCallback readEntriesCallback;
         String name;
         long ledgerId;
         long entryId;
         volatile long readOpCount = -1;
+        private static final AtomicLongFieldUpdater<ReadEntryCallbackWrapper> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
+                .newUpdater(ReadEntryCallbackWrapper.class, "readOpCount");
         volatile long createdTime = -1;
         volatile Object cntx;
 
@@ -1603,60 +1602,73 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             return readCallback;
         }
 
-        public boolean isTimedOut(long timeoutSec) {
-            return this.createdTime != -1
-                    && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - this.createdTime) >= timeoutSec
-                    && this.readCompleted == FALSE;
-        }
-
         @Override
         public void readEntryComplete(Entry entry, Object ctx) {
-            if (checkCallbackCompleted(ctx)) {
+            long reOpCount = reOpCount(ctx);
+            ReadEntryCallback callback = this.readEntryCallback;
+            Object cbCtx = this.cntx;
+            if (recycle(reOpCount)) {
+                callback.readEntryComplete(entry, cbCtx);
+                return;
+            } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
                 }
                 entry.release();
                 return;
             }
-            readEntryCallback.readEntryComplete(entry, cntx);
-            recycle();
         }
 
         @Override
         public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
-            if (checkCallbackCompleted(ctx)) {
+            long reOpCount = reOpCount(ctx);
+            ReadEntryCallback callback = this.readEntryCallback;
+            Object cbCtx = this.cntx;
+            if (recycle(reOpCount)) {
+                callback.readEntryFailed(exception, cbCtx);
+                return;
+            } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
                 }
-                return;
             }
-            readEntryCallback.readEntryFailed(exception, cntx);
-            recycle();
         }
 
         @Override
         public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
-            if (checkCallbackCompleted(ctx)) {
+            long reOpCount = reOpCount(ctx);
+            ReadEntriesCallback callback = this.readEntriesCallback;
+            Object cbCtx = this.cntx;
+            if (recycle(reOpCount)) {
+                callback.readEntriesComplete(returnedEntries, cbCtx);
+                return;
+            } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
                 }
                 returnedEntries.forEach(Entry::release);
                 return;
             }
-            readEntriesCallback.readEntriesComplete(returnedEntries, cntx);
-            recycle();
         }
 
         @Override
         public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
-            if (checkCallbackCompleted(ctx)) {
+            long reOpCount = reOpCount(ctx);
+            ReadEntriesCallback callback = this.readEntriesCallback;
+            Object cbCtx = this.cntx;
+            if (recycle(reOpCount)) {
+                callback.readEntriesFailed(exception, cbCtx);
+                return;
+            } else {
                 if (log.isDebugEnabled()) {
                     log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
                 }
                 return;
             }
-            readEntriesCallback.readEntriesFailed(exception, cntx);
-            recycle();
+        }
+
+        private long reOpCount(Object ctx) {
+            return (ctx != null && ctx instanceof Long) ? (long) ctx : -1;
         }
 
         public void readFailed(ManagedLedgerException exception, Object ctx) {
@@ -1664,30 +1676,24 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                 readEntryFailed(exception, ctx);
             } else if (readEntriesCallback != null) {
                 readEntriesFailed(exception, ctx);
-            } else {
-                // it should not happen .. recycle if none of the callback exists..
-                recycle();
             }
+            // It happens when timeout-thread and read-callback both recycles at the same time.
+            // this read-callback has already been recycled so, do nothing..
         }
 
-        private boolean checkCallbackCompleted(Object ctx) {
-            // if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
-            // assigned to different request
-            boolean isRecycled = (ctx != null && ctx instanceof Integer) && (Integer) ctx != readOpCount;
-            // consider callback is completed if: Callback is already recycled or read-complete flag is true
-            return isRecycled || !READ_COMPLETED_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, FALSE, TRUE);
-        }
-
-        private void recycle() {
-            readOpCount = -1;
-            createdTime = -1;
-            readEntryCallback = null;
-            readEntriesCallback = null;
-            ledgerId = -1;
-            entryId = -1;
-            name = null;
-            readCompleted = FALSE;
-            recyclerHandle.recycle(this);
+        private boolean recycle(long readOpCount) {
+            if (readOpCount != -1
+                    && READ_OP_COUNT_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, readOpCount, -1)) {
+                createdTime = -1;
+                readEntryCallback = null;
+                readEntriesCallback = null;
+                ledgerId = -1;
+                entryId = -1;
+                name = null;
+                recyclerHandle.recycle(this);
+                return true;
+            }
+            return false;
         }
 
         private static final Recycler<ReadEntryCallbackWrapper> RECYCLER = new Recycler<ReadEntryCallbackWrapper>() {
@@ -3095,12 +3101,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (timeoutSec < 1) {
             return;
         }
-        if (this.lastReadCallback != null && this.lastReadCallback.isTimedOut(timeoutSec)) {
-            log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, this.lastReadCallback.ledgerId,
+        ReadEntryCallbackWrapper callback = this.lastReadCallback;
+        long readOpCount = callback != null ? callback.readOpCount : 0;
+        boolean timeout = callback != null
+                ? (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - callback.createdTime) >= timeoutSec)
+                : false;
+        if (readOpCount > 0 && callback != null && timeout) {
+            log.warn("[{}]-{}-{} read entry timeout after {} sec", this.name, this.lastReadCallback.ledgerId,
                     this.lastReadCallback.entryId, timeoutSec);
-            this.lastReadCallback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException),
-                    this.lastReadCallback.readOpCount);
-            lastReadCallback = null;
+            callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
+            LAST_READ_CALLBACK_UPDATER.compareAndSet(this, callback, null);
         }
     }