You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/13 23:26:37 UTC
[pulsar] branch master updated: [fix][ML] Fix offload read handle NPE (#17056)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6fc48d1186d [fix][ML] Fix offload read handle NPE (#17056)
6fc48d1186d is described below
commit 6fc48d1186d0812d0e88d9e40515c147e8235555
Author: Yan Zhao <ho...@apache.org>
AuthorDate: Sun Aug 14 07:26:30 2022 +0800
[fix][ML] Fix offload read handle NPE (#17056)
---
.../bookkeeper/mledger/ManagedLedgerException.java | 7 +++
.../bookkeeper/mledger/impl/ManagedLedgerImpl.java | 26 ++++++++--
.../bookkeeper/mledger/impl/ManagedLedgerTest.java | 26 ++++++++++
.../mledger/impl/NonDurableCursorTest.java | 59 ++++++++++++++++++++++
.../PersistentDispatcherMultipleConsumers.java | 3 +-
.../PersistentDispatcherSingleActiveConsumer.java | 3 +-
.../streamingdispatch/StreamingEntryReader.java | 3 +-
.../jcloud/impl/BlobStoreBackedReadHandleImpl.java | 13 +++--
.../impl/BlobStoreBackedReadHandleImplV2.java | 28 +++++++---
.../impl/BlobStoreManagedLedgerOffloaderTest.java | 3 +-
10 files changed, 151 insertions(+), 20 deletions(-)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
index 7046ba48193..347a380d7eb 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerException.java
@@ -189,6 +189,13 @@ public class ManagedLedgerException extends Exception {
}
}
+ public static class OffloadReadHandleClosedException extends ManagedLedgerException {
+
+ public OffloadReadHandleClosedException() {
+ super("Offload read handle already closed");
+ }
+ }
+
@Override
public synchronized Throwable fillInStackTrace() {
// Disable stack traces to be filled in
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 8b942b7baa8..aa7c19b32bd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2507,7 +2507,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
break;
}
// if truncate, all ledgers besides currentLedger are going to be deleted
- if (isTruncate){
+ if (isTruncate) {
if (log.isDebugEnabled()) {
log.debug("[{}] Ledger {} will be truncated with ts {}",
name, ls.getLedgerId(), ls.getTimestamp());
@@ -2535,11 +2535,14 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
ledgersToDelete.add(ls);
} else {
- // once retention constraint has been met, skip check
- if (log.isDebugEnabled()) {
- log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name, ls.getLedgerId());
+ if (ls.getLedgerId() < getTheSlowestNonDurationReadPosition().getLedgerId()) {
+ // once retention constraint has been met, skip check
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Ledger {} not deleted. Neither expired nor over-quota", name,
+ ls.getLedgerId());
+ }
+ invalidateReadHandle(ls.getLedgerId());
}
- invalidateReadHandle(ls.getLedgerId());
}
}
@@ -4205,4 +4208,17 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
}
}
}
+
+ public Position getTheSlowestNonDurationReadPosition() {
+ PositionImpl theSlowestNonDurableReadPosition = PositionImpl.LATEST;
+ for (ManagedCursor cursor : cursors) {
+ if (cursor instanceof NonDurableCursorImpl) {
+ PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
+ if (readPosition.compareTo(theSlowestNonDurableReadPosition) < 0) {
+ theSlowestNonDurableReadPosition = readPosition;
+ }
+ }
+ }
+ return theSlowestNonDurableReadPosition;
+ }
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 1eca3cef184..37fa56cd989 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;
+import static java.nio.charset.StandardCharsets.UTF_8;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
@@ -3841,6 +3842,31 @@ public class ManagedLedgerTest extends MockedBookKeeperTestCase {
});
}
+ @Test
+ public void testGetTheSlowestNonDurationReadPosition() throws Exception {
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("test_",
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+ .setRetentionSizeInMB(-1));
+ ledger.openCursor("c1");
+
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+ }
+
+ Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), PositionImpl.LATEST);
+
+ ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+ Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), positions.get(0));
+
+ ledger.deleteCursor(nonDurableCursor.getName());
+
+ Assert.assertEquals(ledger.getTheSlowestNonDurationReadPosition(), PositionImpl.LATEST);
+
+ ledger.close();
+ }
+
@Test
public void testGetLedgerMetadata() throws Exception {
ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) factory.open("testGetLedgerMetadata");
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
index f9e524a636f..f73ebf122b7 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java
@@ -54,6 +54,7 @@ import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.Test;
public class NonDurableCursorTest extends MockedBookKeeperTestCase {
@@ -737,6 +738,64 @@ public class NonDurableCursorTest extends MockedBookKeeperTestCase {
ledger.close();
}
+ @Test
+ public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+ ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+ new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+ .setRetentionSizeInMB(-1));
+ ManagedCursor c1 = ledger.openCursor("c1");
+ ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+ List<Position> positions = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+ }
+
+ CountDownLatch latch = new CountDownLatch(10);
+ for (int i = 0; i < 10; i++) {
+ ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+ @Override
+ public void readEntryComplete(Entry entry, Object ctx) {
+ latch.countDown();
+ }
+
+ @Override
+ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+ latch.countDown();
+ }
+ }, null);
+ }
+
+ latch.await();
+
+ c1.markDelete(positions.get(4));
+
+ CompletableFuture<Void> promise = new CompletableFuture<>();
+ ledger.internalTrimConsumedLedgers(promise);
+ promise.join();
+
+ Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(0).getLedgerId()));
+ Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(1).getLedgerId()));
+ Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(2).getLedgerId()));
+ Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(3).getLedgerId()));
+ Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(4).getLedgerId()));
+
+ promise = new CompletableFuture<>();
+
+ nonDurableCursor.markDelete(positions.get(3));
+
+ ledger.internalTrimConsumedLedgers(promise);
+ promise.join();
+
+ Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(0).getLedgerId()));
+ Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(1).getLedgerId()));
+ Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(2).getLedgerId()));
+ Assert.assertFalse(ledger.ledgerCache.containsKey(positions.get(3).getLedgerId()));
+ Assert.assertTrue(ledger.ledgerCache.containsKey(positions.get(4).getLedgerId()));
+
+ ledger.close();
+ }
+
@Test(expectedExceptions = NullPointerException.class)
void testCursorWithNameIsNotNull() throws Exception {
final String p1CursorName = "entry-1";
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index ea3b9bbe540..7ed277ddfda 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -792,7 +792,8 @@ public class PersistentDispatcherMultipleConsumers extends AbstractDispatcherMul
// Notify the consumer only if all the messages were already acknowledged
consumerList.forEach(Consumer::reachedEndOfTopic);
}
- } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+ } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+ || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
waitTimeMillis = 1;
if (log.isDebugEnabled()) {
log.debug("[{}] Error reading transaction entries : {}, Read Type {} - Retrying to read in {} seconds",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 022a86ce9a0..608b0fa503f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -497,7 +497,8 @@ public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcher
// Notify the consumer only if all the messages were already acknowledged
consumers.forEach(Consumer::reachedEndOfTopic);
}
- } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+ } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+ || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
waitTimeMillis = 1;
if (log.isDebugEnabled()) {
log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", name,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
index ec46d32548f..3f6f82200b0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/streamingdispatch/StreamingEntryReader.java
@@ -197,7 +197,8 @@ public class StreamingEntryReader implements AsyncCallbacks.ReadEntryCallback, W
PositionImpl readPosition = pendingReadEntryRequest.position;
pendingReadEntryRequest.retry++;
long waitTimeMillis = readFailureBackoff.next();
- if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+ if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+ || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
waitTimeMillis = 1;
if (log.isDebugEnabled()) {
log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds",
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
index ab64388cd4d..499084ab9cd 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImpl.java
@@ -39,6 +39,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlock;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockBuilder;
@@ -117,14 +118,16 @@ public class BlobStoreBackedReadHandleImpl implements ReadHandle {
}
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.execute(() -> {
+ if (state == State.Closed) {
+ log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+ ledgerId, firstEntry, lastEntry);
+ promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+ return;
+ }
+
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
boolean seeked = false;
try {
- if (state == State.Closed) {
- log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
- ledgerId, firstEntry, lastEntry);
- throw new BKException.BKUnexpectedConditionException();
- }
if (firstEntry > lastEntry
|| firstEntry < 0
|| lastEntry > getLastAddConfirmed()) {
diff --git a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
index b896f38d390..495a6e2fcb3 100644
--- a/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
+++ b/tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java
@@ -40,6 +40,7 @@ import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntriesImpl;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.offload.jcloud.BackedInputStream;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2;
import org.apache.bookkeeper.mledger.offload.jcloud.OffloadIndexBlockV2Builder;
@@ -58,6 +59,12 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
private final List<BackedInputStream> inputStreams;
private final List<DataInputStream> dataStreams;
private final ExecutorService executor;
+ private State state = null;
+
+ enum State {
+ Opened,
+ Closed
+ }
static class GroupedReader {
@Override
@@ -99,6 +106,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
dataStreams.add(new DataInputStream(inputStream));
}
this.executor = executor;
+ this.state = State.Opened;
}
@Override
@@ -123,6 +131,7 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
for (DataInputStream dataStream : dataStreams) {
dataStream.close();
}
+ state = State.Closed;
promise.complete(null);
} catch (IOException t) {
promise.completeExceptionally(t);
@@ -135,13 +144,20 @@ public class BlobStoreBackedReadHandleImplV2 implements ReadHandle {
public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
- if (firstEntry > lastEntry
- || firstEntry < 0
- || lastEntry > getLastAddConfirmed()) {
- promise.completeExceptionally(new IllegalArgumentException());
- return promise;
- }
executor.execute(() -> {
+ if (state == State.Closed) {
+ log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+ ledgerId, firstEntry, lastEntry);
+ promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+ return;
+ }
+
+ if (firstEntry > lastEntry
+ || firstEntry < 0
+ || lastEntry > getLastAddConfirmed()) {
+ promise.completeExceptionally(new BKException.BKIncorrectParameterException());
+ return;
+ }
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
List<GroupedReader> groupedReaders = null;
try {
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
index e6b0cc156ad..6f499d153e2 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreManagedLedgerOffloaderTest.java
@@ -44,6 +44,7 @@ import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.LedgerOffloaderStats;
+import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.impl.LedgerOffloaderStatsImpl;
import org.apache.bookkeeper.mledger.OffloadedLedgerMetadata;
import org.apache.bookkeeper.mledger.offload.jcloud.provider.JCloudBlobStoreProvider;
@@ -582,7 +583,7 @@ public class BlobStoreManagedLedgerOffloaderTest extends BlobStoreManagedLedgerO
try {
toTest.readAsync(0, lac).get();
} catch (Exception e) {
- if (e.getCause() instanceof BKException.BKUnexpectedConditionException) {
+ if (e.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {
// expected exception
return;
}