Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/12/23 01:28:51 UTC

[GitHub] [pulsar] coderzc opened a new pull request, #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

coderzc opened a new pull request, #19035:
URL: https://github.com/apache/pulsar/pull/19035

   PIP: #16763
   
   <!-- Details of when a PIP is required and how the PIP process work, please see: https://github.com/apache/pulsar/blob/master/wiki/proposals/PIP.md -->
   
   ### Motivation
   
   #16763
   
   ### Modifications
   
   Filter out all delayed messages and skip them when reading messages from bookies.
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. Please attach the local preview screenshots (run `sh start.sh` at `pulsar/site2/website`) to your PR description, or else your PR might not get merged. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->
   
   ### Matching PR in forked repository
   
   PR in forked repository: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] codecov-commenter commented on pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#issuecomment-1367169763

   # [Codecov](https://codecov.io/gh/apache/pulsar/pull/19035?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#19035](https://codecov.io/gh/apache/pulsar/pull/19035?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (ac8d0ce) into [master](https://codecov.io/gh/apache/pulsar/commit/feb3cb4d7a484a284e06474713870609b220abfc?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (feb3cb4) will **decrease** coverage by `9.12%`.
   > The diff coverage is `5.55%`.
   
   [![Impacted file tree graph](https://codecov.io/gh/apache/pulsar/pull/19035/graphs/tree.svg?width=650&height=150&src=pr&token=acYqCpsK9J&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)](https://codecov.io/gh/apache/pulsar/pull/19035?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   
   ```diff
   @@             Coverage Diff              @@
   ##             master   #19035      +/-   ##
   ============================================
   - Coverage     46.35%   37.22%   -9.13%     
   + Complexity     8939     1989    -6950     
   ============================================
     Files           597      209     -388     
     Lines         56858    14452   -42406     
     Branches       5905     1577    -4328     
   ============================================
   - Hits          26357     5380   -20977     
   + Misses        27616     8489   -19127     
   + Partials       2885      583    -2302     
   ```
   
   | Flag | Coverage Δ | |
   |---|---|---|
   | unittests | `37.22% <5.55%> (-9.13%)` | :arrow_down: |
   
   Flags with carried forward coverage won't be shown. [Click here](https://docs.codecov.io/docs/carryforward-flags?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#carryforward-flags-in-the-pull-request-comment) to find out more.
   
   | [Impacted Files](https://codecov.io/gh/apache/pulsar/pull/19035?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...va/org/apache/pulsar/client/impl/ConsumerImpl.java](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVySW1wbC5qYXZh) | `15.06% <0.00%> (-0.03%)` | :arrow_down: |
   | [...he/pulsar/client/impl/MultiTopicsConsumerImpl.java](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL011bHRpVG9waWNzQ29uc3VtZXJJbXBsLmphdmE=) | `22.80% <0.00%> (+0.01%)` | :arrow_up: |
   | [...ache/pulsar/client/impl/ZeroQueueConsumerImpl.java](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1plcm9RdWV1ZUNvbnN1bWVySW1wbC5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...va/org/apache/pulsar/client/impl/ConsumerBase.java](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NvbnN1bWVyQmFzZS5qYXZh) | `21.51% <6.66%> (-0.42%)` | :arrow_down: |
   | [...va/org/apache/pulsar/client/impl/ProducerBase.java](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL1Byb2R1Y2VyQmFzZS5qYXZh) | `32.69% <0.00%> (-1.93%)` | :arrow_down: |
   | [.../org/apache/pulsar/client/impl/ConnectionPool.java](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0Nvbm5lY3Rpb25Qb29sLmphdmE=) | `37.43% <0.00%> (-1.03%)` | :arrow_down: |
   | [.../pulsar/client/impl/BatchMessageContainerImpl.java](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0JhdGNoTWVzc2FnZUNvbnRhaW5lckltcGwuamF2YQ==) | `55.95% <0.00%> (-1.02%)` | :arrow_down: |
   | [.../java/org/apache/pulsar/client/impl/ClientCnx.java](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWNsaWVudC9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2NsaWVudC9pbXBsL0NsaWVudENueC5qYXZh) | `29.97% <0.00%> (ø)` | |
   | [...rg/apache/pulsar/broker/delayed/bucket/Bucket.java](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-cHVsc2FyLWJyb2tlci9zcmMvbWFpbi9qYXZhL29yZy9hcGFjaGUvcHVsc2FyL2Jyb2tlci9kZWxheWVkL2J1Y2tldC9CdWNrZXQuamF2YQ==) | | |
   | ... and [394 more](https://codecov.io/gh/apache/pulsar/pull/19035/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   




[GitHub] [pulsar] coderzc commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058795666


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -57,13 +61,13 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
             maxPosition = PositionImpl.LATEST;
         }
         op.maxPosition = maxPosition;
+        op.skipCondition = skipCondition;

Review Comment:
   You are right.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058704402


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -57,13 +61,13 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
             maxPosition = PositionImpl.LATEST;
         }
         op.maxPosition = maxPosition;
+        op.skipCondition = skipCondition;
         op.ctx = ctx;
         op.nextReadPosition = PositionImpl.get(op.readPosition);
         return op;
     }
 
-    @Override
-    public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
+    void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, PositionImpl lastPosition) {

Review Comment:
   I'm not sure if it is good to extend this method because we defined a parameter named `context`.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058703990


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -57,13 +61,13 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
             maxPosition = PositionImpl.LATEST;
         }
         op.maxPosition = maxPosition;
+        op.skipCondition = skipCondition;

Review Comment:
   It looks like we forgot to reset it when recycling this object.
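
The concern here is about pooled objects: the operation object is recycled, so a field assigned in `create(...)` should also be cleared when the instance goes back to the pool. A minimal sketch of that pattern follows, using a hand-rolled pool instead of the real recycler. `PooledOp`, `acquire`, and `recycle` are hypothetical names for illustration, not Pulsar's API.

```java
import java.util.ArrayDeque;
import java.util.function.Predicate;

// Hypothetical stand-in for a pooled read operation; not Pulsar's OpReadEntry.
final class PooledOp {
    private static final ArrayDeque<PooledOp> POOL = new ArrayDeque<>();

    Predicate<Long> skipCondition; // optional per-read filter
    Object ctx;                    // opaque per-read context

    private PooledOp() {
    }

    // Reuse a pooled instance when one is available, otherwise allocate.
    static PooledOp acquire(Predicate<Long> skipCondition, Object ctx) {
        PooledOp op = POOL.isEmpty() ? new PooledOp() : POOL.pop();
        op.skipCondition = skipCondition;
        op.ctx = ctx;
        return op;
    }

    // Clear every per-use field before returning the instance to the pool,
    // so the pool retains no references and no stale state survives.
    void recycle() {
        skipCondition = null;
        ctx = null;
        POOL.push(this);
    }

    public static void main(String[] args) {
        PooledOp first = acquire(id -> id % 2 == 0, "ctx-1");
        first.recycle();
        // The recycled instance carries no leftover state.
        System.out.println(first.skipCondition == null); // prints "true"
    }
}
```

Resetting in `recycle()` rather than relying on the next `acquire(...)` keeps the pooled instance from pinning the predicate and its captured objects while it sits idle.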





[GitHub] [pulsar] codelipenghui commented on pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#issuecomment-1367682683

   /pulsarbot run-failure-checks




[GitHub] [pulsar] codelipenghui merged pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
codelipenghui merged PR #19035:
URL: https://github.com/apache/pulsar/pull/19035




[GitHub] [pulsar] codelipenghui commented on pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#issuecomment-1367695703

   @coderzc 
   
   ```
   Error:  Tests run: 556, Failures: 1, Errors: 0, Skipped: 520, Time elapsed: 46.303 s <<< FAILURE! - in org.apache.bookkeeper.mledger.impl.ManagedCursorTest
   Error:  testAsyncReadWithMaxSizeByte(org.apache.bookkeeper.mledger.impl.ManagedCursorTest)  Time elapsed: 20.008 s  <<< FAILURE!
   org.testng.internal.thread.ThreadTimeoutException: Method org.apache.bookkeeper.mledger.impl.ManagedCursorTest.testAsyncReadWithMaxSizeByte() didn't finish within the time-out 20000
   	at java.base@17.0.5/jdk.internal.misc.Unsafe.park(Native Method)
   	at java.base@17.0.5/java.util.concurrent.locks.LockSupport.park(LockSupport.java:211)
   	at java.base@17.0.5/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:715)
   	at java.base@17.0.5/java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1047)
   	at java.base@17.0.5/java.util.concurrent.CountDownLatch.await(CountDownLatch.java:230)
   	at app//org.apache.bookkeeper.mledger.impl.ManagedCursorTest.readAndCheck(ManagedCursorTest.java:572)
   	at app//org.apache.bookkeeper.mledger.impl.ManagedCursorTest.testAsyncReadWithMaxSizeByte(ManagedCursorTest.java:549)
   	at java.base@17.0.5/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at java.base@17.0.5/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
   	at java.base@17.0.5/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.base@17.0.5/java.lang.reflect.Method.invoke(Method.java:568)
   	at app//org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
   	at app//org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
   	at app//org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
   	at app//org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
   	at java.base@17.0.5/java.util.concurrent.FutureTask.run(FutureTask.java:264)
   	at java.base@17.0.5/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
   	at java.base@17.0.5/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
   	at java.base@17.0.5/java.lang.Thread.run(Thread.java:833)
   ```
   
   I noticed this test failed multiple times. It may be related to the changes in this PR.




[GitHub] [pulsar] coderzc commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058810799


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2056,6 +2056,37 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
 
         long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
 
+        // Filer out and skip unnecessary read entry
+        if (opReadEntry.skipCondition != null) {
+            long firstValidEntry = -1L;
+            long lastValidEntry = -1L;
+            long entryId = firstEntry;
+            for (; entryId <= lastEntry; entryId++) {
+                if (!opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
+                    firstValidEntry = entryId;
+                    break;
+                }
+            }
+
+            // If all messages in [firstEntry...lastEntry] are filter out,
+            // then manual call internalReadEntriesComplete to advance read position.
+            if (firstValidEntry == -1L) {
+                opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx,
+                        PositionImpl.get(ledger.getId(), lastEntry));
+                return;
+            }
+
+            for (; entryId <= lastEntry; entryId++) {
+                if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
+                    break;
+                }
+                lastValidEntry = entryId;
+            }

Review Comment:
   Good idea.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058703518


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2056,6 +2056,37 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
 
         long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
 
+        // Filer out and skip unnecessary read entry
+        if (opReadEntry.skipCondition != null) {
+            long firstValidEntry = -1L;
+            long lastValidEntry = -1L;
+            long entryId = firstEntry;
+            for (; entryId <= lastEntry; entryId++) {
+                if (!opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
+                    firstValidEntry = entryId;
+                    break;
+                }
+            }
+
+            // If all messages in [firstEntry...lastEntry] are filter out,
+            // then manual call internalReadEntriesComplete to advance read position.
+            if (firstValidEntry == -1L) {
+                opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx,
+                        PositionImpl.get(ledger.getId(), lastEntry));
+                return;
+            }
+
+            for (; entryId <= lastEntry; entryId++) {
+                if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
+                    break;
+                }
+                lastValidEntry = entryId;
+            }

Review Comment:
   +1





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058704911


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -72,13 +76,19 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
         }
         cursor.updateReadStats(entriesCount, entriesSize);
 
-        final PositionImpl lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        if (lastPosition == null || entriesCount != 0) {

Review Comment:
   How about `lastPosition == null` && `entriesCount == 0`?





[GitHub] [pulsar] coderzc commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058811258


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -72,13 +76,19 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
         }
         cursor.updateReadStats(entriesCount, entriesSize);
 
-        final PositionImpl lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        if (lastPosition == null || entriesCount != 0) {
+            lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        }

Review Comment:
   It seems we only need to check `entriesCount != 0`.
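
To make the two conditions concrete, here is a small sketch with plain `Long` values standing in for `PositionImpl`. `resolveLastPosition` is a hypothetical helper, not code from the PR. With an empty result list, the condition `lastPosition == null || entriesCount != 0` would still try to index element `entriesCount - 1`, whereas checking only `entriesCount != 0` leaves the caller-supplied position untouched.

```java
import java.util.List;

final class LastPositionCheck {
    // Prefer the position of the last returned entry; fall back to the
    // caller-supplied position when nothing was returned (it may be null).
    static Long resolveLastPosition(List<Long> returnedEntries, Long lastPosition) {
        int entriesCount = returnedEntries.size();
        if (entriesCount != 0) {
            lastPosition = returnedEntries.get(entriesCount - 1);
        }
        return lastPosition;
    }

    public static void main(String[] args) {
        System.out.println(resolveLastPosition(List.of(5L, 6L, 7L), null)); // prints "7"
        System.out.println(resolveLastPosition(List.of(), 9L));             // prints "9"
        System.out.println(resolveLastPosition(List.of(), null));           // prints "null"
    }
}
```

In this sketch the last case returns `null` instead of throwing, so a caller would still need to handle a missing position explicitly.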





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058704911


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -72,13 +76,19 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
         }
         cursor.updateReadStats(entriesCount, entriesSize);
 
-        final PositionImpl lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        if (lastPosition == null || entriesCount != 0) {

Review Comment:
   How about `lastPosition == null` && `entriesCount == 0`? potential NPE?





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058690786


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -2056,6 +2056,37 @@ private void internalReadFromLedger(ReadHandle ledger, OpReadEntry opReadEntry)
 
         long lastEntry = min(firstEntry + opReadEntry.getNumberOfEntriesToRead() - 1, lastEntryInLedger);
 
+        // Filer out and skip unnecessary read entry
+        if (opReadEntry.skipCondition != null) {
+            long firstValidEntry = -1L;
+            long lastValidEntry = -1L;
+            long entryId = firstEntry;
+            for (; entryId <= lastEntry; entryId++) {
+                if (!opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
+                    firstValidEntry = entryId;
+                    break;
+                }
+            }
+
+            // If all messages in [firstEntry...lastEntry] are filter out,
+            // then manual call internalReadEntriesComplete to advance read position.
+            if (firstValidEntry == -1L) {
+                opReadEntry.internalReadEntriesComplete(Collections.emptyList(), opReadEntry.ctx,
+                        PositionImpl.get(ledger.getId(), lastEntry));
+                return;
+            }
+
+            for (; entryId <= lastEntry; entryId++) {
+                if (opReadEntry.skipCondition.test(PositionImpl.get(ledger.getId(), entryId))) {
+                    break;
+                }
+                lastValidEntry = entryId;
+            }

Review Comment:
   Maybe you can try to optimize with only one loop to find the first invalid entry range 😁
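
One way the single-pass variant could look, as a sketch only: entry IDs are plain `long` values, `LongPredicate` stands in for `Predicate<PositionImpl>`, and `firstValidRange` is a hypothetical helper rather than the code that was merged. It returns the first contiguous run of entries that should be read, or `null` when every entry in the window is skipped.

```java
import java.util.Arrays;
import java.util.function.LongPredicate;

final class ValidRangeFinder {
    // Scan [firstEntry, lastEntry] once and return {firstValid, lastValid}
    // for the first contiguous run the predicate does not skip,
    // or null when every entry in the window is skipped.
    static long[] firstValidRange(long firstEntry, long lastEntry, LongPredicate skipCondition) {
        long firstValid = -1L;
        long lastValid = -1L;
        for (long entryId = firstEntry; entryId <= lastEntry; entryId++) {
            if (skipCondition.test(entryId)) {
                if (firstValid != -1L) {
                    break; // the run has ended at the first skipped entry after it
                }
            } else {
                if (firstValid == -1L) {
                    firstValid = entryId;
                }
                lastValid = entryId;
            }
        }
        return firstValid == -1L ? null : new long[] {firstValid, lastValid};
    }

    public static void main(String[] args) {
        // Entries 0, 1 and 4 are skipped, so the first readable run is [2, 3].
        LongPredicate skip = id -> id < 2 || id == 4;
        System.out.println(Arrays.toString(firstValidRange(0, 5, skip))); // prints "[2, 3]"
        System.out.println(firstValidRange(0, 1, skip) == null);          // prints "true"
    }
}
```

The `null` result corresponds to the case in the diff where all entries are filtered out and the read position is advanced without issuing a read.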





[GitHub] [pulsar] coderzc commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058819954


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -167,6 +167,18 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, O
     void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
                           Object ctx, PositionImpl maxPosition);
 
+    /**
+     * Asynchronously read entries from the ManagedLedger.
+     *
+     * @param numberOfEntriesToRead maximum number of entries to return
+     * @param maxSizeBytes          max size in bytes of the entries to return
+     * @param callback              callback object
+     * @param ctx                   opaque context
+     * @param maxPosition           max position can read
+     */
+    void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,

Review Comment:
   default implementation? 
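
A sketch of what a default implementation could look like, so that existing cursor implementations keep compiling: the new method delegates to the existing overload and ignores the predicate. `Cursor` and `ReadCallback` are simplified stand-ins for the real interfaces, and the `skipCondition` parameter with a `Long`-based predicate is an assumption based on the earlier revision of this diff.

```java
import java.util.List;
import java.util.function.Predicate;

final class DefaultMethodDemo {
    // Simplified stand-ins for illustration; not the real ManagedCursor types.
    interface ReadCallback {
        void readComplete(List<Long> entries, Object ctx);
    }

    interface Cursor {
        void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadCallback callback, Object ctx);

        // Default: ignore the predicate and fall back to the plain read, so
        // implementations that do not support skipping need no changes.
        default void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes,
                                              ReadCallback callback, Object ctx,
                                              Predicate<Long> skipCondition) {
            asyncReadEntries(numberOfEntriesToRead, maxSizeBytes, callback, ctx);
        }
    }

    public static void main(String[] args) {
        // A cursor that only implements the original method.
        Cursor legacy = (n, maxBytes, cb, ctx) -> cb.readComplete(List.of(1L, 2L, 3L).subList(0, n), ctx);
        legacy.asyncReadEntriesWithSkip(2, Long.MAX_VALUE,
                (entries, ctx) -> System.out.println(entries), null, id -> id == 1L);
        // prints "[1, 2]" because the default implementation ignores the predicate
    }
}
```

An implementation that supports skipping would override `asyncReadEntriesWithSkip` and apply the predicate itself.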





[GitHub] [pulsar] coderzc commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1057724542


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -150,9 +150,11 @@ enum IndividualDeletedEntries {
      *            opaque context
      * @param maxPosition
      *            max position can read
+     * @param skipCondition
+     *            predicate of read filter out
      */
     void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
-                          PositionImpl maxPosition);
+                          PositionImpl maxPosition, Predicate<PositionImpl> skipCondition);

Review Comment:
   Good idea.





[GitHub] [pulsar] mattisonchao commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
mattisonchao commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058705556


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -72,13 +76,19 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
         }
         cursor.updateReadStats(entriesCount, entriesSize);
 
-        final PositionImpl lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        if (lastPosition == null || entriesCount != 0) {
+            lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        }

Review Comment:
   +1





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058827411


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -167,6 +167,18 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, O
     void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
                           Object ctx, PositionImpl maxPosition);
 
+    /**
+     * Asynchronously read entries from the ManagedLedger.
+     *
+     * @param numberOfEntriesToRead maximum number of entries to return
+     * @param maxSizeBytes          max size in bytes of the entries to return
+     * @param callback              callback object
+     * @param ctx                   opaque context
+     * @param maxPosition           max position can read
+     */
+    void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,

Review Comment:
   Yes, I mean just add the `default` keyword:
   
   ```
   default void asyncReadEntriesWithSkip()
   ```





[GitHub] [pulsar] coderzc commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058811258


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -72,13 +76,19 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
         }
         cursor.updateReadStats(entriesCount, entriesSize);
 
-        final PositionImpl lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        if (lastPosition == null || entriesCount != 0) {
+            lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        }

Review Comment:
   It seems we can just check `entriesCount != 0`.





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1057026841


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -150,9 +150,11 @@ enum IndividualDeletedEntries {
      *            opaque context
      * @param maxPosition
      *            max position can read
+     * @param skipCondition
+     *            predicate of read filter out
      */
     void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, Object ctx,
-                          PositionImpl maxPosition);
+                          PositionImpl maxPosition, Predicate<PositionImpl> skipCondition);

Review Comment:
   It's better to add a new method.
   I mean, we'd better not change the signature of the existing API directly.
   This interface is annotated with
   
   ```
   @InterfaceAudience.LimitedPrivate
   @InterfaceStability.Stable
   ```
   
   





[GitHub] [pulsar] codelipenghui commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058688478


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java:
##########
@@ -1045,7 +1056,7 @@ public boolean trackDelayedDelivery(long ledgerId, long entryId, MessageMetadata
         }
     }
 
-    protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
+    protected synchronized NavigableSet<PositionImpl>  getMessagesToReplayNow(int maxMessagesToRead) {

Review Comment:
   We don't need to add the extra space.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -72,13 +76,19 @@ public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
         }
         cursor.updateReadStats(entriesCount, entriesSize);
 
-        final PositionImpl lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        if (lastPosition == null || entriesCount != 0) {
+            lastPosition = (PositionImpl) returnedEntries.get(entriesCount - 1).getPosition();
+        }

Review Comment:
   If `returnedEntries` is empty and `lastPosition` is null, the condition is true, so `returnedEntries.get(entriesCount - 1)` is called with index -1 and throws an exception.
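
A minimal sketch of the failure mode in this comment, under stated assumptions: `LastPositionGuard` and its two methods are hypothetical stand-ins, positions are plain `String`s instead of `PositionImpl`, and `entriesCount` is assumed to be the size of the returned list. It is not the actual `OpReadEntry` code; it only contrasts the condition in the diff with a check on `entriesCount` alone.

```java
import java.util.List;

// Hypothetical stand-in for the reviewed logic; not the real OpReadEntry.
final class LastPositionGuard {

    // Condition as written in the diff: the branch is taken whenever
    // lastPosition is null, even for an empty list, so get(-1) throws.
    static String asWritten(List<String> returnedEntries, String lastPosition) {
        int entriesCount = returnedEntries.size();
        if (lastPosition == null || entriesCount != 0) {
            lastPosition = returnedEntries.get(entriesCount - 1);
        }
        return lastPosition;
    }

    // Alternative: only touch the list when it is non-empty, otherwise
    // keep whatever lastPosition the caller supplied (possibly null).
    static String countOnly(List<String> returnedEntries, String lastPosition) {
        int entriesCount = returnedEntries.size();
        if (entriesCount != 0) {
            lastPosition = returnedEntries.get(entriesCount - 1);
        }
        return lastPosition;
    }
}
```

With an empty list and a null `lastPosition`, `asWritten` fails with an `IndexOutOfBoundsException`, while `countOnly` returns null and leaves it to the caller to handle the missing position.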



##########
managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java:
##########
@@ -4252,5 +4253,64 @@ public void testLazyCursorLedgerCreationForSubscriptionCreation() throws Excepti
         factory2.shutdown();
     }
 
+    @Test
+    public void testReadEntriesWithFilterOut() throws ManagedLedgerException, InterruptedException, ExecutionException {
+        int readMaxNumber = 10;
+        int sendNumber = 20;
+        ManagedLedger ledger = factory.open("testReadEntriesWithFilter");
+        ManagedCursor cursor = ledger.openCursor("c");

Review Comment:
   Should be closed after the test.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java:
##########
@@ -167,6 +167,18 @@ void asyncReadEntries(int numberOfEntriesToRead, ReadEntriesCallback callback, O
     void asyncReadEntries(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,
                           Object ctx, PositionImpl maxPosition);
 
+    /**
+     * Asynchronously read entries from the ManagedLedger.
+     *
+     * @param numberOfEntriesToRead maximum number of entries to return
+     * @param maxSizeBytes          max size in bytes of the entries to return
+     * @param callback              callback object
+     * @param ctx                   opaque context
+     * @param maxPosition           max position can read
+     */
+    void asyncReadEntriesWithSkip(int numberOfEntriesToRead, long maxSizeBytes, ReadEntriesCallback callback,

Review Comment:
   It's better to add a default implementation. Otherwise, we will break users' existing implementations of this interface.
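
To illustrate why a `default` body keeps existing implementations compiling, here is a small sketch. `SimpleCursor`, `LegacyCursor`, and the `String` return type are hypothetical simplifications, not the real `ManagedCursor` API, and delegating to the old method while ignoring the predicate is only one possible default, assumed here for illustration.

```java
import java.util.function.Predicate;

// Hypothetical, heavily simplified cursor interface.
interface SimpleCursor {
    // Method that existed before the change.
    String readEntries(int maxEntries);

    // Method added later. Because it has a default body, classes written
    // against the old interface still compile without modification.
    // Assumption: the fallback ignores skipCondition and delegates.
    default String readEntriesWithSkip(int maxEntries, Predicate<Long> skipCondition) {
        return readEntries(maxEntries);
    }
}

// An implementation written before readEntriesWithSkip existed.
final class LegacyCursor implements SimpleCursor {
    @Override
    public String readEntries(int maxEntries) {
        return "read up to " + maxEntries;
    }
}
```

Calling `new LegacyCursor().readEntriesWithSkip(5, id -> false)` falls through to `readEntries(5)`. Without the `default` keyword, `LegacyCursor` would no longer compile once the new abstract method was added.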





[GitHub] [pulsar] coderzc commented on a diff in pull request #19035: [feat][broker][PIP-195]Implement Filter out all delayed messages and skip them when reading messages from bookies - part7

Posted by GitBox <gi...@apache.org>.
coderzc commented on code in PR #19035:
URL: https://github.com/apache/pulsar/pull/19035#discussion_r1058823616


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpReadEntry.java:
##########
@@ -57,13 +61,13 @@ public static OpReadEntry create(ManagedCursorImpl cursor, PositionImpl readPosi
             maxPosition = PositionImpl.LATEST;
         }
         op.maxPosition = maxPosition;
+        op.skipCondition = skipCondition;
         op.ctx = ctx;
         op.nextReadPosition = PositionImpl.get(op.readPosition);
         return op;
     }
 
-    @Override
-    public void readEntriesComplete(List<Entry> returnedEntries, Object ctx) {
+    void internalReadEntriesComplete(List<Entry> returnedEntries, Object ctx, PositionImpl lastPosition) {

Review Comment:
   The `context` is already passed via `opReadEntry.ctx`.


