You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rd...@apache.org on 2019/06/09 19:47:26 UTC
[pulsar] branch master updated: [pulsar-broker] Fix race condition
of read-timeout task in ML (#4437)
This is an automated email from the ASF dual-hosted git repository.
rdhabalia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 84d2d1e [pulsar-broker] Fix race condition of read-timeout task in ML (#4437)
84d2d1e is described below
commit 84d2d1e443d427d3a730c0addc67c8e58fc56593
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Sun Jun 9 12:47:21 2019 -0700
[pulsar-broker] Fix race condition of read-timeout task in ML (#4437)
* [pulsar-broker] Fix race condition of read-timeout task in ML
* fix atomic recycling
* fix test
* add method for reOpCount
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 110 +++++++++++----------
1 file changed, 60 insertions(+), 50 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 6b9f529..fe49985 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -21,7 +21,6 @@ package org.apache.bookkeeper.mledger.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static java.lang.Math.min;
import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
-import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun;
import com.google.common.collect.BoundType;
@@ -233,6 +232,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
// last read-operation's callback to check read-timeout on it.
private volatile ReadEntryCallbackWrapper lastReadCallback = null;
+ private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK_UPDATER = AtomicReferenceFieldUpdater
+ .newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
/**
* Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
@@ -1558,16 +1559,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback {
- private static final AtomicIntegerFieldUpdater<ReadEntryCallbackWrapper> READ_COMPLETED_UPDATER = AtomicIntegerFieldUpdater
- .newUpdater(ReadEntryCallbackWrapper.class, "readCompleted");
- @SuppressWarnings("unused")
- volatile int readCompleted = FALSE;
volatile ReadEntryCallback readEntryCallback;
volatile ReadEntriesCallback readEntriesCallback;
String name;
long ledgerId;
long entryId;
volatile long readOpCount = -1;
+ private static final AtomicLongFieldUpdater<ReadEntryCallbackWrapper> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
+ .newUpdater(ReadEntryCallbackWrapper.class, "readOpCount");
volatile long createdTime = -1;
volatile Object cntx;
@@ -1603,60 +1602,73 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
return readCallback;
}
- public boolean isTimedOut(long timeoutSec) {
- return this.createdTime != -1
- && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - this.createdTime) >= timeoutSec
- && this.readCompleted == FALSE;
- }
-
@Override
public void readEntryComplete(Entry entry, Object ctx) {
- if (checkCallbackCompleted(ctx)) {
+ long reOpCount = reOpCount(ctx);
+ ReadEntryCallback callback = this.readEntryCallback;
+ Object cbCtx = this.cntx;
+ if (recycle(reOpCount)) {
+ callback.readEntryComplete(entry, cbCtx);
+ return;
+ } else {
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
entry.release();
return;
}
- readEntryCallback.readEntryComplete(entry, cntx);
- recycle();
}
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
- if (checkCallbackCompleted(ctx)) {
+ long reOpCount = reOpCount(ctx);
+ ReadEntryCallback callback = this.readEntryCallback;
+ Object cbCtx = this.cntx;
+ if (recycle(reOpCount)) {
+ callback.readEntryFailed(exception, cbCtx);
+ return;
+ } else {
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
- return;
}
- readEntryCallback.readEntryFailed(exception, cntx);
- recycle();
}
@Override
public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
- if (checkCallbackCompleted(ctx)) {
+ long reOpCount = reOpCount(ctx);
+ ReadEntriesCallback callback = this.readEntriesCallback;
+ Object cbCtx = this.cntx;
+ if (recycle(reOpCount)) {
+ callback.readEntriesComplete(returnedEntries, cbCtx);
+ return;
+ } else {
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
returnedEntries.forEach(Entry::release);
return;
}
- readEntriesCallback.readEntriesComplete(returnedEntries, cntx);
- recycle();
}
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
- if (checkCallbackCompleted(ctx)) {
+ long reOpCount = reOpCount(ctx);
+ ReadEntriesCallback callback = this.readEntriesCallback;
+ Object cbCtx = this.cntx;
+ if (recycle(reOpCount)) {
+ callback.readEntriesFailed(exception, cbCtx);
+ return;
+ } else {
if (log.isDebugEnabled()) {
log.debug("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
}
return;
}
- readEntriesCallback.readEntriesFailed(exception, cntx);
- recycle();
+ }
+
+ private long reOpCount(Object ctx) {
+ return (ctx != null && ctx instanceof Long) ? (long) ctx : -1;
}
public void readFailed(ManagedLedgerException exception, Object ctx) {
@@ -1664,30 +1676,24 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
readEntryFailed(exception, ctx);
} else if (readEntriesCallback != null) {
readEntriesFailed(exception, ctx);
- } else {
- // it should not happen .. recycle if none of the callback exists..
- recycle();
}
+ // It happens when timeout-thread and read-callback both recycles at the same time.
+ // this read-callback has already been recycled so, do nothing..
}
- private boolean checkCallbackCompleted(Object ctx) {
- // if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
- // assigned to different request
- boolean isRecycled = (ctx != null && ctx instanceof Integer) && (Integer) ctx != readOpCount;
- // consider callback is completed if: Callback is already recycled or read-complete flag is true
- return isRecycled || !READ_COMPLETED_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, FALSE, TRUE);
- }
-
- private void recycle() {
- readOpCount = -1;
- createdTime = -1;
- readEntryCallback = null;
- readEntriesCallback = null;
- ledgerId = -1;
- entryId = -1;
- name = null;
- readCompleted = FALSE;
- recyclerHandle.recycle(this);
+ private boolean recycle(long readOpCount) {
+ if (readOpCount != -1
+ && READ_OP_COUNT_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, readOpCount, -1)) {
+ createdTime = -1;
+ readEntryCallback = null;
+ readEntriesCallback = null;
+ ledgerId = -1;
+ entryId = -1;
+ name = null;
+ recyclerHandle.recycle(this);
+ return true;
+ }
+ return false;
}
private static final Recycler<ReadEntryCallbackWrapper> RECYCLER = new Recycler<ReadEntryCallbackWrapper>() {
@@ -3095,12 +3101,16 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (timeoutSec < 1) {
return;
}
- if (this.lastReadCallback != null && this.lastReadCallback.isTimedOut(timeoutSec)) {
- log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, this.lastReadCallback.ledgerId,
+ ReadEntryCallbackWrapper callback = this.lastReadCallback;
+ long readOpCount = callback != null ? callback.readOpCount : 0;
+ boolean timeout = callback != null
+ ? (TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - callback.createdTime) >= timeoutSec)
+ : false;
+ if (readOpCount > 0 && callback != null && timeout) {
+ log.warn("[{}]-{}-{} read entry timeout after {} sec", this.name, this.lastReadCallback.ledgerId,
this.lastReadCallback.entryId, timeoutSec);
- this.lastReadCallback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException),
- this.lastReadCallback.readOpCount);
- lastReadCallback = null;
+ callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
+ LAST_READ_CALLBACK_UPDATER.compareAndSet(this, callback, null);
}
}