You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by te...@apache.org on 2022/08/15 05:26:27 UTC

[pulsar] 01/02: [fix][tiered-storage] move the state check forward (#17020)

This is an automated email from the ASF dual-hosted git repository.

technoboy pushed a commit to branch branch-2.11
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit e36dfbfdeba336172e5e74b14329231d823a36ae
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Aug 11 21:01:55 2022 +0800

    [fix][tiered-storage] move the state check forward (#17020)
    
    * [fix][tiered-storage] move the state check forward
    ---
    
    *Motivation*
    
    Move the closed-state check forward so that `getLastAddConfirmed()`
    does not throw an NPE.
    If the state is closed, the resource has been closed and the
    `OffloadIndexBlock` has been recycled, which causes an NPE when
    `getLastAddConfirmed()` is called.
---
 .../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 12 ++++++-----
 .../impl/BlobStoreManagedLedgerOffloaderTest.java  | 25 +++++++++++++++++++++-
 2 files changed, 31 insertions(+), 6 deletions(-)

diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 38ba38a8e65..ab64388cd4d 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -120,6 +120,11 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
             List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
             boolean seeked = false;
             try {
+                if (state == State.Closed) {
+                    log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                    throw new BKException.BKUnexpectedConditionException();
+                }
                 if (firstEntry > lastEntry
                     || firstEntry < 0
                     || lastEntry > getLastAddConfirmed()) {
@@ -138,11 +143,6 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
                 }
 
                 while (entriesToRead > 0) {
-                    if (state == State.Closed) {
-                        log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
-                                ledgerId, firstEntry, lastEntry);
-                        throw new BKException.BKUnexpectedConditionException();
-                    }
                     long currentPosition = inputStream.getCurrentPosition();
                     int length = dataStream.readInt();
                     if (length < 0) { // hit padding or new block
@@ -189,6 +189,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
 
                 promise.complete(LedgerEntriesImpl.create(entries));
             } catch (Throwable t) {
+                log.error("Failed to read entries {} - {} from the offloader in ledger {}",
+                    firstEntry, lastEntry, ledgerId, t);
                 promise.completeExceptionally(t);
                 entries.forEach(LedgerEntry::close);
             }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 74d03b11dba..e6b0cc156ad 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -566,4 +566,27 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
         OffloadedLedgerMetadata offloadedLedgerMetadata2 = result.get(1);
         assertEquals(toWrite.getId(), offloadedLedgerMetadata2.getLedgerId());
     }
-}
\ No newline at end of file
+
+    @Test
+    public void testReadWithAClosedLedgerHandler() throws Exception {
+        ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+        LedgerOffloader offloader = getOffloader();
+        UUID uuid = UUID.randomUUID();
+        offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+        ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
+        Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+        long lac = toTest.getLastAddConfirmed();
+        toTest.readAsync(0, lac).get();
+        toTest.closeAsync().get();
+        try {
+            toTest.readAsync(0, lac).get();
+        } catch (Exception e) {
+            if (e.getCause() instanceof BKException.BKUnexpectedConditionException) {
+                // expected exception
+                return;
+            }
+            throw e;
+        }
+    }
+}