You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/04/26 21:05:50 UTC

[pulsar] branch master updated: [pulsar-broker] minor fix for removing atomic-updater at managed-ledger (#4146)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 07de52d  [pulsar-broker] minor fix for removing atomic-updater at managed-ledger (#4146)
07de52d is described below

commit 07de52d35f04e0bda71d6cdf79ded591cd9040aa
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Fri Apr 26 14:05:43 2019 -0700

    [pulsar-broker] minor fix for removing atomic-updater at managed-ledger (#4146)
---
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java     | 18 ++++++++----------
 1 file changed, 8 insertions(+), 10 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 4f72ba3..6086259 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -236,8 +236,6 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             .newUpdater(ManagedLedgerImpl.class, "readOpCount");
     private volatile long readOpCount = 0;
     // last read-operation's callback to check read-timeout on it. 
-    private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, ReadEntryCallbackWrapper> LAST_READ_CALLBACK = AtomicReferenceFieldUpdater
-            .newUpdater(ManagedLedgerImpl.class, ReadEntryCallbackWrapper.class, "lastReadCallback");
     private volatile ReadEntryCallbackWrapper lastReadCallback = null;
     
     /**
@@ -1592,7 +1590,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             long createdTime = System.nanoTime();
             ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, position.getLedgerId(),
                     position.getEntryId(), callback, readOpCount, createdTime, ctx);
-            LAST_READ_CALLBACK.set(this, readCallback);
+            lastReadCallback = readCallback;
             entryCache.asyncReadEntry(ledger, position, readCallback, readOpCount);
         } else {
             entryCache.asyncReadEntry(ledger, position, callback, ctx);
@@ -1607,7 +1605,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             long createdTime = System.nanoTime();
             ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
                     opReadEntry, readOpCount, createdTime, ctx);
-            LAST_READ_CALLBACK.set(this, readCallback);
+            lastReadCallback = readCallback;
             entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
         } else {
             entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
@@ -3128,12 +3126,12 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
         if (timeoutSec < 1) {
             return;
         }
-        ReadEntryCallbackWrapper callback = LAST_READ_CALLBACK.get(this);
-        if (callback != null && callback.isTimedOut(timeoutSec)) {
-            log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, callback.ledgerId, callback.entryId,
-                    timeoutSec);
-            callback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException), callback.readOpCount);
-            LAST_READ_CALLBACK.set(this, null);
+        if (this.lastReadCallback != null && this.lastReadCallback.isTimedOut(timeoutSec)) {
+            log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, this.lastReadCallback.ledgerId,
+                    this.lastReadCallback.entryId, timeoutSec);
+            this.lastReadCallback.readFailed(createManagedLedgerException(BKException.Code.TimeoutException),
+                    this.lastReadCallback.readOpCount);
+            lastReadCallback = null;
         }
     }