Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2022/05/12 16:45:18 UTC

[GitHub] [pulsar] nodece opened a new pull request, #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

nodece opened a new pull request, #15568:
URL: https://github.com/apache/pulsar/pull/15568

   Fixes #14883 
   
   ### Motivation
   
   See #14883
   
   ### Modifications
   
   - Add `start_message_id_inclusive` field to `CommandSubscribe` message
   - Add client support for setting the `start_message_id_inclusive` value of the `CommandSubscribe` message
   - Update `NonDurableCursorImpl.java` and `ManagedCursorImpl.java` to support reading the message at the startMessageId position
   
   ### Verifying this change
   
   - [x] Make sure that the change passes the CI checks.
   - Added tests
   
   ### Documentation
   
   Need to update docs? 
   
   - [x] `no-need-doc` 
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [pulsar] shibd commented on a diff in pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#discussion_r886679043


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -939,7 +953,7 @@ public void operationComplete() {
                 cursor.setActive();
                 // Update the ack position (ignoring entries that were written while the cursor was being created)
                 cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter()

Review Comment:
   According to the PIP, inclusive only needs to be checked when startMessageId == latest. We can keep the initializeCursorPosition method unchanged and add an inclusive parameter to the getLastPositionAndCounter method.
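
The suggestion above can be sketched with a toy, single-ledger model. This is not Pulsar code: `ToyLedger`, its fields, and the `boolean inclusive` overload are assumptions made for illustration, and the real `getLastPositionAndCounter` works on managed-ledger positions rather than plain entry ids. The sketch only shows the idea that the cursor reads from the entry after the returned position, so an inclusive start steps one entry back and lowers the counter by one.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

/** Toy single-ledger model; entry ids run from 0 to lastEntryId. Not Pulsar code. */
final class ToyLedger {
    private final long lastEntryId;  // -1 means the ledger is empty
    private final long entriesAdded; // total number of entries written so far

    ToyLedger(long entriesAdded) {
        this.entriesAdded = entriesAdded;
        this.lastEntryId = entriesAdded - 1;
    }

    /**
     * Hypothetical overload: returns (mark-delete position, consumed counter).
     * A cursor reads from markDelete + 1, so inclusive == true steps one entry back.
     */
    Map.Entry<Long, Long> getLastPositionAndCounter(boolean inclusive) {
        if (inclusive && lastEntryId >= 0) {
            return new SimpleImmutableEntry<>(lastEntryId - 1, entriesAdded - 1);
        }
        // Exclusive start, or an empty ledger where there is nothing to step back over.
        return new SimpleImmutableEntry<>(lastEntryId, entriesAdded);
    }

    /** First entry that a cursor initialized from the pair above would read. */
    static long firstReadableEntry(Map.Entry<Long, Long> positionAndCounter) {
        return positionAndCounter.getKey() + 1;
    }
}
```

With five entries (ids 0 to 4), an exclusive start reads from entry 5, which does not exist yet, while an inclusive start reads from entry 4, the current last entry.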





[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1125595767

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1156297772

   Now blocked by `org.apache.pulsar.broker.service.SubscriptionSeekTest#testSeekByFunctionAndMultiTopic`: repeated messages are often received.




[GitHub] [pulsar] shibd commented on a diff in pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#discussion_r886682545


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java:
##########
@@ -48,39 +46,46 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
             // Start from last entry
             switch (initialPosition) {
                 case Latest:
-                    initializeCursorPosition(ledger.getLastPositionAndCounter());
+                    initializeCursorPosition(ledger.getLastPositionAndCounter(), inclusive);
                     break;
                 case Earliest:
-                    initializeCursorPosition(ledger.getFirstPositionAndCounter());
+                    initializeCursorPosition(ledger.getFirstPositionAndCounter(), inclusive);
                     break;
             }
         } else if (startCursorPosition.getLedgerId() == PositionImpl.EARLIEST.getLedgerId()) {
             // Start from invalid ledger to read from first available entry
-            recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
+            recoverCursor(ledger.getFirstPosition(), inclusive);
         } else {
             // Since the cursor is positioning on the mark-delete position, we need to take 1 step back from the desired
             // read-position
-            recoverCursor(startCursorPosition);
+            recoverCursor(startCursorPosition, inclusive);

Review Comment:
   Would it be possible to add a new method to the ledger: `getSpecifyPositionAndCounter(Position startPosition, boolean inclusive)`?
   
   Then this could call: `initializeCursorPosition(ledger.getSpecifyPositionAndCounter(startCursorPosition, inclusive))`
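
As a rough illustration of what such a helper would have to compute, the sketch below maps a requested start position to a mark-delete position in a toy multi-ledger model. It is not Pulsar code: `ToyPositions`, `previous`, and `markDeleteFor` are hypothetical names, positions are plain `{ledgerId, entryId}` arrays, and the counter part of the suggested method is left out. It follows the diff comment above: the cursor sits on the mark-delete position, so an inclusive start takes one step back, possibly into an earlier ledger.

```java
import java.util.Map;
import java.util.TreeMap;

/** Toy multi-ledger model: ledgerId -> number of entries. Not Pulsar code. */
final class ToyPositions {
    private final TreeMap<Long, Long> entriesPerLedger = new TreeMap<>();

    void addLedger(long ledgerId, long entries) {
        entriesPerLedger.put(ledgerId, entries);
    }

    /** Position just before (ledgerId, entryId), crossing into the previous non-empty ledger if needed. */
    long[] previous(long ledgerId, long entryId) {
        if (entryId > 0) {
            return new long[] {ledgerId, entryId - 1};
        }
        Map.Entry<Long, Long> prev = entriesPerLedger.lowerEntry(ledgerId);
        while (prev != null && prev.getValue() == 0) {
            prev = entriesPerLedger.lowerEntry(prev.getKey()); // skip empty ledgers
        }
        // No earlier entry exists: use entry -1 of the same ledger, meaning "before the first entry".
        return prev == null
                ? new long[] {ledgerId, -1}
                : new long[] {prev.getKey(), prev.getValue() - 1};
    }

    /** Hypothetical helper: mark-delete position for a requested start position. */
    long[] markDeleteFor(long ledgerId, long entryId, boolean inclusive) {
        return inclusive ? previous(ledgerId, entryId) : new long[] {ledgerId, entryId};
    }
}
```

With ledgers 1 (three entries), 2 (empty), and 3 (two entries), an inclusive start at `{3, 0}` yields the mark-delete position `{1, 2}`, so the next entry read is `{3, 0}` itself.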



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -939,7 +953,7 @@ public void operationComplete() {
                 cursor.setActive();
                 // Update the ack position (ignoring entries that were written while the cursor was being created)
                 cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter()

Review Comment:
   According to the PIP, inclusive only needs to be checked when startMessageId == latest. We can keep the initializeCursorPosition method unchanged and add an inclusive parameter to the getLastPositionAndCounter method.



##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -878,61 +884,64 @@ private CompletableFuture<Subscription> getDurableSubscription(String subscripti
 
         Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
 
-        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
-            @Override
-            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}][{}] Opened cursor", topic, subscriptionName);
-                }
+        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, startMessageIdInclusive,

Review Comment:
   Seems like only the code format has changed?



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java:
##########
@@ -48,39 +46,46 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
             // Start from last entry
             switch (initialPosition) {
                 case Latest:
-                    initializeCursorPosition(ledger.getLastPositionAndCounter());
+                    initializeCursorPosition(ledger.getLastPositionAndCounter(), inclusive);

Review Comment:
   According to the PIP, inclusive only needs to be checked when startMessageId == latest. We can keep the initializeCursorPosition method unchanged and add an inclusive parameter to the getLastPositionAndCounter method.





[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1131381622

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] BewareMyPower commented on a diff in pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
BewareMyPower commented on code in PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#discussion_r908116593


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java:
##########
@@ -242,11 +242,13 @@ ManagedCursor openCursor(String name, InitialPosition initialPosition) throws In
      *             operation will trigger the creation of the cursor.
      * @param cursorProperties
      *            the properties for the Cursor
+     * @param inclusive
+     *            whether to read from the specified position
      * @return the ManagedCursor
      * @throws ManagedLedgerException
      */
     ManagedCursor openCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,

Review Comment:
   Could you add default overloads for compatibility? These are common APIs that can be used by other components, such as protocol handlers.
   
   e.g.
   
   ```java
       default void asyncOpenCursor(String name, InitialPosition initialPosition, Map<String, Long> properties,
                            Map<String, String> cursorProperties, OpenCursorCallback callback, Object ctx) {
           asyncOpenCursor(name, initialPosition, properties, cursorProperties, false, callback, ctx);
       }
   ```
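
The same compatibility pattern, reduced to a self-contained sketch: the old signature stays on the interface as a `default` method that delegates to the new, wider one with `false`, so existing callers keep compiling and keep the old behavior. `CursorOpener` and `SimpleCursorOpener` are hypothetical stand-ins, not the real `ManagedLedger` API.

```java
/** Hypothetical stand-in for a widely used interface; not the real ManagedLedger API. */
interface CursorOpener {
    /** New, wider signature that implementations provide. */
    String openCursor(String name, boolean inclusive);

    /** Old signature kept as a default method so existing callers still compile. */
    default String openCursor(String name) {
        return openCursor(name, false); // false preserves the behavior from before the change
    }
}

/** Minimal implementation that only defines the new signature. */
final class SimpleCursorOpener implements CursorOpener {
    @Override
    public String openCursor(String name, boolean inclusive) {
        return name + (inclusive ? ":inclusive" : ":exclusive");
    }
}
```

A caller that still uses `openCursor("sub")` gets the exclusive behavior, while new callers can opt in with `openCursor("sub", true)`.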





[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1141834340

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] shibd commented on a diff in pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
shibd commented on code in PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#discussion_r886637531


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -939,7 +953,7 @@ public void operationComplete() {
                 cursor.setActive();
                 // Update the ack position (ignoring entries that were written while the cursor was being created)
                 cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter()

Review Comment:
   According to the PIP, inclusive only needs to be checked when `startMessageId == latest`. We can keep the `initializeCursorPosition` method unchanged and add an inclusive parameter to the `getLastPositionAndCounter` method.
   





[GitHub] [pulsar] github-actions[bot] commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
github-actions[bot] commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1214539799

   The PR had no activity for 30 days; marked with the Stale label.




[GitHub] [pulsar] nodece commented on a diff in pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#discussion_r896513355


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -939,7 +953,7 @@ public void operationComplete() {
                 cursor.setActive();
                 // Update the ack position (ignoring entries that were written while the cursor was being created)
                 cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter()

Review Comment:
   initializeCursorPosition will read the next valid position from this position, so I think it is better to handle this in initializeCursorPosition.



##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java:
##########
@@ -48,39 +46,46 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
             // Start from last entry
             switch (initialPosition) {
                 case Latest:
-                    initializeCursorPosition(ledger.getLastPositionAndCounter());
+                    initializeCursorPosition(ledger.getLastPositionAndCounter(), inclusive);

Review Comment:
   initializeCursorPosition will read the next valid position from this position, so I think it is better to handle this in initializeCursorPosition.





[GitHub] [pulsar] nodece commented on a diff in pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#discussion_r896513355


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java:
##########
@@ -939,7 +953,7 @@ public void operationComplete() {
                 cursor.setActive();
                 // Update the ack position (ignoring entries that were written while the cursor was being created)
                 cursor.initializeCursorPosition(initialPosition == InitialPosition.Latest ? getLastPositionAndCounter()

Review Comment:
   initializeCursorPosition will read the next valid position from this position, so we need to handle this in the initializeCursorPosition method.





[GitHub] [pulsar] nodece commented on a diff in pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#discussion_r896520651


##########
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java:
##########
@@ -48,39 +46,46 @@ public class NonDurableCursorImpl extends ManagedCursorImpl {
             // Start from last entry
             switch (initialPosition) {
                 case Latest:
-                    initializeCursorPosition(ledger.getLastPositionAndCounter());
+                    initializeCursorPosition(ledger.getLastPositionAndCounter(), inclusive);
                     break;
                 case Earliest:
-                    initializeCursorPosition(ledger.getFirstPositionAndCounter());
+                    initializeCursorPosition(ledger.getFirstPositionAndCounter(), inclusive);
                     break;
             }
         } else if (startCursorPosition.getLedgerId() == PositionImpl.EARLIEST.getLedgerId()) {
             // Start from invalid ledger to read from first available entry
-            recoverCursor(ledger.getPreviousPosition(ledger.getFirstPosition()));
+            recoverCursor(ledger.getFirstPosition(), inclusive);
         } else {
             // Since the cursor is positioning on the mark-delete position, we need to take 1 step back from the desired
             // read-position
-            recoverCursor(startCursorPosition);
+            recoverCursor(startCursorPosition, inclusive);

Review Comment:
   It looks like we only support reading from the Earliest/Latest position, so I don't think it's necessary to add this.
   
   





[GitHub] [pulsar] nodece commented on a diff in pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on code in PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#discussion_r896515237


##########
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java:
##########
@@ -878,61 +884,64 @@ private CompletableFuture<Subscription> getDurableSubscription(String subscripti
 
         Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
 
-        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
-            @Override
-            public void openCursorComplete(ManagedCursor cursor, Object ctx) {
-                if (log.isDebugEnabled()) {
-                    log.debug("[{}][{}] Opened cursor", topic, subscriptionName);
-                }
+        ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, startMessageIdInclusive,

Review Comment:
   Updated.





[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1483881019

   No reviewers, so I am closing this PR.




[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1126095307

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1133587700

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1139238524

   Blocked by https://github.com/apache/pulsar/pull/15761




[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1156296090

   /pulsarbot rerun-failure-checks




[GitHub] [pulsar] nodece commented on pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by GitBox <gi...@apache.org>.
nodece commented on PR #15568:
URL: https://github.com/apache/pulsar/pull/15568#issuecomment-1177009570

   This PR has been blocked by https://github.com/apache/pulsar/pull/16171.




[GitHub] [pulsar] nodece closed pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side

Posted by "nodece (via GitHub)" <gi...@apache.org>.
nodece closed pull request #15568: [PIP-150][improve][broker] Support read the message of startMessageId position on the broker side
URL: https://github.com/apache/pulsar/pull/15568

