You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/26 21:05:50 UTC
[pulsar] branch master updated: [pulsar-broker] minor fix for
removing atomic-updater at managed-ledger (#4146)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 07de52d [pulsar-broker] minor fix for removing atomic-updater at managed-ledger (#4146)
07de52d is described below
commit 07de52d35f04e0bda71d6cdf79ded591cd9040aa
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Apr 26 14:05:43 2019 -0700
[pulsar-broker] minor fix for removing atomic-updater at managed-ledger (#4146)
---
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 18 ++++++++----------
1 file changed, 8 insertions(+), 10 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4f72ba3..6086259 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -236,8 +236,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
.newUpdater(ManagedLedgerImpl.class, "readOpCount");
private volatile long readOpCount = 0;
// last read-operation's callback to check read-timeout on it.
- private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK = AtomicReferenceFieldUpdater
- .newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
private volatile ReadEntryCallbackWrapper lastReadCallback = null;
/**
@@ -1592,7 +1590,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, position.getLedgerId(),
position.getEntryId(), callback, readOpCount, createdTime, ctx);
- LAST_READ_CALLBACK.set(this, readCallback);
+ lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, position, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, position, callback, ctx);
@@ -1607,7 +1605,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
long createdTime = System.nanoTime();
ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
opReadEntry, readOpCount, createdTime, ctx);
- LAST_READ_CALLBACK.set(this, readCallback);
+ lastReadCallback = readCallback;
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
} else {
entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
@@ -3128,12 +3126,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
if (timeoutSec < 1) {
return;
}
- ReadEntryCallbackWrapper callback = LAST_READ_CALLBACK.get(this);
- if (callback != null && callback.isTimedOut(timeoutSec)) {
- log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, callback.ledgerId, callback.entryId,
- timeoutSec);
- callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), callback.readOpCount);
- LAST_READ_CALLBACK.set(this, null);
+ if (this.lastReadCallback != null && this.lastReadCallback.isTimedOut(timeoutSec)) {
+ log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, this.lastReadCallback.ledgerId,
+ this.lastReadCallback.entryId, timeoutSec);
+ this.lastReadCallback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException),
+ this.lastReadCallback.readOpCount);
+ lastReadCallback = null;
}
}