You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/09/20 16:44:05 UTC

[GitHub] [pulsar] lordcheng10 opened a new pull request, #17751: When isAutoSkipNonRecoverableData=true, fix the problem that the mark…

lordcheng10 opened a new pull request, #17751:
URL: https://github.com/apache/pulsar/pull/17751

   ### Motivation
   When isAutoSkipNonRecoverableData=true, fix the problem that the markdelete position does not move forward.
   
   
   ### Modifications
   When skipping the entry, construct the skipped entry, and automatically ack the skipped entry when pushing the message to the consumer.
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc-required` 
   (Your PR needs to update docs and you will update later)
   
   - [x] `doc-not-needed` 
   (Please explain why)
   
   - [ ] `doc` 
   (Your PR contains doc changes)
   
   - [ ] `doc-complete`
   (Docs have been already added)
   
   ### Matching PR in forked repository
   
   PR in forked repository: <!-- ENTER URL HERE 
   
   After opening this PR, the build in apache/pulsar will fail and instructions will
   be provided for opening a PR in the PR author's forked repository.
   
   apache/pulsar pull requests should be first tested in your own fork since the 
   apache/pulsar CI based on GitHub Actions has constrained resources and quota.
   GitHub Actions provides separate quota for pull requests that are executed in 
   a forked repository.
   
   The tests will be run in the forked repository until all PR review comments have
   been handled, the tests pass and the PR is approved by a reviewer.
   
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1262398456

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272264081

   > > For unreadable entries, an entry with the skpped flag will be created. When pushing messages, the entries with the skipped flag as true will be filtered out, and then these messages will be acked
   > 
   > I see. It looks like the read operation will return some fake entries to the caller. Then the caller ack the entries according to the skip flag introduced in the `EntryImpl`. What is the difference between ack the entries directly when we complete the read operation by `checkReadCompletion`? In this way, we don't need to introduce a new concept to the `EntryImpl`, and the caller also has no responsibility to help managed cursor handle internal ack state maintenance issues.
   > 
   > BTW, the Pulsar SQL also uses the cursor API to read data from bookies. If we add `skipped` field to the `EntryImpl`, the Pulsar SQL might also need to handle this case.
   
   OK, I will fix


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Jason918 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r983397093


##########
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java:
##########
@@ -1992,7 +1992,14 @@ The delayed message index bucket time step(in seconds) in per bucket snapshot se
         doc = "Skip reading non-recoverable/unreadable data-ledger under managed-ledger's list.\n\n"
             + " It helps when data-ledgers gets corrupted at bookkeeper and managed-cursor is stuck at that ledger."
     )
-    private boolean autoSkipNonRecoverableData = false;
+        private boolean autoSkipNonRecoverableData = false;

Review Comment:
   ```suggestion
       private boolean autoSkipNonRecoverableData = false;
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1348366821

   > OpReadEntry.java
   
   OK


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] congbobo184 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
congbobo184 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1318547566

   @lordcheng10   hi, I move this PR to release/2.9.5, if you have any questions, please ping me. thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r981046148


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
         return entry;
     }
 
+    public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {
+        EntryImpl entry = RECYCLER.get();
+        entry.timestamp = System.nanoTime();
+        entry.ledgerId = ledgerId;
+        entry.entryId = entryId;
+        entry.skipped = skipped;
+        entry.data = Unpooled.wrappedBuffer(new byte[0]);

Review Comment:
   entry.data = Unpooled.EMPTY_BUFFER   ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r981256695


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -109,6 +109,16 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 recycle();
                 return;
             }
+            final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
+            List<Entry> skippedEntries = new ArrayList<>();
+            PositionImpl startPosition = readPosition;
+            PositionImpl endPosition = (PositionImpl) nexReadPosition;
+            while (startPosition.compareTo(endPosition) < 0) {
+                skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
+                startPosition = ledger.getNextValidPosition(startPosition);

Review Comment:
   > It seems, normally, it will just goes from readPosition to nexReadPosition?
   YES
   
   > Will you miss other entries to be acked?
   
   IMO, won't miss other entries to be acked
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] Jason918 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
Jason918 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r981217065


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -109,6 +109,16 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 recycle();
                 return;
             }
+            final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
+            List<Entry> skippedEntries = new ArrayList<>();
+            PositionImpl startPosition = readPosition;
+            PositionImpl endPosition = (PositionImpl) nexReadPosition;
+            while (startPosition.compareTo(endPosition) < 0) {
+                skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
+                startPosition = ledger.getNextValidPosition(startPosition);

Review Comment:
   It seems, normally, it will just goes from `readPosition` to `nexReadPosition`?
   Will you miss other entries to be acked?



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -115,6 +116,16 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
             if (entry == null) {
                 continue;
             }
+            if (entry instanceof EntryImpl) {
+                EntryImpl entryImpl = (EntryImpl) entry;
+                if (entryImpl.skipped()) {
+                    if (entriesToFiltered == null) {
+                        entriesToFiltered = new ArrayList<>();
+                    }
+                    entriesToFiltered.add(entryImpl.getPosition());

Review Comment:
   Do we need `entries.set(i, null);` and `entry.release();`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1263598303

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272270246

   > +1 Agree with @codelipenghui concern. individual ack directly is better than introducing new stuff.
   
   OK , I will  fixed!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1262981065

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272238561

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r979402367


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -91,6 +95,25 @@ public int filterEntriesForConsumer(List<? extends Entry> entries, EntryBatchSiz
                 isReplayRead, consumer);
     }
 
+    /**
+     * 1. Acknowledge skipped messages;
+     * 2. Filter out skipped messages;
+     */
+    public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) {
+        List<Position> skippedPositions = new ArrayList<>();
+        List<Entry> filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> {

Review Comment:
   I wonder if we can move this into filterEntriesForConsumer? This way we save some allocations 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r979409714


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -91,6 +95,25 @@ public int filterEntriesForConsumer(List<? extends Entry> entries, EntryBatchSiz
                 isReplayRead, consumer);
     }
 
+    /**
+     * 1. Acknowledge skipped messages;
+     * 2. Filter out skipped messages;
+     */
+    public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) {
+        List<Position> skippedPositions = new ArrayList<>();
+        List<Entry> filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> {

Review Comment:
   Fixed!
   @eolivelli PTAL,thanks!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1381219394

   The pr had no activity for 30 days, mark with Stale label.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1264205148

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1264526132

   @codelipenghui @Technoboy- PTAL,thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] eolivelli commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
eolivelli commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r980852285


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
         return entry;
     }
 
+    public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {

Review Comment:
   what about "createSkippedEntry" ?
   otherwise people may want to use this factory method for other purposes



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
         return entry;
     }
 
+    public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {
+        EntryImpl entry = RECYCLER.get();
+        entry.timestamp = System.nanoTime();
+        entry.ledgerId = ledgerId;
+        entry.entryId = entryId;
+        entry.skipped = skipped;
+        entry.data = Unpooled.wrappedBuffer(new byte[0]);

Review Comment:
   do we really need to allocate this ? what about using a constant ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] mattisonchao commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272268736

   +1 Agree with Penghui's concern. individual ack directly is better than introducing new stuff. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r981046148


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
         return entry;
     }
 
+    public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {
+        EntryImpl entry = RECYCLER.get();
+        entry.timestamp = System.nanoTime();
+        entry.ledgerId = ledgerId;
+        entry.entryId = entryId;
+        entry.skipped = skipped;
+        entry.data = Unpooled.wrappedBuffer(new byte[0]);

Review Comment:
   entry.data = null?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
         return entry;
     }
 
+    public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {

Review Comment:
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r981258270


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -115,6 +116,16 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
             if (entry == null) {
                 continue;
             }
+            if (entry instanceof EntryImpl) {
+                EntryImpl entryImpl = (EntryImpl) entry;
+                if (entryImpl.skipped()) {
+                    if (entriesToFiltered == null) {
+                        entriesToFiltered = new ArrayList<>();
+                    }
+                    entriesToFiltered.add(entryImpl.getPosition());

Review Comment:
   Fixed, PTAL, thanks! @Jason918 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1263151430

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1264818700

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1264526151

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272224774

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1262476948

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1262722756

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r979406929


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -91,6 +95,25 @@ public int filterEntriesForConsumer(List<? extends Entry> entries, EntryBatchSiz
                 isReplayRead, consumer);
     }
 
+    /**
+     * 1. Acknowledge skipped messages;
+     * 2. Filter out skipped messages;
+     */
+    public List<Entry> filterAndAcknowledgeSkippedEntry(List<Entry> entries) {
+        List<Position> skippedPositions = new ArrayList<>();
+        List<Entry> filterEntries = Lists.newArrayList(Collections2.filter(entries, entry -> {

Review Comment:
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272238113

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1270365986

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] github-actions[bot] commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1308106961

   The pr had no activity for 30 days, mark with Stale label.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272197410

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272270194

   > checkReadCompletion
   
   OK


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272238469

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272255258

   @lordcheng10 Thanks for the clarification. I understand the issue for now.
   
   Can we just ack the messages from the read position(first read failed) to the LAC of the Ledger? For example, if we have a Ledger with 1000 entries. but failed when reading the 501th entry (`NonRecoverableLedgerException `). Maybe the Ledger has two fragments. Only the data of the last fragment data is non-recoverable. Then we can just perform an ack operation from 501th to 999th.
   
   So that we don't need to introduce new options to users and this is more intuitive and easier to understand


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r990727298


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -116,6 +116,22 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 recycle();
                 return;
             }
+
+            int toAckEntryNum = 0;
+            List<Entry> skippedEntries = new ArrayList<>();
+            PositionImpl startPosition = readPosition;
+            PositionImpl endPosition = (PositionImpl) nexReadPosition;
+            while (startPosition.compareTo(endPosition) < 0) {
+                skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
+                startPosition = ledger.getNextValidPosition(startPosition);
+                toAckEntryNum++;
+                if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) {
+                    nexReadPosition = startPosition;
+                    break;
+                }
+            }
+            List<Entry> filteredEntries = cursor.filterReadEntries(skippedEntries);

Review Comment:
   OK , I will fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1269249764

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1264152425

   /pulsarbot run-failure-checks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272256581

   > > Add a skipped variable in EntryImpl to record whether the entry is automatically skipped;
   > > When pushing entrys to the consumer, automatically ack the skipped entrys;
   > 
   > Sorry, I didn't understand here(From the PR description). If the entry is not able to be read, how can the broker push the entries to the consumer?
   
   For unreadable entries, an entry with the skpped flag will be created. When pushing messages, the entries with the skipped flag as true will be filtered out, and then these messages will be acked
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272258331

   > For unreadable entries, an entry with the skpped flag will be created. When pushing messages, the entries with the skipped flag as true will be filtered out, and then these messages will be acked
   
   I see. It looks like the read operation will return some fake entries to the caller. Then the caller ack the entries according to the skip flag introduced in the `EntryImpl`. What is the difference between ack the entries directly when we complete the read operation by `checkReadCompletion`? In this way, we don't need to introduce a new concept to the `EntryImpl`, and the caller also has no responsibility to help managed cursor handle internal ack state maintenance issues.
   
   BTW, the Pulsar SQL also uses the cursor API to read data from bookies. If we add `skipped` field to the `EntryImpl`, the Pulsar SQL might also need to handle this case.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] poorbarcode commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
poorbarcode commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r990613777


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -116,6 +116,22 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 recycle();
                 return;
             }
+
+            int toAckEntryNum = 0;
+            List<Entry> skippedEntries = new ArrayList<>();
+            PositionImpl startPosition = readPosition;
+            PositionImpl endPosition = (PositionImpl) nexReadPosition;
+            while (startPosition.compareTo(endPosition) < 0) {
+                skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
+                startPosition = ledger.getNextValidPosition(startPosition);
+                toAckEntryNum++;
+                if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) {

Review Comment:
   If the state of the cursor is like this:
   
   ```
   read position {1:0}
   individual deleted messages [ {1:0}, {1:50001} ]
   ```
   
   After the entry filter, the `nexReadPosition` will be {1,10000} and  `filteredEntries` will be {1,0}, then `maxAckEntryNumForAutoSkipNonRecoverableData` could not work perfect. Can we let `cursor` do the filtering?
   



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -116,6 +116,22 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 recycle();
                 return;
             }
+
+            int toAckEntryNum = 0;
+            List<Entry> skippedEntries = new ArrayList<>();
+            PositionImpl startPosition = readPosition;
+            PositionImpl endPosition = (PositionImpl) nexReadPosition;
+            while (startPosition.compareTo(endPosition) < 0) {
+                skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
+                startPosition = ledger.getNextValidPosition(startPosition);
+                toAckEntryNum++;
+                if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) {
+                    nexReadPosition = startPosition;
+                    break;
+                }
+            }
+            List<Entry> filteredEntries = cursor.filterReadEntries(skippedEntries);

Review Comment:
   Can we just drop the data with the instruction `cursor.delete(positions)`? This saves the memory of `entries` and is easier to understand.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r990728091


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -116,6 +116,22 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 recycle();
                 return;
             }
+
+            int toAckEntryNum = 0;
+            List<Entry> skippedEntries = new ArrayList<>();
+            PositionImpl startPosition = readPosition;
+            PositionImpl endPosition = (PositionImpl) nexReadPosition;
+            while (startPosition.compareTo(endPosition) < 0) {
+                skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
+                startPosition = ledger.getNextValidPosition(startPosition);
+                toAckEntryNum++;
+                if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) {

Review Comment:
   I will fixed



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -116,6 +116,22 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 recycle();
                 return;
             }
+
+            int toAckEntryNum = 0;
+            List<Entry> skippedEntries = new ArrayList<>();
+            PositionImpl startPosition = readPosition;
+            PositionImpl endPosition = (PositionImpl) nexReadPosition;
+            while (startPosition.compareTo(endPosition) < 0) {
+                skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
+                startPosition = ledger.getNextValidPosition(startPosition);
+                toAckEntryNum++;
+                if (toAckEntryNum > cursor.getConfig().getMaxAckEntryNumForAutoSkipNonRecoverableData()) {

Review Comment:
   I will fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r981249991


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java:
##########
@@ -115,6 +116,16 @@ public int filterEntriesForConsumer(Optional<MessageMetadata[]> optMetadataArray
             if (entry == null) {
                 continue;
             }
+            if (entry instanceof EntryImpl) {
+                EntryImpl entryImpl = (EntryImpl) entry;
+                if (entryImpl.skipped()) {
+                    if (entriesToFiltered == null) {
+                        entriesToFiltered = new ArrayList<>();
+                    }
+                    entriesToFiltered.add(entryImpl.getPosition());

Review Comment:
   OK , I will fix



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1259402746

   @codelipenghui @Technoboy- @Jason918 PTAL,thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1252667174

   @eolivelli @codelipenghui @Technoboy- PTAL,thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1264205209

   @codelipenghui @Technoboy- PTAL,thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r981256695


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -109,6 +109,16 @@ public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
                 recycle();
                 return;
             }
+            final ManagedLedgerImpl ledger = (ManagedLedgerImpl) cursor.getManagedLedger();
+            List<Entry> skippedEntries = new ArrayList<>();
+            PositionImpl startPosition = readPosition;
+            PositionImpl endPosition = (PositionImpl) nexReadPosition;
+            while (startPosition.compareTo(endPosition) < 0) {
+                skippedEntries.add(EntryImpl.createSkippedEntry(startPosition.ledgerId, startPosition.entryId));
+                startPosition = ledger.getNextValidPosition(startPosition);

Review Comment:
   > It seems, normally, it will just goes from readPosition to nexReadPosition?
   
   YES
   
   > Will you miss other entries to be acked?
   
   IMO, won't miss other entries to be acked
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on a diff in pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on code in PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#discussion_r981054030


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
         return entry;
     }
 
+    public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {
+        EntryImpl entry = RECYCLER.get();
+        entry.timestamp = System.nanoTime();
+        entry.ledgerId = ledgerId;
+        entry.entryId = entryId;
+        entry.skipped = skipped;
+        entry.data = Unpooled.wrappedBuffer(new byte[0]);

Review Comment:
   Fixed! PTAL,thanks! @eolivelli 



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryImpl.java:
##########
@@ -66,6 +67,17 @@ public static EntryImpl create(long ledgerId, long entryId, byte[] data) {
         return entry;
     }
 
+    public static EntryImpl create(long ledgerId, long entryId, boolean skipped) {

Review Comment:
   Fixed! PTAL,thanks! @eolivelli



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1263022301

   CI has passed: https://github.com/lordcheng10/pulsar/pull/19


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] lordcheng10 commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
lordcheng10 commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272251253

   > @lordcheng10
   > 
   > Great finding! It looks like the problem is after the read operation failed with `NonRecoverableLedgerException`, we only move the read position to the start position of the next ledger, but will not move the mark delete position. Why not just make the operation also move the mark delete position to the next ledger?
   > 
   > ```java
   > updateReadPosition(nexReadPosition);
   >             cursor.asyncMarkDelete(PositionImpl.get(nexReadPosition.getLedgerId(), -1), callback, ctx);
   > ```
   
   When the ackType is individual, there may be unacknowledged messages ahead, so the markdelete position cannot be moved directly


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codelipenghui commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1272255765

   > Add a skipped variable in EntryImpl to record whether the entry is automatically skipped;
   When pushing entrys to the consumer, automatically ack the skipped entrys;
   
   Sorry, I didn't understand here(From the PR description). If the entry is not able to be read, how can the broker push the entries to the consumer?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] michaeljmarshall commented on pull request #17751: [fix][broker] Fix the markdelete position does not move forward when isAutoSkipNonRecoverableData=true and individual ack

Posted by "michaeljmarshall (via GitHub)" <gi...@apache.org>.
michaeljmarshall commented on PR #17751:
URL: https://github.com/apache/pulsar/pull/17751#issuecomment-1610272051

   As discussed on the mailing list https://lists.apache.org/thread/w4jzk27qhtosgsz7l9bmhf1t7o9mxjhp, there is no plan to release 2.9.6, so I am going to remove the release/2.9.6 label


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org