You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/15 05:26:26 UTC

[pulsar] branch branch-2.11 updated (1df433af8ee -> 57b9824a0d3)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a change to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git


    from 1df433af8ee [improve][broker] Improve naming for delete topic error (#16965)
     new e36dfbfdeba [fix][tiered-storage] move the state check forward (#17020)
     new 57b9824a0d3 [fix][ML] Fix offload read handle NPE (#17056)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../bookkeeper/mledger/ManagedLedgerException.java |  7 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 26 ++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 26 ++++++++++
 .../mledger/impl/NonDurableCursorTest.java         | 59 ++++++++++++++++++++++
 .../PersistentDispatcherMultipleConsumers.java     |  3 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  3 +-
 .../streamingdispatch/StreamingEntryReader.java    |  3 +-
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 15 ++++--
 .../impl/BlobStoreBackedReadHandleImplV2.java      | 28 +++++++---
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 26 +++++++++-
 10 files changed, 176 insertions(+), 20 deletions(-)


[pulsar] 01/02: [fix][tiered-storage] move the state check forward (#17020)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e36dfbfdeba336172e5e74b14329231d823a36ae
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Aug 11 21:01:55 2022 +0800

    [fix][tiered-storage] move the state check forward (#17020)
    
    * [fix][tiered-storage] move the state check forward
    ---
    
    *Motivation*
    
    Move the closed-state check forward so that `getLastAddConfirmed()`
    does not throw an NPE.
    If the state is closed, the resource has been closed and the
    `OffloadIndexBlock` has been recycled, which causes an NPE when
    `getLastAddConfirmed()` is called.
---
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 12 ++++++-----
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 25 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 38ba38a8e65..ab64388cd4d 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -120,6 +120,11 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             boolean seeked = false;
             try {
+                if (state == State.Closed) {
+                    log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                    throw new BKException.BKUnexpectedConditionException();
+                }
                 if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
@@ -138,11 +143,6 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                 }
 
                 while (entriesToRead > 0) {
-                    if (state == State.Closed) {
-                        log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
-                                ledgerId, firstEntry, lastEntry);
-                        throw new BKException.BKUnexpectedConditionException();
-                    }
                     long currentPosition = inputStream.getCurrentPosition();
                     int length = dataStream.readInt();
                     if (length < 0) { // hit padding or new block
@@ -189,6 +189,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
 
                 promise.complete(LedgerEntriesImpl.create(entries));
             } catch (Throwable t) {
+                log.error("Failed to read entries {} - {} from the offloader in ledger {}",
+                    firstEntry, lastEntry, ledgerId, t);
                 promise.completeExceptionally(t);
                 entries.forEach(LedgerEntry::close);
             }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 74d03b11dba..e6b0cc156ad 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -566,4 +566,27 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
         OffloadedLedgerMetadata offloadedLedgerMetadata2 = result.get(1);
         assertEquals(toWrite.getId(), offloadedLedgerMetadata2.getLedgerId());
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testReadWithAClosedLedgerHandler() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = getOffloader();
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+        long lac = toTest.getLastAddConfirmed();
+        toTest.readAsync(0, lac).get();
+        toTest.closeAsync().get();
+        try {
+            toTest.readAsync(0, lac).get();
+        } catch (Exception e) {
+            if (e.getCause() instanceof BKException.BKUnexpectedConditionException) {
+                // expected exception
+                return;
+            }
+            throw e;
+        }
+    }
+}


[pulsar] 02/02: [fix][ML] Fix offload read handle NPE (#17056)

Posted by te...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 57b9824a0d3afe4a10bbcada91a3903cce1f2f57
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Sun Aug 14 07:26:30 2022 +0800

    [fix][ML] Fix offload read handle NPE (#17056)
---
 .../bookkeeper/mledger/ManagedLedgerException.java |  7 +++
 .../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 26 ++++++++--
 .../bookkeeper/mledger/impl/ManagedLedgerTest.java | 26 ++++++++++
 .../mledger/impl/NonDurableCursorTest.java         | 59 ++++++++++++++++++++++
 .../PersistentDispatcherMultipleConsumers.java     |  3 +-
 .../PersistentDispatcherSingleActiveConsumer.java  |  3 +-
 .../streamingdispatch/StreamingEntryReader.java    |  3 +-
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 13 +++--
 .../impl/BlobStoreBackedReadHandleImplV2.java      | 28 +++++++---
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  |  3 +-
 10 files changed, 151 insertions(+), 20 deletions(-)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 7046ba48193..347a380d7eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -189,6 +189,13 @@ public class ManagedLedgerException extends Exception {
         }
     }
 
+    public static class OffloadReadHandleClosedException extends ManagedLedgerException {
+
+        public OffloadReadHandleClosedException() {
+            super("Offload read handle already closed");
+        }
+    }
+
     @Override
     public synchronized Throwable fillInStackTrace() {
         // Disable stack traces to be filled in
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 29c78dd7bbb..a2282ee0f92 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2509,7 +2509,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     break;
                 }
                 // if truncate, all ledgers besides currentLedger are going to be deleted
-                if (isTruncate){
+                if (isTruncate) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] Ledger {} will be truncated with ts {}",
                                 name, ls.getLedgerId(), ls.getTimestamp());
@@ -2537,11 +2537,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
                     }
                     ledgersToDelete.add(ls);
                 } else {
-                    // once retention constraint has been met, skip check
-                    if (log.isDebugEnabled()) {
-                        log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
+                    if (ls.getLedgerId() < getTheSlowestNonDurationReadPosition().getLedgerId()) {
+                        // once retention constraint has been met, skip check
+                        if (log.isDebugEnabled()) {
+                            log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name,
+                                    ls.getLedgerId());
+                        }
+                        invalidateReadHandle(ls.getLedgerId());
                     }
-                    invalidateReadHandle(ls.getLedgerId());
                 }
             }
 
@@ -4207,4 +4210,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
             }
         }
     }
+
+    public Position getTheSlowestNonDurationReadPosition() {
+        PositionImpl theSlowestNonDurableReadPosition = PositionImpl.LATEST;
+        for (ManagedCursor cursor : cursors) {
+            if (cursor instanceof NonDurableCursorImpl) {
+                PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
+                if (readPosition.compareTo(theSlowestNonDurableReadPosition) < 0) {
+                    theSlowestNonDurableReadPosition = readPosition;
+                }
+            }
+        }
+        return theSlowestNonDurableReadPosition;
+    }
 }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index d556ff8eb63..3293c98c5c6 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.bookkeeper.mledger.impl;
 
+import static java.nio.charset.StandardCharsets.UTF_8;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.anyInt;
 import static org.mockito.ArgumentMatchers.anyString;
@@ -3840,6 +3841,31 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
         });
     }
 
+    @Test
+    public void testGetTheSlowestNonDurationReadPosition() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test_",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ledger.openCursor("c1");
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), PositionImpl.LATEST);
+
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), positions.get(0));
+
+        ledger.deleteCursor(nonDurableCursor.getName());
+
+        Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), PositionImpl.LATEST);
+
+        ledger.close();
+    }
+
     @Test
     public void testGetLedgerMetadata() throws Exception {
         ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("testGetLedgerMetadata");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index 29ba99687ba..f0d71f2ed18 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class NonDurableCursorTest extends MockedBookKeeperTestCase {
@@ -737,6 +738,64 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(0).getLedgerId()));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(1).getLedgerId()));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(2).getLedgerId()));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(3).getLedgerId()));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(4).getLedgerId()));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(0).getLedgerId()));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(1).getLedgerId()));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(2).getLedgerId()));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(3).getLedgerId()));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(4).getLedgerId()));
+
+        ledger.close();
+    }
+
     @Test(expectedExceptions = NullPointerException.class)
     void testCursorWithNameIsNotNull() throws Exception {
         final String p1CursorName = "entry-1";
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index ea3b9bbe540..7ed277ddfda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -792,7 +792,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
                 // Notify the consumer only if all the messages were already acknowledged
                 consumerList.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
             waitTimeMillis = 1;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error reading transaction entries : {}, Read Type {} - Retrying to read in {} seconds",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 022a86ce9a0..608b0fa503f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -497,7 +497,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
                 // Notify the consumer only if all the messages were already acknowledged
                 consumers.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
             waitTimeMillis = 1;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", name,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index ec46d32548f..3f6f82200b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -197,7 +197,8 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
         PositionImpl readPosition = pendingReadEntryRequest.position;
         pendingReadEntryRequest.retry++;
         long waitTimeMillis = readFailureBackoff.next();
-        if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+        if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
             waitTimeMillis = 1;
             if (log.isDebugEnabled()) {
                 log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds",
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index ab64388cd4d..499084ab9cd 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
@@ -117,14 +118,16 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
         }
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
         executor.execute(() -> {
+            if (state == State.Closed) {
+                log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+                return;
+            }
+
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             boolean seeked = false;
             try {
-                if (state == State.Closed) {
-                    log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
-                        ledgerId, firstEntry, lastEntry);
-                    throw new BKException.BKUnexpectedConditionException();
-                }
                 if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index b896f38d390..495a6e2fcb3 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
 import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
 import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
@@ -58,6 +59,12 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
     private final List<BackedInputStream> inputStreams;
     private final List<DataInputStream> dataStreams;
     private final ExecutorService executor;
+    private State state = null;
+
+    enum State {
+        Opened,
+        Closed
+    }
 
     static class GroupedReader {
         @Override
@@ -99,6 +106,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
             dataStreams.add(new DataInputStream(inputStream));
         }
         this.executor = executor;
+        this.state = State.Opened;
     }
 
     @Override
@@ -123,6 +131,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
                 for (DataInputStream dataStream : dataStreams) {
                     dataStream.close();
                 }
+                state = State.Closed;
                 promise.complete(null);
             } catch (IOException t) {
                 promise.completeExceptionally(t);
@@ -135,13 +144,20 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
     public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
-        if (firstEntry > lastEntry
-                || firstEntry < 0
-                || lastEntry > getLastAddConfirmed()) {
-            promise.completeExceptionally(new IllegalArgumentException());
-            return promise;
-        }
         executor.execute(() -> {
+            if (state == State.Closed) {
+                log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+                return;
+            }
+
+            if (firstEntry > lastEntry
+                    || firstEntry < 0
+                    || lastEntry > getLastAddConfirmed()) {
+                promise.completeExceptionally(new BKException.BKIncorrectParameterException());
+                return;
+            }
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             List<GroupedReader> groupedReaders = null;
             try {
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index e6b0cc156ad..6f499d153e2 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -44,6 +44,7 @@ import org.apache.bookkeeper.client.api.LedgerEntry;
 import org.apache.bookkeeper.client.api.ReadHandle;
 import org.apache.bookkeeper.mledger.LedgerOffloader;
 import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
 import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
 import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
 import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
@@ -582,7 +583,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
         try {
             toTest.readAsync(0, lac).get();
         } catch (Exception e) {
-            if (e.getCause() instanceof BKException.BKUnexpectedConditionException) {
+            if (e.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
                 // expected exception
                 return;
             }