You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by ch...@apache.org on 2022/08/16 07:27:11 UTC
[pulsar] 01/03: [fix][tiered-storage] move the state check forward (#17020)
This is an automated email from the ASF dual-hosted git repository.
chenhang pushed a commit to branch branch-2.9
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 06be308844948596907402eecba6598df6eceb18
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Thu Aug 11 21:01:55 2022 +0800
[fix][tiered-storage] move the state check forward (#17020)
* [fix][tiered-storage] move the state check forward
---
*Motivation*
Move the closed-state check forward so that `getLastAddConfirmed()` does not
throw an NPE.
If the state is closed, the resource has been closed and the
`OffloadIndexBlock` has been recycled, which causes an NPE when
`getLastAddConfirmed()` is called.
(cherry picked from commit ee0ea3a6f9ffb42d4ec129eb689d3c1059e5f4a8)
---
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 11 +++++++----
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 23 ++++++++++++++++++++++
2 files changed, 30 insertions(+), 4 deletions(-)
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index 73a4dd76e53..84d28692377 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -107,6 +107,11 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
boolean seeked = false;
try {
+ if (state == State.Closed) {
+ log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+ ledgerId, firstEntry, lastEntry);
+ throw new BKException.BKUnexpectedConditionException();
+ }
if (firstEntry > lastEntry
|| firstEntry < 0
|| lastEntry > getLastAddConfirmed()) {
@@ -125,10 +130,6 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
}
while (entriesToRead > 0) {
- if (state == State.Closed) {
- log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
- throw new BKException.BKUnexpectedConditionException();
- }
int length = dataStream.readInt();
if (length < 0) { // hit padding or new block
inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
@@ -173,6 +174,8 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
promise.complete(LedgerEntriesImpl.create(entries));
} catch (Throwable t) {
+ log.error("Failed to read entries {} - {} from the offloader in ledger {}",
+ firstEntry, lastEntry, ledgerId, t);
promise.completeExceptionally(t);
entries.forEach(LedgerEntry::close);
}
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index 77dfc55b777..ab979f8a5a1 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -496,4 +496,27 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
fail("Get unexpected exception when reading entries", e);
}
}
+
+ @Test
+ public void testReadWithAClosedLedgerHandler() throws Exception {
+ ReadHandle toWrite = buildReadHandle(DEFAULT_BLOCK_SIZE, 1);
+ LedgerOffloader offloader = getOffloader();
+ UUID uuid = UUID.randomUUID();
+ offloader.offload(toWrite, uuid, new HashMap<>()).get();
+
+ ReadHandle toTest = offloader.readOffloaded(toWrite.getId(), uuid, Collections.emptyMap()).get();
+ Assert.assertEquals(toTest.getLastAddConfirmed(), toWrite.getLastAddConfirmed());
+ long lac = toTest.getLastAddConfirmed();
+ toTest.readAsync(0, lac).get();
+ toTest.closeAsync().get();
+ try {
+ toTest.readAsync(0, lac).get();
+ } catch (Exception e) {
+ if (e.getCause() instanceof BKException.BKUnexpectedConditionException) {
+ // expected exception
+ return;
+ }
+ throw e;
+ }
+ }
}