You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/10 01:14:23 UTC

[GitHub] rdhabalia closed pull request #3019: add read-timeout option to async managed-ledger read

rdhabalia closed pull request #3019: add read-timeout option to async managed-ledger read
URL: https://github.com/apache/pulsar/pull/3019
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/conf/broker.conf b/conf/broker.conf
index d987c9be9e..1cf3a8a371 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -442,6 +442,9 @@ autoSkipNonRecoverableData=false
 # operation timeout while updating managed-ledger metadata.
 managedLedgerMetadataOperationsTimeoutSeconds=60
 
+# Read entries timeout when broker tries to read messages from bookkeeper.
+managedLedgerReadEntryTimeoutSeconds=120
+
 ### --- Load balancer --- ###
 
 # Enable load balancer
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 70850545bf..16d6465dfb 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -329,6 +329,9 @@ autoSkipNonRecoverableData=false
 # operation timeout while updating managed-ledger metadata.
 managedLedgerMetadataOperationsTimeoutSeconds=60
 
+# Read entries timeout when broker tries to read messages from bookkeeper.
+managedLedgerReadEntryTimeoutSeconds=120
+
 ### --- Load balancer --- ###
 
 loadManagerClassName=org.apache.pulsar.broker.loadbalance.NoopLoadManager
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 5967453ca5..255b534920 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -57,6 +57,7 @@
     private long offloadLedgerDeletionLagMs = TimeUnit.HOURS.toMillis(4);
     private long offloadAutoTriggerSizeThresholdBytes = -1;
     private long metadataOperationsTimeoutSeconds = 60;
+    private long readEntryTimeoutSeconds = 120;
 
     private DigestType digestType = DigestType.CRC32C;
     private byte[] password = "".getBytes(Charsets.UTF_8);
@@ -532,4 +533,25 @@ public ManagedLedgerConfig setMetadataOperationsTimeoutSeconds(long metadataOper
         this.metadataOperationsTimeoutSeconds = metadataOperationsTimeoutSeconds;
         return this;
     }
+    
+    /**
+     * Ledger read-entry timeout
+     * 
+     * @return
+     */
+    public long getReadEntryTimeoutSeconds() {
+        return readEntryTimeoutSeconds;
+    }
+
+    /**
+     * Ledger read entry timeout after which callback will be completed with failure. (disable timeout by setting
+     * readTimeoutSeconds <= 0)
+     * 
+     * @param readTimeoutSeconds
+     * @return
+     */
+    public ManagedLedgerConfig setReadEntryTimeoutSeconds(long readEntryTimeoutSeconds) {
+        this.readEntryTimeoutSeconds = readEntryTimeoutSeconds;
+        return this;
+    }
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 35cd1b7de4..ca5d2df759 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -104,8 +104,8 @@
     @SuppressWarnings("unused")
     private volatile OpReadEntry waitingReadOp = null;
 
-    private static final int FALSE = 0;
-    private static final int TRUE = 1;
+    public static final int FALSE = 0;
+    public static final int TRUE = 1;
     private static final AtomicIntegerFieldUpdater<ManagedCursorImpl> RESET_CURSOR_IN_PROGRESS_UPDATER =
         AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "resetCursorInProgress");
     @SuppressWarnings("unused")
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index e9b4c9034b..dd54630da0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -38,10 +38,12 @@
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLongFieldUpdater;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
@@ -59,6 +61,7 @@
 import org.apache.bookkeeper.common.util.OrderedExecutor;
 import org.apache.bookkeeper.common.util.OrderedScheduler;
 import org.apache.bookkeeper.common.util.Retries;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
@@ -112,6 +115,10 @@
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 
 public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
     private final static long MegaByte = 1024 * 1024;
@@ -135,17 +142,17 @@
     private final ManagedCursorContainer activeCursors = new ManagedCursorContainer();
 
     // Ever increasing counter of entries added
-    static final AtomicLongFieldUpdater<ManagedLedgerImpl> ENTRIES_ADDED_COUNTER_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(ManagedLedgerImpl.class, "entriesAddedCounter");
+    static final AtomicLongFieldUpdater<ManagedLedgerImpl> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ManagedLedgerImpl.class, "entriesAddedCounter");
     @SuppressWarnings("unused")
     private volatile long entriesAddedCounter = 0;
 
-    static final AtomicLongFieldUpdater<ManagedLedgerImpl> NUMBER_OF_ENTRIES_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(ManagedLedgerImpl.class, "numberOfEntries");
+    static final AtomicLongFieldUpdater<ManagedLedgerImpl> NUMBER_OF_ENTRIES_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ManagedLedgerImpl.class, "numberOfEntries");
     @SuppressWarnings("unused")
     private volatile long numberOfEntries = 0;
-    static final AtomicLongFieldUpdater<ManagedLedgerImpl> TOTAL_SIZE_UPDATER =
-            AtomicLongFieldUpdater.newUpdater(ManagedLedgerImpl.class, "totalSize");
+    static final AtomicLongFieldUpdater<ManagedLedgerImpl> TOTAL_SIZE_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ManagedLedgerImpl.class, "totalSize");
     @SuppressWarnings("unused")
     private volatile long totalSize = 0;
 
@@ -168,8 +175,8 @@
     private final CallbackMutex trimmerMutex = new CallbackMutex();
 
     private final CallbackMutex offloadMutex = new CallbackMutex();
-    private final static CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE
-        = CompletableFuture.completedFuture(PositionImpl.latest);
+    private final static CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
+            .completedFuture(PositionImpl.latest);
     private volatile LedgerHandle currentLedger;
     private long currentLedgerEntries = 0;
     private long currentLedgerSize = 0;
@@ -187,7 +194,7 @@
 
     protected static final int DEFAULT_LEDGER_DELETE_RETRIES = 3;
     protected static final int DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC = 60;
-    
+
     enum State {
         None, // Uninitialized
         LedgerOpened, // A ledger is ready to write into
@@ -209,8 +216,8 @@
         startIncluded, startExcluded
     }
 
-    private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER =
-            AtomicReferenceFieldUpdater.newUpdater(ManagedLedgerImpl.class, State.class, "state");
+    private static final AtomicReferenceFieldUpdater<ManagedLedgerImpl, State> STATE_UPDATER = AtomicReferenceFieldUpdater
+            .newUpdater(ManagedLedgerImpl.class, State.class, "state");
     protected volatile State state = null;
 
     private final OrderedScheduler scheduledExecutor;
@@ -218,6 +225,9 @@
     final ManagedLedgerFactoryImpl factory;
     protected final ManagedLedgerMBeanImpl mbean;
     protected final Clock clock;
+    private static final AtomicLongFieldUpdater<ManagedLedgerImpl> READ_OP_COUNT_UPDATER = AtomicLongFieldUpdater
+            .newUpdater(ManagedLedgerImpl.class, "readOpCount");
+    private volatile long readOpCount = 0;
 
     /**
      * Queue of pending entries to be added to the managed ledger. Typically entries are queued when a new ledger is
@@ -366,7 +376,7 @@ public void operationFailed(MetaStoreException e) {
         // Create a new ledger to start writing
         this.lastLedgerCreationInitiationTimestamp = System.nanoTime();
         mbean.startDataLedgerCreateOp();
-        
+
         asyncCreateLedger(bookKeeper, config, digestType, (rc, lh, ctx) -> {
 
             if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
@@ -601,12 +611,13 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) {
     }
 
     @Override
-    public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException{
+    public ManagedCursor openCursor(String cursorName) throws InterruptedException, ManagedLedgerException {
         return openCursor(cursorName, InitialPosition.Latest);
     }
 
     @Override
-    public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition) throws InterruptedException, ManagedLedgerException {
+    public ManagedCursor openCursor(String cursorName, InitialPosition initialPosition)
+            throws InterruptedException, ManagedLedgerException {
         final CountDownLatch counter = new CountDownLatch(1);
         class Result {
             ManagedCursor cursor = null;
@@ -642,12 +653,13 @@ public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
     }
 
     @Override
-    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx){
+    public synchronized void asyncOpenCursor(final String cursorName, final OpenCursorCallback callback, Object ctx) {
         this.asyncOpenCursor(cursorName, InitialPosition.Latest, callback, ctx);
     }
 
     @Override
-    public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition, final OpenCursorCallback callback, final Object ctx){
+    public synchronized void asyncOpenCursor(final String cursorName, final InitialPosition initialPosition,
+            final OpenCursorCallback callback, final Object ctx) {
         try {
             checkManagedLedgerIsOpen();
             checkFenced();
@@ -687,7 +699,8 @@ public void operationComplete() {
                 log.info("[{}] Opened new cursor: {}", name, cursor);
                 cursor.setActive();
                 // Update the ack position (ignoring entries that were written while the cursor was being created)
-                cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter() : getFirstPositionAndCounter());
+                cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter()
+                        : getFirstPositionAndCounter());
 
                 synchronized (this) {
                     cursors.add(cursor);
@@ -946,7 +959,7 @@ long estimateBacklogFromPosition(PositionImpl pos) {
                 return getTotalSize(); // position no longer in managed ledger, so return total size
             }
             long sizeBeforePosLedger = ledgers.values().stream().filter(li -> li.getLedgerId() < pos.getLedgerId())
-                .mapToLong(li -> li.getSize()).sum();
+                    .mapToLong(li -> li.getSize()).sum();
             long size = getTotalSize() - sizeBeforePosLedger;
 
             if (pos.getLedgerId() == currentLedger.getId()) {
@@ -1160,11 +1173,11 @@ public synchronized void createComplete(int rc, final LedgerHandle lh, Object ct
         if (log.isDebugEnabled()) {
             log.debug("[{}] createComplete rc={} ledger={}", name, rc, lh != null ? lh.getId() : -1);
         }
-        
+
         if (checkAndCompleteLedgerOpTask(rc, lh, ctx)) {
             return;
         }
-        
+
         mbean.endDataLedgerCreateOp();
         if (rc != BKException.Code.OK) {
             log.error("[{}] Error creating ledger rc={} {}", name, rc, BKException.getMessage(rc));
@@ -1201,8 +1214,8 @@ public void operationFailed(MetaStoreException e) {
                     if (e instanceof BadVersionException) {
                         synchronized (ManagedLedgerImpl.this) {
                             log.error(
-                                "[{}] Failed to udpate ledger list. z-node version mismatch. Closing managed ledger",
-                                name);
+                                    "[{}] Failed to udpate ledger list. z-node version mismatch. Closing managed ledger",
+                                    name);
                             STATE_UPDATER.set(ManagedLedgerImpl.this, State.Fenced);
                             clearPendingAddEntries(e);
                             return;
@@ -1373,7 +1386,8 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
             }).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", name, opReadEntry.readPosition,
                         ex.getMessage());
-                opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), opReadEntry.ctx);
+                opReadEntry.readEntriesFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()),
+                        opReadEntry.ctx);
                 return null;
             });
         }
@@ -1387,43 +1401,39 @@ void asyncReadEntries(OpReadEntry opReadEntry) {
 
         // If not present try again and create if necessary
         return ledgerCache.computeIfAbsent(ledgerId, lid -> {
-                // Open the ledger for reading if it was not already opened
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}] Asynchronously opening ledger {} for read", name, ledgerId);
-                }
-                mbean.startDataLedgerOpenOp();
-
-                CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
-
-                LedgerInfo info = ledgers.get(ledgerId);
-                CompletableFuture<ReadHandle> openFuture = new CompletableFuture<>();
-                if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {
-                    UUID uid = new UUID(info.getOffloadContext().getUidMsb(),
-                                        info.getOffloadContext().getUidLsb());
-                    // TODO: improve this to load ledger offloader by driver name recorded in metadata
-                    openFuture = config.getLedgerOffloader()
-                        .readOffloaded(ledgerId, uid, OffloadUtils.getOffloadDriverMetadata(info));
+            // Open the ledger for reading if it was not already opened
+            if (log.isDebugEnabled()) {
+                log.debug("[{}] Asynchronously opening ledger {} for read", name, ledgerId);
+            }
+            mbean.startDataLedgerOpenOp();
+
+            CompletableFuture<ReadHandle> promise = new CompletableFuture<>();
+
+            LedgerInfo info = ledgers.get(ledgerId);
+            CompletableFuture<ReadHandle> openFuture = new CompletableFuture<>();
+            if (info != null && info.hasOffloadContext() && info.getOffloadContext().getComplete()) {
+                UUID uid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
+                // TODO: improve this to load ledger offloader by driver name recorded in metadata
+                openFuture = config.getLedgerOffloader().readOffloaded(ledgerId, uid,
+                        OffloadUtils.getOffloadDriverMetadata(info));
+            } else {
+                openFuture = bookKeeper.newOpenLedgerOp().withRecovery(!isReadOnly()).withLedgerId(ledgerId)
+                        .withDigestType(config.getDigestType()).withPassword(config.getPassword()).execute();
+            }
+            openFuture.whenCompleteAsync((res, ex) -> {
+                mbean.endDataLedgerOpenOp();
+                if (ex != null) {
+                    ledgerCache.remove(ledgerId, promise);
+                    promise.completeExceptionally(createManagedLedgerException(ex));
                 } else {
-                    openFuture = bookKeeper.newOpenLedgerOp()
-                        .withRecovery(!isReadOnly())
-                        .withLedgerId(ledgerId)
-                        .withDigestType(config.getDigestType())
-                        .withPassword(config.getPassword()).execute();
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] Successfully opened ledger {} for reading", name, ledgerId);
+                    }
+                    promise.complete(res);
                 }
-                openFuture.whenCompleteAsync((res,ex) -> {
-                        mbean.endDataLedgerOpenOp();
-                        if (ex != null) {
-                            ledgerCache.remove(ledgerId, promise);
-                            promise.completeExceptionally(createManagedLedgerException(ex));
-                        } else {
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}] Successfully opened ledger {} for reading", name, ledgerId);
-                            }
-                            promise.complete(res);
-                        }
-                    }, executor.chooseThread(name));
-                return promise;
-            });
+            }, executor.chooseThread(name));
+            return promise;
+        });
     }
 
     void invalidateLedgerHandle(ReadHandle ledgerHandle, Throwable t) {
@@ -1448,10 +1458,10 @@ void asyncReadEntry(PositionImpl position, ReadEntryCallback callback, Object ct
         }
         if (position.getLedgerId() == currentLedger.getId()) {
             LedgerHandle ledger = currentLedger;
-            entryCache.asyncReadEntry(ledger, position, callback, ctx);
+            asyncReadEntry(ledger, position, callback, ctx);
         } else {
             getLedgerHandle(position.getLedgerId()).thenAccept(ledger -> {
-                entryCache.asyncReadEntry(ledger, position, callback, ctx);
+                asyncReadEntry(ledger, position, callback, ctx);
             }).exceptionally(ex -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", name, position, ex.getMessage());
                 callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
@@ -1506,7 +1516,7 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
             log.debug("[{}] Reading entries from ledger {} - first={} last={}", name, ledger.getId(), firstEntry,
                     lastEntry);
         }
-        entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
+        asyncReadEntry(ledger, firstEntry, lastEntry, false, opReadEntry, opReadEntry.ctx);
 
         if (updateCursorRateLimit.tryAcquire()) {
             if (isCursorActive(cursor)) {
@@ -1516,6 +1526,177 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
         }
     }
 
+    protected void asyncReadEntry(ReadHandle ledger, PositionImpl position, ReadEntryCallback callback, Object ctx) {
+        long timeout = config.getReadEntryTimeoutSeconds();
+        boolean checkTimeout = timeout > 0;
+        if (checkTimeout) {
+            // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
+            long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
+            ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, position.getLedgerId(),
+                    position.getEntryId(), callback, readOpCount, ctx);
+            final ScheduledFuture<?> task = scheduledExecutor.schedule(() -> {
+                // validate ReadEntryCallbackWrapper object is not recycled by bk-client callback (by validating
+                // readOpCount) and fail the callback if read is not completed yet
+                if (readCallback.readOpCount == readOpCount
+                        && ReadEntryCallbackWrapper.READ_COMPLETED_UPDATER.get(readCallback) == FALSE) {
+                    log.warn("[{}]-{} read entry timeout for {} after {} sec", this.name, ledger.getId(), position,
+                            timeout);
+                    readCallback.readEntryFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
+                }
+            }, timeout, TimeUnit.SECONDS);
+            readCallback.task = task;
+            entryCache.asyncReadEntry(ledger, position, readCallback, readOpCount);
+        } else {
+            entryCache.asyncReadEntry(ledger, position, callback, ctx);
+        }
+    }
+
+    protected void asyncReadEntry(ReadHandle ledger, long firstEntry, long lastEntry, boolean isSlowestReader,
+            OpReadEntry opReadEntry, Object ctx) {
+        long timeout = config.getReadEntryTimeoutSeconds();
+        boolean checkTimeout = timeout > 0;
+        if (checkTimeout) {
+            // set readOpCount to uniquely validate if ReadEntryCallbackWrapper is already recycled
+            long readOpCount = READ_OP_COUNT_UPDATER.incrementAndGet(this);
+            ReadEntryCallbackWrapper readCallback = ReadEntryCallbackWrapper.create(name, ledger.getId(), firstEntry,
+                    opReadEntry, readOpCount, ctx);
+            final ScheduledFuture<?> task = scheduledExecutor.schedule(() -> {
+                // validate ReadEntryCallbackWrapper object is not recycled by bk-client callback (by validating
+                // readOpCount) and fail the callback if read is not completed yet
+                if (readCallback.readOpCount == readOpCount
+                        && ReadEntryCallbackWrapper.READ_COMPLETED_UPDATER.get(readCallback) == FALSE) {
+                    log.warn("[{}]-{} read entry timeout for {}-{} after {} sec", this.name, ledger.getId(), firstEntry,
+                            lastEntry, timeout);
+                    readCallback.readEntriesFailed(createManagedLedgerException(BKException.Code.TimeoutException), readOpCount);
+                }
+            }, timeout, TimeUnit.SECONDS);
+            readCallback.task = task;
+            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, readCallback, readOpCount);
+        } else {
+            entryCache.asyncReadEntry(ledger, firstEntry, lastEntry, isSlowestReader, opReadEntry, ctx);
+        }
+    }
+
+    static final class ReadEntryCallbackWrapper implements ReadEntryCallback, ReadEntriesCallback {
+
+        private static final AtomicIntegerFieldUpdater<ReadEntryCallbackWrapper> READ_COMPLETED_UPDATER = AtomicIntegerFieldUpdater
+                .newUpdater(ReadEntryCallbackWrapper.class, "readCompleted");
+        @SuppressWarnings("unused")
+        volatile int readCompleted = FALSE;
+        volatile ReadEntryCallback readEntryCallback;
+        volatile ReadEntriesCallback readEntriesCallback;
+        String name;
+        long ledgerId;
+        long entryId;
+        ScheduledFuture<?> task;
+        volatile long readOpCount = -1;
+        volatile Object cntx;
+
+        final Handle<ReadEntryCallbackWrapper> recyclerHandle;
+
+        private ReadEntryCallbackWrapper(Handle<ReadEntryCallbackWrapper> recyclerHandle) {
+            this.recyclerHandle = recyclerHandle;
+        }
+
+        static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntryCallback callback, long readOpCount, Object ctx) {
+            ReadEntryCallbackWrapper readCallback = RECYCLER.get();
+            readCallback.name = name;
+            readCallback.ledgerId = ledgerId;
+            readCallback.entryId = entryId;
+            readCallback.readEntryCallback = callback;
+            readCallback.cntx = ctx;
+            readCallback.readOpCount = readOpCount;
+            return readCallback;
+        }
+
+        static ReadEntryCallbackWrapper create(String name, long ledgerId, long entryId, ReadEntriesCallback callback, long readOpCount, Object ctx) {
+            ReadEntryCallbackWrapper readCallback = RECYCLER.get();
+            readCallback.name = name;
+            readCallback.ledgerId = ledgerId;
+            readCallback.entryId = entryId;
+            readCallback.readEntriesCallback = callback;
+            readCallback.cntx = ctx;
+            readCallback.readOpCount = readOpCount;
+            return readCallback;
+        }
+
+        @Override
+        public void readEntryComplete(Entry entry, Object ctx) {
+            if (checkCallbackCompleted(ctx)) {
+                log.warn("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+                entry.release();
+                return;
+            }
+            readEntryCallback.readEntryComplete(entry, cntx);
+            recycle();
+        }
+
+        @Override
+        public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+            if (checkCallbackCompleted(ctx)) {
+                log.warn("[{}] read entry already completed for {}-{}", name, ledgerId, entryId);
+                return;
+            }
+            readEntryCallback.readEntryFailed(exception, cntx);
+            recycle();
+        }
+
+        @Override
+        public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
+            if (checkCallbackCompleted(ctx)) {
+                log.warn("[{}] read entries already completed for {}-{}", name, ledgerId, entryId);
+                returnedEntries.forEach(Entry::release);
+                return;
+            }
+            readEntriesCallback.readEntriesComplete(returnedEntries, cntx);
+            recycle();
+        }
+
+        @Override
+        public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+            if (checkCallbackCompleted(ctx)) {
+                log.warn("[{}] read entries already completed for {}-{}", name, ledgerId, entryId);
+                return;
+            }
+            readEntriesCallback.readEntriesFailed(exception, cntx);
+            recycle();
+        }
+
+        private boolean checkCallbackCompleted(Object ctx) {
+            // if the ctx-readOpCount is different than object's readOpCount means Object is already recycled and
+            // assigned to different request
+            boolean isRecycled = (ctx != null && ctx instanceof Integer) && (Integer) ctx != readOpCount;
+            // consider callback is completed if: Callback is already recycled or read-complete flag is true  
+            return isRecycled || !READ_COMPLETED_UPDATER.compareAndSet(ReadEntryCallbackWrapper.this, FALSE, TRUE);
+        }
+
+        private void recycle() {
+            readOpCount = -1;
+            if (task != null && !task.isDone() && !task.isCancelled()) {
+                try {
+                    task.cancel(false);
+                } catch (Throwable th) {
+                    log.debug("[{}]Failed to cancle task for read-callback for {}-{}", name, ledgerId, entryId);
+                }
+            }
+            readEntryCallback = null;
+            readEntriesCallback = null;
+            ledgerId = -1;
+            entryId = -1;
+            name = null;
+            readCompleted = FALSE;
+            recyclerHandle.recycle(this);
+        }
+
+        private static final Recycler<ReadEntryCallbackWrapper> RECYCLER = new Recycler<ReadEntryCallbackWrapper>() {
+            @Override
+            protected ReadEntryCallbackWrapper newObject(Handle<ReadEntryCallbackWrapper> handle) {
+                return new ReadEntryCallbackWrapper(handle);
+            }
+        };
+
+    }
+
     @Override
     public ManagedLedgerMXBean getStats() {
         return mbean;
@@ -1587,7 +1768,7 @@ private void trimConsumedLedgersInBackground() {
 
     private void trimConsumedLedgersInBackground(CompletableFuture<?> promise) {
         executor.executeOrdered(name, safeRun(() -> {
-                    internalTrimConsumedLedgers(promise);
+            internalTrimConsumedLedgers(promise);
         }));
     }
 
@@ -1670,9 +1851,8 @@ private boolean isLedgerRetentionOverSizeQuota() {
 
     private boolean isOffloadedNeedsDelete(OffloadContext offload) {
         long elapsedMs = clock.millis() - offload.getTimestamp();
-        return offload.getComplete()
-            && !offload.getBookkeeperDeleted()
-            && elapsedMs > config.getOffloadLedgerDeletionLagMillis();
+        return offload.getComplete() && !offload.getBookkeeperDeleted()
+                && elapsedMs > config.getOffloadLedgerDeletionLagMillis();
     }
 
     /**
@@ -1729,14 +1909,14 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
 
                 if (log.isDebugEnabled()) {
                     log.debug(
-                        "[{}] Checking ledger {} -- time-old: {} sec -- "
-                            + "expired: {} -- over-quota: {} -- current-ledger: {}",
-                        name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
-                        overRetentionQuota, currentLedger.getId());
+                            "[{}] Checking ledger {} -- time-old: {} sec -- "
+                                    + "expired: {} -- over-quota: {} -- current-ledger: {}",
+                            name, ls.getLedgerId(), (clock.millis() - ls.getTimestamp()) / 1000.0, expired,
+                            overRetentionQuota, currentLedger.getId());
                 }
                 if (ls.getLedgerId() == currentLedger.getId()) {
-                    log.debug("[{}] ledger id skipped for deletion as it is currently being written to",
-                              name, ls.getLedgerId());
+                    log.debug("[{}] ledger id skipped for deletion as it is currently being written to", name,
+                            ls.getLedgerId());
                     break;
                 } else if (expired) {
                     log.debug("[{}] Ledger {} has expired, ts {}", name, ls.getLedgerId(), ls.getTimestamp());
@@ -1745,15 +1925,14 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
                     log.debug("[{}] Ledger {} is over quota", name, ls.getLedgerId());
                     ledgersToDelete.add(ls);
                 } else {
-                    log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota",
-                              name, ls.getLedgerId());
+                    log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
                     break;
                 }
             }
             for (LedgerInfo ls : ledgers.values()) {
                 if (isOffloadedNeedsDelete(ls.getOffloadContext()) && !ledgersToDelete.contains(ls)) {
-                    log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted",
-                              name, ls.getLedgerId());
+                    log.debug("[{}] Ledger {} has been offloaded, bookkeeper ledger needs to be deleted", name,
+                            ls.getLedgerId());
                     offloadedLedgersToDelete.add(ls);
                 }
             }
@@ -1783,16 +1962,12 @@ void internalTrimConsumedLedgers(CompletableFuture<?> promise) {
             }
             for (LedgerInfo ls : offloadedLedgersToDelete) {
                 LedgerInfo.Builder newInfoBuilder = ls.toBuilder();
-                newInfoBuilder.getOffloadContextBuilder()
-                    .setBookkeeperDeleted(true);
-                String driverName = OffloadUtils.getOffloadDriverName(
-                    ls, config.getLedgerOffloader().getOffloadDriverName());
-                Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(
-                    ls, config.getLedgerOffloader().getOffloadDriverMetadata());
-                OffloadUtils.setOffloadDriverMetadata(
-                    newInfoBuilder,
-                    driverName, driverMetadata
-                );
+                newInfoBuilder.getOffloadContextBuilder().setBookkeeperDeleted(true);
+                String driverName = OffloadUtils.getOffloadDriverName(ls,
+                        config.getLedgerOffloader().getOffloadDriverName());
+                Map<String, String> driverMetadata = OffloadUtils.getOffloadDriverMetadata(ls,
+                        config.getLedgerOffloader().getOffloadDriverMetadata());
+                OffloadUtils.setOffloadDriverMetadata(newInfoBuilder, driverName, driverMetadata);
                 ledgers.put(ls.getLedgerId(), newInfoBuilder.build());
             }
 
@@ -1814,8 +1989,8 @@ public void operationComplete(Void result, Stat stat) {
                         asyncDeleteLedger(ls.getLedgerId(), ls);
                     }
                     for (LedgerInfo ls : offloadedLedgersToDelete) {
-                        log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}",
-                                 name, ls.getLedgerId(), ls.getSize());
+                        log.info("[{}] Deleting offloaded ledger {} from bookkeeper - size: {}", name, ls.getLedgerId(),
+                                ls.getSize());
                         asyncDeleteLedgerFromBookKeeper(ls.getLedgerId());
                     }
                     promise.complete(null);
@@ -1922,13 +2097,11 @@ private void asyncDeleteLedger(long ledgerId, LedgerInfo info) {
         }
 
         if (info.getOffloadContext().hasUidMsb()) {
-            UUID uuid = new UUID(info.getOffloadContext().getUidMsb(),
-                                 info.getOffloadContext().getUidLsb());
-            cleanupOffloaded(
-                ledgerId, uuid,
-                OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()),
-                OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
-                "Trimming");
+            UUID uuid = new UUID(info.getOffloadContext().getUidMsb(), info.getOffloadContext().getUidLsb());
+            cleanupOffloaded(ledgerId, uuid,
+                    OffloadUtils.getOffloadDriverName(info, config.getLedgerOffloader().getOffloadDriverName()),
+                    OffloadUtils.getOffloadDriverMetadata(info, config.getLedgerOffloader().getOffloadDriverMetadata()),
+                    "Trimming");
         }
     }
 
@@ -2017,16 +2190,16 @@ public Position offloadPrefix(Position pos) throws InterruptedException, Managed
         CompletableFuture<Position> promise = new CompletableFuture<>();
 
         asyncOffloadPrefix(pos, new OffloadCallback() {
-                @Override
-                public void offloadComplete(Position offloadedTo, Object ctx) {
-                    promise.complete(offloadedTo);
-                }
+            @Override
+            public void offloadComplete(Position offloadedTo, Object ctx) {
+                promise.complete(offloadedTo);
+            }
 
-                @Override
-                public void offloadFailed(ManagedLedgerException e, Object ctx) {
-                    promise.completeExceptionally(e);
-                }
-            }, null);
+            @Override
+            public void offloadFailed(ManagedLedgerException e, Object ctx) {
+                promise.completeExceptionally(e);
+            }
+        }, null);
 
         try {
             return promise.get(AsyncOperationTimeoutSeconds, TimeUnit.SECONDS);
@@ -2038,8 +2211,6 @@ public void offloadFailed(ManagedLedgerException e, Object ctx) {
         }
     }
 
-
-
     @Override
     public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ctx) {
         PositionImpl requestOffloadTo = (PositionImpl) pos;
@@ -2053,19 +2224,20 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
         Queue<LedgerInfo> ledgersToOffload = new ConcurrentLinkedQueue<>();
         synchronized (this) {
             log.info("[{}] Start ledgersOffload. ledgers={} totalSize={}", name, ledgers.keySet(),
-                     TOTAL_SIZE_UPDATER.get(this));
+                    TOTAL_SIZE_UPDATER.get(this));
 
             if (STATE_UPDATER.get(this) == State.Closed) {
                 log.info("[{}] Ignoring offload request since the managed ledger was already closed", name);
-                callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
-                                               "Can't offload closed managed ledger (" + name + ")"), ctx);
+                callback.offloadFailed(
+                        new ManagedLedgerAlreadyClosedException("Can't offload closed managed ledger (" + name + ")"),
+                        ctx);
                 return;
             }
 
             if (ledgers.isEmpty()) {
                 log.info("[{}] Tried to offload a managed ledger with no ledgers, giving up", name);
                 callback.offloadFailed(new ManagedLedgerAlreadyClosedException(
-                                               "Can't offload managed ledger (" + name + ") with no ledgers"), ctx);
+                        "Can't offload managed ledger (" + name + ") with no ledgers"), ctx);
                 return;
             }
 
@@ -2097,27 +2269,26 @@ public void asyncOffloadPrefix(Position pos, OffloadCallback callback, Object ct
 
         if (offloadMutex.tryLock()) {
             log.info("[{}] Going to offload ledgers {}", name,
-                     ledgersToOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()));
+                    ledgersToOffload.stream().map(l -> l.getLedgerId()).collect(Collectors.toList()));
 
             CompletableFuture<PositionImpl> promise = new CompletableFuture<>();
             promise.whenComplete((result, exception) -> {
-                    offloadMutex.unlock();
-                    if (exception != null) {
-                        callback.offloadFailed(new ManagedLedgerException(exception), ctx);
-                    } else {
-                        callback.offloadComplete(result, ctx);
-                    }
-                });
+                offloadMutex.unlock();
+                if (exception != null) {
+                    callback.offloadFailed(new ManagedLedgerException(exception), ctx);
+                } else {
+                    callback.offloadComplete(result, ctx);
+                }
+            });
             offloadLoop(promise, ledgersToOffload, firstUnoffloaded, Optional.empty());
         } else {
             callback.offloadFailed(
-                    new ManagedLedgerException.OffloadInProgressException("Offload operation already running"),
-                    ctx);
+                    new ManagedLedgerException.OffloadInProgressException("Offload operation already running"), ctx);
         }
     }
 
     private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
-                             PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
+            PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
         LedgerInfo info = ledgersToOffload.poll();
         if (info == null) {
             if (firstError.isPresent()) {
@@ -2201,29 +2372,28 @@ private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerIn
     }
 
     private void tryTransformLedgerInfo(long ledgerId, LedgerInfoTransformation transformation,
-                                        CompletableFuture<Void> finalPromise) {
+            CompletableFuture<Void> finalPromise) {
         synchronized (this) {
             if (!ledgersListMutex.tryLock()) {
                 // retry in 100 milliseconds
-                scheduledExecutor.schedule(safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation,
-                                                                                finalPromise)),
-                                           100, TimeUnit.MILLISECONDS);
+                scheduledExecutor.schedule(
+                        safeRun(() -> tryTransformLedgerInfo(ledgerId, transformation, finalPromise)), 100,
+                        TimeUnit.MILLISECONDS);
             } else { // lock acquired
                 CompletableFuture<Void> unlockingPromise = new CompletableFuture<>();
                 unlockingPromise.whenComplete((res, ex) -> {
-                        ledgersListMutex.unlock();
-                        if (ex != null) {
-                            finalPromise.completeExceptionally(ex);
-                        } else {
-                            finalPromise.complete(res);
-                        }
-                    });
+                    ledgersListMutex.unlock();
+                    if (ex != null) {
+                        finalPromise.completeExceptionally(ex);
+                    } else {
+                        finalPromise.complete(res);
+                    }
+                });
 
                 LedgerInfo oldInfo = ledgers.get(ledgerId);
                 if (oldInfo == null) {
-                    unlockingPromise.completeExceptionally(
-                            new OffloadConflict(
-                                    "Ledger " + ledgerId + " no longer exists in ManagedLedger, likely trimmed"));
+                    unlockingPromise.completeExceptionally(new OffloadConflict(
+                            "Ledger " + ledgerId + " no longer exists in ManagedLedger, likely trimmed"));
                 } else {
                     try {
                         LedgerInfo newInfo = transformation.transform(oldInfo);
@@ -2249,10 +2419,8 @@ public void operationFailed(MetaStoreException e) {
         }
     }
 
-    private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId,
-                                                                  UUID uuid,
-                                                                  String offloadDriverName,
-                                                                  Map<String, String> offloadDriverMetadata) {
+    private CompletableFuture<Void> prepareLedgerInfoForOffloaded(long ledgerId, UUID uuid, String offloadDriverName,
+            Map<String, String> offloadDriverMetadata) {
         log.info("[{}] Preparing metadata to offload ledger {} with uuid {}", name, ledgerId, uuid);
         return transformLedgerInfo(ledgerId,
                                    (oldInfo) -> {
@@ -2330,24 +2498,22 @@ public void operationFailed(MetaStoreException e) {
                 });
     }
 
-    private void cleanupOffloaded(long ledgerId,
-                                  UUID uuid,
-                                  String offloadDriverName, /* TODO: use driver name to identify offloader */
-                                  Map<String, String> offloadDriverMetadata,
-                                  String cleanupReason) {
+    private void cleanupOffloaded(long ledgerId, UUID uuid, String offloadDriverName, /*
+                                                                                       * TODO: use driver name to
+                                                                                       * identify offloader
+                                                                                       */
+            Map<String, String> offloadDriverMetadata, String cleanupReason) {
         Retries.run(Backoff.exponentialJittered(TimeUnit.SECONDS.toMillis(1), TimeUnit.SECONDS.toHours(1)).limit(10),
-                    Retries.NonFatalPredicate,
-                    () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, offloadDriverMetadata),
-                    scheduledExecutor, name)
-            .whenComplete((ignored, exception) -> {
+                Retries.NonFatalPredicate,
+                () -> config.getLedgerOffloader().deleteOffloaded(ledgerId, uuid, offloadDriverMetadata),
+                scheduledExecutor, name).whenComplete((ignored, exception) -> {
                     if (exception != null) {
-                        log.warn("Error cleaning up offload for {}, (cleanup reason: {})",
-                                 ledgerId, cleanupReason, exception);
+                        log.warn("Error cleaning up offload for {}, (cleanup reason: {})", ledgerId, cleanupReason,
+                                exception);
                     }
                 });
     }
 
-
     /**
      * Get the number of entries between a contiguous range of two positions.
      *
@@ -2599,8 +2765,10 @@ PositionImpl getMarkDeletePositionOfSlowestConsumer() {
         do {
             pos = getFirstPosition();
             lastPositionAndCounter = getLastPositionAndCounter();
-            count = lastPositionAndCounter.getRight() - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft()));
-        } while (pos.compareTo(getFirstPosition()) != 0 || lastPositionAndCounter.getLeft().compareTo(getLastPosition()) != 0);
+            count = lastPositionAndCounter.getRight()
+                    - getNumberOfEntries(Range.openClosed(pos, lastPositionAndCounter.getLeft()));
+        } while (pos.compareTo(getFirstPosition()) != 0
+                || lastPositionAndCounter.getLeft().compareTo(getLastPosition()) != 0);
         return Pair.of(pos, count);
     }
 
@@ -2643,9 +2811,8 @@ private boolean currentLedgerIsFull() {
 
                 boolean switchLedger = timeSinceLedgerCreationMs > config.getMinimumRolloverTimeMs();
                 if (log.isDebugEnabled()) {
-                    log.debug("Diff: {}, threshold: {} -- switch: {}",
-                            clock.millis() - lastLedgerCreatedTimestamp, config.getMinimumRolloverTimeMs(),
-                            switchLedger);
+                    log.debug("Diff: {}, threshold: {} -- switch: {}", clock.millis() - lastLedgerCreatedTimestamp,
+                            config.getMinimumRolloverTimeMs(), switchLedger);
                 }
                 return switchLedger;
             } else {
@@ -2802,7 +2969,7 @@ public static ManagedLedgerException createManagedLedgerException(int bkErrorCod
 
     public static ManagedLedgerException createManagedLedgerException(Throwable t) {
         if (t instanceof org.apache.bookkeeper.client.api.BKException) {
-            return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException)t).getCode());
+            return createManagedLedgerException(((org.apache.bookkeeper.client.api.BKException) t).getCode());
         } else {
             return new ManagedLedgerException("Unknown exception");
         }
@@ -2853,7 +3020,7 @@ protected boolean checkAndCompleteLedgerOpTask(int rc, LedgerHandle lh, Object c
         }
         return false;
     }
-    
+
     private static final Logger log = LoggerFactory.getLogger(ManagedLedgerImpl.class);
 
 }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
index eaf8dbc95a..b02b0d8479 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java
@@ -24,6 +24,8 @@
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedLedgerException;
@@ -32,6 +34,8 @@
 import org.apache.bookkeeper.mledger.Position;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.TRUE;
+import static org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.FALSE;
 
 class OpReadEntry implements ReadEntriesCallback {
 
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
index 4f630ee116..36376b8379 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ReadOnlyManagedLedgerImpl.java
@@ -144,7 +144,7 @@ private ReadOnlyCursor createReadOnlyCursor(PositionImpl startPosition) {
     @Override
     void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
             this.getLedgerHandle(position.getLedgerId()).thenAccept((ledger) -> {
-                this.entryCache.asyncReadEntry(ledger, position, callback, ctx);
+                asyncReadEntry(ledger, position, callback, ctx);
             }).exceptionally((ex) -> {
                 log.error("[{}] Error opening ledger for reading at position {} - {}", new Object[]{this.name, position, ex.getMessage()});
                 callback.readEntryFailed(ManagedLedgerException.getManagedLedgerException(ex.getCause()), ctx);
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index aa85d8027b..a16dcd2c4c 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -21,6 +21,7 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
@@ -54,6 +55,8 @@
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
+import org.apache.bookkeeper.client.api.LedgerEntries;
+import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback;
@@ -61,6 +64,7 @@
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
 import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
+import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntryCallback;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
@@ -2248,4 +2252,76 @@ public void createComplete(int rc, LedgerHandle lh, Object ctx) {
         
         ledger.close();
     }
+    
+    /**
+     * It verifies that asyncRead timesout if it doesn't receive response from bk-client in configured timeout
+     * 
+     * @throws Exception
+     */
+    @Test
+    public void testManagedLedgerWithReadEntryTimeOut() throws Exception {
+        ManagedLedgerConfig config = new ManagedLedgerConfig().setReadEntryTimeoutSeconds(1);
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("timeout_ledger_test", config);
+
+        BookKeeper bk = mock(BookKeeper.class);
+        doNothing().when(bk).asyncCreateLedger(anyInt(), anyInt(), anyInt(), any(), any(), any(), any(), any());
+        AtomicReference<ManagedLedgerException> responseException1 = new AtomicReference<>();
+        CountDownLatch latch1 = new CountDownLatch(1);
+
+        CompletableFuture<LedgerEntries> entriesFuture = new CompletableFuture<>();
+        ReadHandle ledgerHandle = mock(ReadHandle.class);
+        doReturn(entriesFuture).when(ledgerHandle).readAsync(PositionImpl.earliest.getLedgerId(),
+                PositionImpl.earliest.getEntryId());
+
+        // (1) test read-timeout for: ManagedLedger.asyncReadEntry(..)
+        ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest, new ReadEntryCallback() {
+            @Override
+            public void readEntryComplete(Entry entry, Object ctx) {
+                responseException1.set(null);
+                latch1.countDown();
+            }
+
+            @Override
+            public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                responseException1.set(exception);
+                latch1.countDown();
+            }
+        }, null);
+        ledger.asyncCreateLedger(bk, config, null, new CreateCallback() {
+            @Override
+            public void createComplete(int rc, LedgerHandle lh, Object ctx) {
+
+            }
+        }, Collections.emptyMap());
+        latch1.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
+        assertNotNull(responseException1.get());
+        assertEquals(responseException1.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));
+
+        // (2) test read-timeout for: ManagedLedger.asyncReadEntry(..)
+        CountDownLatch latch2 = new CountDownLatch(1);
+        AtomicReference<ManagedLedgerException> responseException2 = new AtomicReference<>();
+        PositionImpl readPositionRef = PositionImpl.earliest;
+        ManagedCursorImpl cursor = new ManagedCursorImpl(bk, config, ledger, "cursor1");
+        OpReadEntry opReadEntry = OpReadEntry.create(cursor, readPositionRef, 1, new ReadEntriesCallback() {
+
+            @Override
+            public void readEntriesComplete(List<Entry> entries, Object ctx) {
+                latch2.countDown();
+            }
+
+            @Override
+            public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
+                responseException2.set(exception);
+                latch2.countDown();
+            }
+
+        }, null);
+        ledger.asyncReadEntry(ledgerHandle, PositionImpl.earliest.getEntryId(), PositionImpl.earliest.getEntryId(),
+                false, opReadEntry, null);
+        latch2.await(config.getReadEntryTimeoutSeconds() + 2, TimeUnit.SECONDS);
+        assertNotNull(responseException2.get());
+        assertEquals(responseException2.get().getMessage(), BKException.getMessage(BKException.Code.TimeoutException));
+
+        ledger.close();
+    }
 }
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index cee095a087..96259870dd 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -757,6 +757,14 @@
         doc = "operation timeout while updating managed-ledger metadata."
     )
     private long managedLedgerMetadataOperationsTimeoutSeconds = 60;
+    @FieldContext(
+            category = CATEGORY_STORAGE_ML,
+            doc = "Read entries timeout when broker tries to read messages from bookkeeper "
+                    + "(disable timeout by setting readTimeoutSeconds <= 0)"
+        )
+    private long managedLedgerReadEntryTimeoutSeconds = 60;
+        
+    
 
     /*** --- Load balancer --- ****/
     @FieldContext(
@@ -1134,4 +1142,4 @@ public int getBookkeeperHealthCheckIntervalSec() {
     public Optional<Integer> getWebServicePortTls() {
         return Optional.ofNullable(webServicePortTls);
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index a43e404dba..b2b5103a9c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -744,6 +744,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
 
             managedLedgerConfig.setMetadataOperationsTimeoutSeconds(
                     serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds());
+            managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds());
             managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize());
             managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum());
             managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum());
diff --git a/site/_data/config/broker.yaml b/site/_data/config/broker.yaml
index 31285be5dc..bbb3383ff9 100644
--- a/site/_data/config/broker.yaml
+++ b/site/_data/config/broker.yaml
@@ -239,6 +239,12 @@ configs:
 - name: autoSkipNonRecoverableData
   default: 'false'
   description: Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger.
+- name: managedLedgerMetadataOperationsTimeoutSeconds
+  default: '60'
+  description: Operation timeout while updating managed-ledger metadata.
+- name: managedLedgerReadEntryTimeoutSeconds
+  default: '120'
+  description: Read entries timeout when broker tries to read messages from bookkeeper.
 - name: loadBalancerEnabled
   default: 'true'
   description: Enable load balancer
diff --git a/site/_data/config/standalone.yaml b/site/_data/config/standalone.yaml
index d6f4c59bcb..173d1ddeef 100644
--- a/site/_data/config/standalone.yaml
+++ b/site/_data/config/standalone.yaml
@@ -163,6 +163,10 @@ configs:
   default: '14400'
 - name: autoSkipNonRecoverableData
   default: 'false'
+- name: managedLedgerMetadataOperationsTimeoutSeconds
+  default: '60'
+- name: managedLedgerReadEntryTimeoutSeconds
+  default: '120'
 - name: loadBalancerEnabled
   default: 'false'
 - name: loadBalancerPlacementStrategy


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services