Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/08/11 03:09:44 UTC

[GitHub] [pulsar] horizonzy opened a new pull request, #17056: Fix offload read handle npe.

horizonzy opened a new pull request, #17056:
URL: https://github.com/apache/pulsar/pull/17056

   ### Motivation
   Currently, when we get a ReadHandle and then use it to read data (a non-durable cursor read), the ReadHandle may already have been invalidated.
   
   There is a race condition: after we get the ReadHandle but before we use it to read, `ManagedLedgerImpl#internalTrimLedgers` may invalidate it. That method only takes the durable cursors into account, while a non-durable cursor may still be reading the data. We then read with an invalidated offload read handle, which causes an NPE.
   
   
   https://github.com/apache/pulsar/blob/96930fda8a7776aaf6e61235a1d77ffe6e564052/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L2542
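The race can be reproduced deterministically with a toy model. The sketch below is hypothetical and heavily simplified (`RacyReadHandle` and `RaceDemo` are not Pulsar classes): a read is queued on a single-threaded executor, the handle is closed (as trimming would invalidate it) before the queued task runs, and the task then dereferences the released state and fails with a `NullPointerException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical toy read handle; not Pulsar's BlobStoreBackedReadHandleImpl.
class RacyReadHandle {
    private final ExecutorService executor;
    // Stands in for the state that close() releases.
    private volatile byte[] data = "entry-0".getBytes();

    RacyReadHandle(ExecutorService executor) {
        this.executor = executor;
    }

    CompletableFuture<Integer> readAsync() {
        CompletableFuture<Integer> promise = new CompletableFuture<>();
        // The task runs whenever the executor reaches it; close() may happen first.
        executor.execute(() -> {
            try {
                promise.complete(data.length); // NPE if close() already ran
            } catch (Exception e) {
                promise.completeExceptionally(e);
            }
        });
        return promise;
    }

    void close() {
        data = null;
    }
}

class RaceDemo {
    // Forces the bad interleaving: read queued, handle closed, queued read executes.
    static Throwable reproduce() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        // Occupy the only worker thread so the read below stays queued.
        executor.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        RacyReadHandle handle = new RacyReadHandle(executor);
        CompletableFuture<Integer> read = handle.readAsync();
        handle.close(); // e.g. trimming invalidates the handle
        gate.countDown();
        Throwable failure = read.handle((value, error) -> error).join();
        executor.shutdown();
        return failure;
    }
}
```

In this model, a validity check placed before `executor.execute(...)` cannot help, because the close happens after such a check but before the task runs.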
   
   ```
   07:11:40.056 [offloader-OrderedScheduler-1-0] WARN  org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [admin][persistent://cme_dev/market_data_mbo_v1/345_0-partition-0] Failed to reset cursor on subscription security_data_normalizer_subscriber_v1_dlabak_m01 to position 10218543:44528:0
   org.apache.pulsar.broker.service.BrokerServiceException$SubscriptionBusyException: Failed to fence subscription
   	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.resetCursor(PersistentSubscription.java:630) ~[io.streamnative-pulsar-broker-2.9.2.23.jar:2.9.2.23]
   	at org.apache.pulsar.broker.service.persistent.PersistentSubscription.resetCursor(PersistentSubscription.java:624) ~[io.streamnative-pulsar-broker-2.9.2.23.jar:2.9.2.23]
   	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase.lambda$internalResetCursorOnPosition$120(PersistentTopicsBase.java:2234) ~[io.streamnative-pulsar-broker-2.9.2.23.jar:2.9.2.23]
   	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:714) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2073) ~[?:?]
   	at org.apache.pulsar.broker.admin.impl.PersistentTopicsBase$2.readEntryFailed(PersistentTopicsBase.java:2277) ~[io.streamnative-pulsar-broker-2.9.2.23.jar:2.9.2.23]
   	at org.apache.bookkeeper.mledger.impl.EntryCacheImpl.lambda$asyncReadEntry0$1(EntryCacheImpl.java:233) ~[io.streamnative-managed-ledger-2.9.2.23.jar:2.9.2.23]
   	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:986) ~[?:?]
   	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:970) ~[?:?]
   	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
   	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2088) ~[?:?]
   	at org.apache.bookkeeper.mledger.offload.jcloud.impl.BlobStoreBackedReadHandleImpl.lambda$readAsync$1(BlobStoreBackedReadHandleImpl.java:176) ~[?:?]
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
   	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125) [com.google.guava-guava-30.1-jre.jar:?]
   	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:69) [com.google.guava-guava-30.1-jre.jar:?]
   	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78) [com.google.guava-guava-30.1-jre.jar:?]
   	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
   	at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
   	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
   	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
   	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
   	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) [io.netty-netty-common-4.1.77.Final.jar:4.1.77.Final]
   	at java.lang.Thread.run(Thread.java:829) [?:?]
   ```
   
   ### Modifications
   1. Fix the NPE.
   2. Only invalidate a read handle if its ledger ID is less than the ledger of the slowest non-durable cursor's read position, which reduces the probability of the NPE.
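Item 2 can be modeled as a pure function over ledger IDs. This is a minimal sketch under stated assumptions, not the code in `ManagedLedgerImpl`: `ReadHandleInvalidation`, `ledgersToInvalidate`, and the single `durableBoundaryLedger` parameter are hypothetical simplifications. A cached read handle is invalidated only when its ledger ID is below both the durable boundary and the ledger of the slowest non-durable read position.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of the invalidation rule; not Pulsar's implementation.
class ReadHandleInvalidation {
    /**
     * @param cachedLedgerIds         ledger IDs that currently have a cached read handle
     * @param durableBoundaryLedger   ledger ID below which durable cursors no longer read
     * @param slowestNonDurableLedger ledger ID of the slowest non-durable read position,
     *                                or null when there is no non-durable cursor
     */
    static List<Long> ledgersToInvalidate(List<Long> cachedLedgerIds,
                                          long durableBoundaryLedger,
                                          Long slowestNonDurableLedger) {
        // Without a non-durable cursor, only the durable boundary applies.
        long boundary = slowestNonDurableLedger == null
                ? durableBoundaryLedger
                : Math.min(durableBoundaryLedger, slowestNonDurableLedger);
        return cachedLedgerIds.stream()
                .filter(id -> id < boundary)
                .collect(Collectors.toList());
    }
}
```

With cached ledgers 3 to 7 and a durable boundary of 8 (for example, a durable cursor reading at `8:0`), a slowest non-durable read position in ledger 3 invalidates nothing, while one in ledger 7 invalidates ledgers 3, 4, 5 and 6, matching the assertions in `testInvalidateReadHandleWithSlowNonDurableCursor`.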
   
   
   ### Documentation
   
   Check the box below or label this PR directly.
   
   Need to update docs? 
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
     
   - [ ] `doc-not-needed` 
   (Please explain why)
     
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943258882


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -791,7 +791,8 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
                 // Notify the consumer only if all the messages were already acknowledged
                 consumerList.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {

Review Comment:
   +1 for me.
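The change above routes a failure whose cause is `OffloadReadHandleClosedException` through the same branch as `TransactionNotSealedException`, instead of the generic error handling. A minimal sketch of that classification, assuming nested stand-in exception types rather than the real Pulsar classes (`ReadFailureClassifier` and `isRetryable` are hypothetical names):

```java
import java.util.List;

// Hypothetical classifier mirroring the instanceof chain in the diff.
class ReadFailureClassifier {
    // Stand-ins for TransactionBufferException.TransactionNotSealedException and
    // ManagedLedgerException.OffloadReadHandleClosedException.
    static class TransactionNotSealedException extends RuntimeException { }
    static class OffloadReadHandleClosedException extends RuntimeException { }

    // Causes that take the same dispatcher branch as a not-yet-sealed transaction.
    private static final List<Class<? extends Throwable>> SAME_BRANCH_CAUSES = List.of(
            TransactionNotSealedException.class,
            OffloadReadHandleClosedException.class);

    static boolean isRetryable(Throwable exception) {
        Throwable cause = exception.getCause();
        return cause != null && SAME_BRANCH_CAUSES.stream().anyMatch(c -> c.isInstance(cause));
    }
}
```

Like the `instanceof` chain in the diff, this only inspects the direct cause (`getCause()`), not the whole cause chain.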



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -791,7 +791,8 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
                 // Notify the consumer only if all the messages were already acknowledged
                 consumerList.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {

Review Comment:
   +1





[GitHub] [pulsar] Jason918 commented on pull request #17056: [fix][ML] Fix offload read handle NPE.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#issuecomment-1236336723

   @horizonzy Could you open a new PR against branch-2.10? There are a lot of conflicts when cherry-picking directly.




[GitHub] [pulsar] horizonzy commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943147813


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -791,7 +791,8 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
                 // Notify the consumer only if all the messages were already acknowledged
                 consumerList.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {

Review Comment:
   Yes, the race condition is difficult to handle: the offload read handle executes the read operation in the offload executor, and the read handle may be invalidated before the read actually executes.





[GitHub] [pulsar] Technoboy- commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r944067100


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertFalse(ledger.ledgerCache.containsKey(3));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(4));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(5));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+

Review Comment:
   > And we should continue to update the SlowestNonDurableReadPosition to make sure the read handle cache can be invalidated
   
   I think @codelipenghui means doing this repeatedly, as in lines 789-790.
   
   
   > Here already invalidate ledger 3,4,5,6.
   
   Why are `4,5,6` invalidated?





[GitHub] [pulsar] horizonzy commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943675043


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertFalse(ledger.ledgerCache.containsKey(3));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(4));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(5));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+

Review Comment:
   At this point, ledgers 3, 4, 5 and 6 have already been invalidated.





[GitHub] [pulsar] horizonzy commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r944069903


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertFalse(ledger.ledgerCache.containsKey(3));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(4));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(5));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+

Review Comment:
   positions[0] is `3:0`, positions[1] is `4:0`, positions[2] is `5:0`, positions[3] is `6:0`, and positions[4] is `7:0`.
   
   At lines 789-790, we mark-delete the non-durable cursor at `6:0`, so its next read position is `7:0` and the managed ledger's slowest non-durable read position becomes `7:0`. Therefore ledgers `3,4,5,6` are invalidated.
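The arithmetic in this reply can be checked with a small model. It assumes what the comment states: with `setMaxEntriesPerLedger(1)` every entry gets its own ledger and the first ledger ID is 3, so `positions[i]` is `(3 + i):0`. `TrimScenario` and its methods are hypothetical helpers, not managed-ledger APIs.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the test scenario discussed above.
class TrimScenario {
    static final long FIRST_LEDGER_ID = 3;

    // Entry i lands in ledger 3 + i at entry 0 (one entry per ledger).
    static long ledgerOf(int index) {
        return FIRST_LEDGER_ID + index;
    }

    // With one entry per ledger, the position after <L>:0 is <L+1>:0.
    static long nextReadLedgerAfterMarkDelete(int markDeletedIndex) {
        return ledgerOf(markDeletedIndex) + 1;
    }

    // Cached ledgers strictly below the slowest non-durable read ledger are invalidated.
    static List<Long> invalidated(List<Long> cached, long slowestReadLedger) {
        List<Long> result = new ArrayList<>();
        for (long id : cached) {
            if (id < slowestReadLedger) {
                result.add(id);
            }
        }
        return result;
    }
}
```

Mark-deleting `positions[3]` (`6:0`) moves the read position to ledger 7, so ledgers 3 to 6 fall below the slowest non-durable read position and only ledger 7 stays cached; a slowest read ledger of 3 would invalidate nothing.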
   
   





[GitHub] [pulsar] codelipenghui commented on pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#issuecomment-1211531736

   Also, can we add a test to make sure the read handle is not invalidated if the slowest read position hasn't gone past it yet?




[GitHub] [pulsar] horizonzy commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943148591


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -2326,14 +2328,27 @@ public void checkInactiveSubscriptions() {
     @Override
     public void checkBackloggedCursors() {
         // activate caught up cursors which include consumers
+        AtomicReference<PositionImpl> slowestNonDurableReadPosition = new AtomicReference<>();
         subscriptions.forEach((subName, subscription) -> {
+            ManagedCursor cursor = subscription.getCursor();
+            if (cursor instanceof NonDurableCursorImpl) {
+                PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
+                if (slowestNonDurableReadPosition.get() == null || readPosition.compareTo(
+                        slowestNonDurableReadPosition.get()) < 0) {
+                    slowestNonDurableReadPosition.set(readPosition);
+                }
+            }
             if (!subscription.getConsumers().isEmpty()
                 && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
                 subscription.getCursor().setActive();
             } else {
                 subscription.getCursor().setInactive();
             }
         });
+        if (slowestNonDurableReadPosition.get() != null) {
+            ManagedLedger managedLedger = getManagedLedger();

Review Comment:
   Agreed. We also need to reset `ManagedLedgerImpl.slowestNonDurableReadPosition` when there is no non-durable cursor; fixed in the next commit.
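The loop in `checkBackloggedCursors` uses an `AtomicReference` because a local variable captured by the `forEach` lambda must be effectively final. A stream-based alternative is sketched below under assumptions: `Pos` and `CursorView` are hypothetical simplifications of `PositionImpl` and the cursor, with `Pos` ordered as a (ledger ID, entry ID) pair. Returning an empty `Optional` makes the "no non-durable cursor" case explicit, which is where the reply above says the slowest non-durable read position should be reset.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class SlowestNonDurable {
    // Hypothetical stand-in for PositionImpl, ordered by ledger ID, then entry ID.
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Hypothetical cursor view: only the two properties the loop in the diff looks at.
    static final class CursorView {
        final boolean durable;
        final Pos readPosition;

        CursorView(boolean durable, Pos readPosition) {
            this.durable = durable;
            this.readPosition = readPosition;
        }
    }

    // Empty result: there is no non-durable cursor, so the caller should reset the
    // ledger's slowest non-durable read position instead of keeping a stale value.
    static Optional<Pos> slowestNonDurableReadPosition(List<CursorView> cursors) {
        return cursors.stream()
                .filter(c -> !c.durable)
                .map(c -> c.readPosition)
                .min(Comparator.naturalOrder());
    }
}
```

A caller could pass a present value to `updateTheSlowestNonDurableReadPosition` and treat an empty result as the reset case; how the reset itself is performed is not shown in this thread.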





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943080290


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -2326,14 +2328,27 @@ public void checkInactiveSubscriptions() {
     @Override
     public void checkBackloggedCursors() {
         // activate caught up cursors which include consumers
+        AtomicReference<PositionImpl> slowestNonDurableReadPosition = new AtomicReference<>();
         subscriptions.forEach((subName, subscription) -> {
+            ManagedCursor cursor = subscription.getCursor();
+            if (cursor instanceof NonDurableCursorImpl) {
+                PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
+                if (slowestNonDurableReadPosition.get() == null || readPosition.compareTo(
+                        slowestNonDurableReadPosition.get()) < 0) {
+                    slowestNonDurableReadPosition.set(readPosition);
+                }
+            }
             if (!subscription.getConsumers().isEmpty()
                 && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
                 subscription.getCursor().setActive();
             } else {
                 subscription.getCursor().setInactive();
             }
         });
+        if (slowestNonDurableReadPosition.get() != null) {
+            ManagedLedger managedLedger = getManagedLedger();

Review Comment:
   +1





[GitHub] [pulsar] horizonzy commented on a diff in pull request #17056: [fix][ML] Fix offload read handle NPE.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r944226926


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java:
##########
@@ -135,13 +144,20 @@ public CompletableFuture<Void> closeAsync() {
     public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
-        if (firstEntry > lastEntry
-                || firstEntry < 0
-                || lastEntry > getLastAddConfirmed()) {
-            promise.completeExceptionally(new IllegalArgumentException());
-            return promise;
-        }
         executor.execute(() -> {
+            if (state == State.Closed) {
+                log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+                return;
+            }
+
+            if (firstEntry > lastEntry
+                    || firstEntry < 0
+                    || lastEntry > getLastAddConfirmed()) {
+                promise.completeExceptionally(new IllegalArgumentException());

Review Comment:
   nice catch
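The diff moves both the closed-state check and the argument validation inside the task submitted to `executor`, so they are evaluated when the read actually runs rather than when it is scheduled. A minimal sketch of that pattern, assuming a hypothetical `GuardedReadHandle` that completes with an entry count instead of real `LedgerEntries`, and a stand-in `ReadHandleClosedException` for `ManagedLedgerException.OffloadReadHandleClosedException`:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical toy handle illustrating the pattern in the diff; not Pulsar's class.
class GuardedReadHandle {
    enum State { Opened, Closed }

    // Stand-in for ManagedLedgerException.OffloadReadHandleClosedException.
    static class ReadHandleClosedException extends RuntimeException { }

    private final ExecutorService executor;
    private final long lastAddConfirmed;
    private volatile State state = State.Opened;

    GuardedReadHandle(ExecutorService executor, long lastAddConfirmed) {
        this.executor = executor;
        this.lastAddConfirmed = lastAddConfirmed;
    }

    // Completes with the number of entries in [firstEntry, lastEntry].
    CompletableFuture<Long> readAsync(long firstEntry, long lastEntry) {
        CompletableFuture<Long> promise = new CompletableFuture<>();
        executor.execute(() -> {
            // Evaluated when the task runs, so a close() after scheduling is observed.
            if (state == State.Closed) {
                promise.completeExceptionally(new ReadHandleClosedException());
                return;
            }
            if (firstEntry > lastEntry || firstEntry < 0 || lastEntry > lastAddConfirmed) {
                promise.completeExceptionally(new IllegalArgumentException());
                return;
            }
            promise.complete(lastEntry - firstEntry + 1);
        });
        return promise;
    }

    void close() {
        state = State.Closed;
    }

    // Forces the interleaving from the motivation: read queued, handle closed, read runs.
    static Throwable readAfterClose() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        CountDownLatch gate = new CountDownLatch(1);
        executor.execute(() -> {
            try {
                gate.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        GuardedReadHandle handle = new GuardedReadHandle(executor, 9);
        CompletableFuture<Long> read = handle.readAsync(0, 2);
        handle.close();
        gate.countDown();
        Throwable failure = read.handle((value, error) -> error).join();
        executor.shutdown();
        return failure;
    }
}
```

`readAfterClose()` queues a read, closes the handle, and then lets the queued task run; the promise completes with `ReadHandleClosedException` rather than failing with an NPE.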





[GitHub] [pulsar] codelipenghui merged pull request #17056: [fix][ML] Fix offload read handle NPE.

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #17056:
URL: https://github.com/apache/pulsar/pull/17056




[GitHub] [pulsar] Technoboy- commented on a diff in pull request #17056: [fix][ML] Fix offload read handle NPE.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r944222422


##########
tiered-storage/jcloud/src/main/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/BlobStoreBackedReadHandleImplV2.java:
##########
@@ -135,13 +144,20 @@ public CompletableFuture<Void> closeAsync() {
     public CompletableFuture<LedgerEntries> readAsync(long firstEntry, long lastEntry) {
         log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
         CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
-        if (firstEntry > lastEntry
-                || firstEntry < 0
-                || lastEntry > getLastAddConfirmed()) {
-            promise.completeExceptionally(new IllegalArgumentException());
-            return promise;
-        }
         executor.execute(() -> {
+            if (state == State.Closed) {
+                log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}",
+                        ledgerId, firstEntry, lastEntry);
+                promise.completeExceptionally(new ManagedLedgerException.OffloadReadHandleClosedException());
+                return;
+            }
+
+            if (firstEntry > lastEntry
+                    || firstEntry < 0
+                    || lastEntry > getLastAddConfirmed()) {
+                promise.completeExceptionally(new IllegalArgumentException());

Review Comment:
   Should we use the same exception as v1, `BKIncorrectParameterException`, or use `IllegalArgumentException` in both?





[GitHub] [pulsar] horizonzy commented on pull request #17056: [fix][ML] Fix offload read handle NPE.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#issuecomment-1212947962

   Please review this PR again; it now uses a new way to get the slowest non-durable read position.




[GitHub] [pulsar] michaeljmarshall commented on pull request #17056: [fix][ML] Fix offload read handle NPE.

Posted by GitBox <gi...@apache.org>.
michaeljmarshall commented on PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#issuecomment-1216135725

   Heads up: cherry-picking this commit to older branches broke them because of a missing import. I fixed that here: 74390d46f53125154b4116e3941a6966082c0876




[GitHub] [pulsar] hangc0276 commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
hangc0276 commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943068668


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -2326,14 +2328,27 @@ public void checkInactiveSubscriptions() {
     @Override
     public void checkBackloggedCursors() {
         // activate caught up cursors which include consumers
+        AtomicReference<PositionImpl> slowestNonDurableReadPosition = new AtomicReference<>();
         subscriptions.forEach((subName, subscription) -> {
+            ManagedCursor cursor = subscription.getCursor();
+            if (cursor instanceof NonDurableCursorImpl) {
+                PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
+                if (slowestNonDurableReadPosition.get() == null || readPosition.compareTo(
+                        slowestNonDurableReadPosition.get()) < 0) {
+                    slowestNonDurableReadPosition.set(readPosition);
+                }
+            }
             if (!subscription.getConsumers().isEmpty()
                 && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
                 subscription.getCursor().setActive();
             } else {
                 subscription.getCursor().setInactive();
             }
         });
+        if (slowestNonDurableReadPosition.get() != null) {
+            ManagedLedger managedLedger = getManagedLedger();

Review Comment:
   You can use `ledger.updateTheSlowestNonDurableReadPosition` directly.



[GitHub] [pulsar] horizonzy commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943675463


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));

Review Comment:
   Nice suggestion.



[GitHub] [pulsar] Jason918 commented on pull request #17056: [fix][ML] Fix offload read handle NPE.

Posted by GitBox <gi...@apache.org>.
Jason918 commented on PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#issuecomment-1239038625

   move `release/2.10.2` to https://github.com/apache/pulsar/pull/17478


[GitHub] [pulsar] horizonzy commented on pull request #17056: [fix][ML] Fix offload read handle NPE.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#issuecomment-1236337150

   alright.


[GitHub] [pulsar] horizonzy commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943673698


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());

Review Comment:
   To mock the real situation, the test defines a non-durable cursor, mark-deletes a position on it, and then updates `slowestNonDurableReadPosition` from that cursor's read position.
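   The rule this test exercises can be modelled roughly as follows: a cached read handle may only be invalidated when its ledger lies before both the slowest durable reader and the slowest non-durable reader. This is a simplified sketch under that assumption, using plain ledger IDs; `ReadHandleTrimModel` and its parameters are hypothetical, and the real `ManagedLedgerImpl#internalTrimConsumedLedgers` also deals with deletion, retention and offloading.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.TreeSet;

   class ReadHandleTrimModel {
       // Hypothetical model: returns the cached ledger IDs whose read handles may be invalidated.
       // slowestNonDurableReadLedgerId == null means there is no non-durable cursor.
       static List<Long> ledgersToInvalidate(TreeSet<Long> cachedLedgerIds,
                                             long slowestDurableReadLedgerId,
                                             Long slowestNonDurableReadLedgerId) {
           long bound = slowestDurableReadLedgerId;
           if (slowestNonDurableReadLedgerId != null) {
               // A lagging non-durable cursor holds the bound back, keeping its handles alive.
               bound = Math.min(bound, slowestNonDurableReadLedgerId);
           }
           // headSet is exclusive, so the ledger the slowest reader is positioned in stays cached.
           return new ArrayList<>(cachedLedgerIds.headSet(bound));
       }
   }
   ```

   For example, with handles cached for ledgers 3 to 7 and the durable cursor already reading ledger 8, a non-durable cursor still at ledger 3 blocks all invalidation, while one that has moved on to ledger 7 allows ledgers 3 to 6 to be dropped.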



[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943080145


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -791,7 +791,8 @@ public synchronized void readEntriesFailed(ManagedLedgerException exception, Obj
                 // Notify the consumer only if all the messages were already acknowledged
                 consumerList.forEach(Consumer::reachedEndOfTopic);
             }
-        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException) {
+        } else if (exception.getCause() instanceof TransactionBufferException.TransactionNotSealedException
+                || exception.getCause() instanceof ManagedLedgerException.OffloadReadHandleClosedException) {

Review Comment:
   It looks like we don't want to handle the race condition between trimming ledgers and reading data, and instead want to re-trigger the read operation?
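   One reading of the "re-trigger the read" option is a bounded retry that fires only when the failure cause is the closed-handle exception. The sketch below is generic Java rather than Pulsar's dispatcher code; `ReadHandleClosedException` and `RetryOnClosedHandle` are hypothetical (the former stands in for `ManagedLedgerException.OffloadReadHandleClosedException`), and it assumes each call to `readOp` opens a fresh read handle.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.CompletionException;
   import java.util.function.Supplier;

   // Hypothetical stand-in for the "read handle already closed" failure.
   class ReadHandleClosedException extends RuntimeException {
       ReadHandleClosedException(String message) {
           super(message);
       }
   }

   class RetryOnClosedHandle {
       // Runs readOp; if it fails because the handle was closed, re-triggers it up to maxRetries times.
       static <T> CompletableFuture<T> readWithRetry(Supplier<CompletableFuture<T>> readOp, int maxRetries) {
           CompletableFuture<T> result = new CompletableFuture<>();
           attempt(readOp, maxRetries, result);
           return result;
       }

       private static <T> void attempt(Supplier<CompletableFuture<T>> readOp, int retriesLeft,
                                       CompletableFuture<T> result) {
           readOp.get().whenComplete((value, error) -> {
               if (error == null) {
                   result.complete(value);
                   return;
               }
               // Dependent stages wrap failures in CompletionException; unwrap to inspect the cause.
               Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                       ? error.getCause() : error;
               if (cause instanceof ReadHandleClosedException && retriesLeft > 0) {
                   // The handle was invalidated by a concurrent trim: read again with a fresh handle.
                   attempt(readOp, retriesLeft - 1, result);
               } else {
                   result.completeExceptionally(cause);
               }
           });
       }
   }
   ```

   Bounding the retries stops a persistently failing read from looping forever, and any other cause is propagated unchanged.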



[GitHub] [pulsar] horizonzy commented on pull request #17056: [fix][ML] Fix offload read handle NPE.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#issuecomment-1237225628

   > @horizonzy Can you help open a new PR to branch-2.10? There are a lot of conflicts when cherry-picking directly.
   
   #17478


[GitHub] [pulsar] horizonzy commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943148591


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -2326,14 +2328,27 @@ public void checkInactiveSubscriptions() {
     @Override
     public void checkBackloggedCursors() {
         // activate caught up cursors which include consumers
+        AtomicReference<PositionImpl> slowestNonDurableReadPosition = new AtomicReference<>();
         subscriptions.forEach((subName, subscription) -> {
+            ManagedCursor cursor = subscription.getCursor();
+            if (cursor instanceof NonDurableCursorImpl) {
+                PositionImpl readPosition = (PositionImpl) cursor.getReadPosition();
+                if (slowestNonDurableReadPosition.get() == null || readPosition.compareTo(
+                        slowestNonDurableReadPosition.get()) < 0) {
+                    slowestNonDurableReadPosition.set(readPosition);
+                }
+            }
             if (!subscription.getConsumers().isEmpty()
                 && subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
                 subscription.getCursor().setActive();
             } else {
                 subscription.getCursor().setInactive();
             }
         });
+        if (slowestNonDurableReadPosition.get() != null) {
+            ManagedLedger managedLedger = getManagedLedger();

Review Comment:
   Agreed. It will also reset `ManagedLedgerImpl.slowestNonDurableReadPosition` when there is no non-durable cursor.
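   Putting the suggestion and this reply together, the topic-side logic amounts to: take the minimum read position over the non-durable cursors, hand it to the managed ledger, and clear it when no non-durable cursor is left. A minimal sketch follows; `Pos`, `CursorView` and `SlowestNonDurableTracker` are hypothetical stand-ins rather than Pulsar types, and modelling the reset as `null` is an assumption.

   ```java
   import java.util.Comparator;
   import java.util.List;
   import java.util.Optional;

   // Hypothetical (ledgerId, entryId) position, ordered first by ledger, then by entry.
   record Pos(long ledgerId, long entryId) implements Comparable<Pos> {
       @Override
       public int compareTo(Pos other) {
           int byLedger = Long.compare(ledgerId, other.ledgerId);
           return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
       }
   }

   // Hypothetical view of a subscription cursor: durable or not, and its next read position.
   record CursorView(boolean durable, Pos readPosition) {}

   class SlowestNonDurableTracker {
       // null models "there is no non-durable cursor" (an assumption of this sketch).
       private volatile Pos slowestNonDurableReadPosition;

       void update(List<CursorView> cursors) {
           Optional<Pos> slowest = cursors.stream()
                   .filter(cursor -> !cursor.durable())
                   .map(CursorView::readPosition)
                   .min(Comparator.naturalOrder());
           // Reset when no non-durable cursor remains, so trimming is no longer held back.
           slowestNonDurableReadPosition = slowest.orElse(null);
       }

       Pos get() {
           return slowestNonDurableReadPosition;
       }
   }
   ```

   `Pos` orders by ledger ID and then entry ID, mirroring how managed-ledger positions are compared; durable cursors are ignored because trimming already accounts for them.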



[GitHub] [pulsar] codelipenghui commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943266412


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);

Review Comment:
   It looks like we don't need a real non-durable cursor?
   We can just update the `SlowestNonDurableReadPosition`.



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());

Review Comment:
   Do we need this line? It looks like the read position will not change.



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertFalse(ledger.ledgerCache.containsKey(3));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(4));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(5));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+

Review Comment:
   And we should continue to update the `SlowestNonDurableReadPosition` to make sure the read handle cache can be invalidated.



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));

Review Comment:
   We should avoid using hardcoded ledger IDs.
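   One way to follow this suggestion is to derive the expected ledger IDs from the positions returned by `addEntry` instead of assuming they start at 3. The sketch uses a hypothetical `EntryPos` record and `LedgerIdsFromPositions` helper; in the test itself one would cast each position to `PositionImpl` and call `getLedgerId()`.

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical stand-in for a managed-ledger position.
   record EntryPos(long ledgerId, long entryId) {}

   class LedgerIdsFromPositions {
       // Distinct ledger IDs in the order their entries were written.
       static List<Long> ledgerIdsOf(List<EntryPos> positions) {
           Set<Long> ids = new LinkedHashSet<>(); // keeps insertion order, drops duplicates
           for (EntryPos position : positions) {
               ids.add(position.ledgerId());
           }
           return new ArrayList<>(ids);
       }
   }
   ```

   With `setMaxEntriesPerLedger(1)` every entry lands in its own ledger, so an assertion such as `containsKey(3)` could become `containsKey(ledgerIds.get(0))`.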



[GitHub] [pulsar] horizonzy commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
horizonzy commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r943673935


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);

Review Comment:
   To mock the real situation.



[GitHub] [pulsar] Technoboy- commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r944067100


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertFalse(ledger.ledgerCache.containsKey(3));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(4));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(5));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+

Review Comment:
   > And we should continue to update the SlowestNonDurableReadPosition to make sure the read handle cache can be invalidated

   I think @codelipenghui means doing this repeatedly, as in lines 789~790.

   > Here already invalidate ledger 3,4,5,6.

   Why invalidate `4,5,6`?



[GitHub] [pulsar] Technoboy- commented on a diff in pull request #17056: Fix offload read handle npe.

Posted by GitBox <gi...@apache.org>.
Technoboy- commented on code in PR #17056:
URL: https://github.com/apache/pulsar/pull/17056#discussion_r944071671


##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorTest.java:
##########
@@ -737,6 +738,69 @@ public void testBacklogStatsWhenDroppingData() throws Exception {
         ledger.close();
     }
 
+    @Test
+    public void testInvalidateReadHandleWithSlowNonDurableCursor() throws Exception {
+        ManagedLedgerImpl ledger = (ManagedLedgerImpl) factory.open("testInvalidateReadHandleWithSlowNonDurableCursor",
+                new ManagedLedgerConfig().setMaxEntriesPerLedger(1).setRetentionTime(-1, TimeUnit.SECONDS)
+                        .setRetentionSizeInMB(-1));
+        ManagedCursor c1 = ledger.openCursor("c1");
+        ManagedCursor nonDurableCursor = ledger.newNonDurableCursor(PositionImpl.EARLIEST);
+
+        assertEquals(nonDurableCursor.getNumberOfEntries(), 0);
+        assertEquals(nonDurableCursor.getNumberOfEntriesInBacklog(true), 0);
+
+        List<Position> positions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            positions.add(ledger.addEntry(("entry-" + i).getBytes(UTF_8)));
+        }
+
+        CountDownLatch latch = new CountDownLatch(10);
+        for (int i = 0; i < 10; i++) {
+            ledger.asyncReadEntry((PositionImpl) positions.get(i), new AsyncCallbacks.ReadEntryCallback() {
+                @Override
+                public void readEntryComplete(Entry entry, Object ctx) {
+                    latch.countDown();
+                }
+
+                @Override
+                public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
+                    latch.countDown();
+                }
+            }, null);
+        }
+
+        latch.await();
+
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+        c1.markDelete(positions.get(4));
+
+        CompletableFuture<Void> promise = new CompletableFuture<>();
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertTrue(ledger.ledgerCache.containsKey(3));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(4));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(5));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+
+        promise = new CompletableFuture<>();
+
+        nonDurableCursor.markDelete(positions.get(3));
+        ledger.updateTheSlowestNonDurableReadPosition(nonDurableCursor.getReadPosition());
+
+        ledger.internalTrimConsumedLedgers(promise);
+        promise.join();
+
+        Assert.assertFalse(ledger.ledgerCache.containsKey(3));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(4));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(5));
+        Assert.assertFalse(ledger.ledgerCache.containsKey(6));
+        Assert.assertTrue(ledger.ledgerCache.containsKey(7));
+

Review Comment:
   Ah, thanks.


