You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2020/10/20 07:16:41 UTC

[GitHub] [pulsar] 315157973 opened a new pull request #8306: Support exclude the message when reset cursor by message ID

315157973 opened a new pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306


   
   Fixes #8259
   ### Motivation
   Currently, when reset the cursor to a position, the broker will set the mark delete position to the previous position of the reset position. For some usecase, we don't want to consume the reset position again, so it's better to provide a way to reset the cursor to a specific position and exclude this position. So that the consumers under the subscription can start consume messages from the next position of the reset position.
   
   ### Modifications
   Add a new API to exclude the message when reset cursor by message ID
   
   ### Verifying this change
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] jiazhai commented on pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
jiazhai commented on pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#issuecomment-712745293


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r508938287



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2006,7 +2006,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyn
     }
 
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
-            MessageIdImpl messageId) {
+            MessageIdImpl messageId, boolean isExclusive) {

Review comment:
       I think it might be more appropriate to change to isExclued.I am modifying it.
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r508454771



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
##########
@@ -2006,7 +2006,7 @@ private void internalCreateSubscriptionForNonPartitionedTopic(AsyncResponse asyn
     }
 
     protected void internalResetCursorOnPosition(AsyncResponse asyncResponse, String subName, boolean authoritative,
-            MessageIdImpl messageId) {
+            MessageIdImpl messageId, boolean isExclusive) {

Review comment:
       exclude? I think exclude is more reasonable here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#issuecomment-713599419


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] wolfstudy commented on pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
wolfstudy commented on pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#issuecomment-713234782


   ping @315157973 Can you check @codelipenghui comments?


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] eolivelli commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r509066533



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java
##########
@@ -63,4 +63,9 @@ public void testNonPartitionedTopicMessagingWithKeyShared(String serviceUrl) thr
     public void testPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception {
         partitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true);
     }
+
+    @Test(dataProvider = "ServiceUrls")
+    public void testResetCursorCompatibility(String serviceUrl) throws Exception {
+        resetCursorCompatibility(serviceUrl, true);

Review comment:
       `resetCursorCompatibility` is a new method and you are only calling it with 'true' parameter.
   is it expected ?
   what about calling also` resetCursorCompatibility(serviceUrl, false);` ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#issuecomment-713298773


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#issuecomment-713607505


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r509208583



##########
File path: tests/integration/src/test/java/org/apache/pulsar/tests/integration/messaging/PersistentTopicMessagingTest.java
##########
@@ -63,4 +63,9 @@ public void testNonPartitionedTopicMessagingWithKeyShared(String serviceUrl) thr
     public void testPartitionedTopicMessagingWithKeyShared(String serviceUrl) throws Exception {
         partitionedTopicSendAndReceiveWithKeyShared(serviceUrl, true);
     }
+
+    @Test(dataProvider = "ServiceUrls")
+    public void testResetCursorCompatibility(String serviceUrl) throws Exception {
+        resetCursorCompatibility(serviceUrl);

Review comment:
       It's need to move the test under the package `org.apache.pulsar.tests.integration.backwardscompatibility` Otherwise the the test is using the latest broker and latest client, so it can't check the compatibility.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r509003640



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1216,15 +1217,17 @@ public void resetCursorOnPosition(
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
-            MessageIdImpl messageId) {
+                    ResetCursorData resetCursorData) {

Review comment:
       Can you add an integration test for that?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
315157973 commented on pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#issuecomment-712681546


   /pulsarbot run-failure-checks


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] sijie commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
sijie commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r508998499



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1216,15 +1217,17 @@ public void resetCursorOnPosition(
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
-            MessageIdImpl messageId) {
+                    ResetCursorData resetCursorData) {

Review comment:
       How did you maintain backward compatibility on this endpoint?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r508939823



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1219,7 +1219,43 @@ public void resetCursorOnPosition(
             MessageIdImpl messageId) {
         try {
             validateTopicName(tenant, namespace, encodedTopic);
-            internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative, messageId);
+            internalResetCursorOnPosition(asyncResponse, decode(encodedSubName), authoritative, messageId, false);
+        } catch (Exception e) {
+            resumeAsyncResponseExceptionally(asyncResponse, e);
+        }
+    }
+    
+    @POST
+    @Path("/{tenant}/{namespace}/{topic}/subscription/{subName}/resetCursorExclusive")
+    @ApiOperation(value = "Reset subscription to message position closest to given position, " +
+            "and start consume messages from the next position of the reset position.", notes = "It fence cursor and disconnects all active consumers before reseting cursor.")
+    @ApiResponses(value = {
+            @ApiResponse(code = 307, message = "Current broker doesn't serve the namespace of this topic"),
+            @ApiResponse(code = 401, message = "Don't have permission to administrate resources on this tenant or" +
+                    "subscriber is not authorized to access this operation"),
+            @ApiResponse(code = 403, message = "Don't have admin permission"),
+            @ApiResponse(code = 404, message = "Topic/Subscription does not exist"),
+            @ApiResponse(code = 405, message = "Not supported for partitioned topics"),
+            @ApiResponse(code = 412, message = "Unable to find position for position specified"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Failed to validate global cluster configuration") })
+    public void resetCursorOnPositionExclusive(

Review comment:
       I think we'd better do not introduce a new endpoint since we also should support reset to a `batchIndex`. So it's better to introduce a new construct for the cursor reset request such as `ResetCursorData` and we and add the `isExcluded` into the new construct, it looks like
   
   ```
   class ResetCursorData {
       ledgerId,
       entryId,
       isExcluded
   }
   ```
   
   Since we are using the json encoder and decode, I think this will not introduce compatibility issues.

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1314,6 +1325,17 @@ void createSubscription(String topic, String subscriptionName, MessageId message
      */
     CompletableFuture<Void> resetCursorAsync(String topic, String subName, long timestamp);
 
+    /**
+     * Reset cursor position on a topic subscription.
+     * <p/>
+     * and start consume messages from the next position of the reset position.
+     * @param topic
+     * @param subName
+     * @param messageId
+     * @return
+     */
+    CompletableFuture<Void> resetCursorExclusiveAsync(String topic, String subName, MessageId messageId);

Review comment:
       ```suggestion
       CompletableFuture<Void> resetCursorAsync(String topic, String subName, MessageId messageId, boolean isExcluded);
   ```

##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Topics.java
##########
@@ -1302,6 +1302,17 @@ void createSubscription(String topic, String subscriptionName, MessageId message
      */
     void resetCursor(String topic, String subName, long timestamp) throws PulsarAdminException;
 
+    /**
+     * Reset cursor position on a topic subscription.
+     * <p/>
+     * and start consume messages from the next position of the reset position.
+     * @param topic
+     * @param subName
+     * @param messageId
+     * @throws PulsarAdminException
+     */
+    void resetCursorExclusive(String topic, String subName, MessageId messageId) throws PulsarAdminException;

Review comment:
       ```suggestion
       void resetCursor(String topic, String subName, MessageId messageId, boolean isExcluded) throws PulsarAdminException;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui merged pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
codelipenghui merged pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r509355248



##########
File path: pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/TopicsImpl.java
##########
@@ -1132,22 +1133,50 @@ public void resetCursor(String topic, String subName, long timestamp) throws Pul
     @Override
     public void resetCursor(String topic, String subName, MessageId messageId) throws PulsarAdminException {
         try {
-            TopicName tn = validateTopic(topic);
-            String encodedSubName = Codec.encode(subName);
-            WebTarget path = topicPath(tn, "subscription", encodedSubName, "resetcursor");
-            request(path).post(Entity.entity(messageId, MediaType.APPLICATION_JSON),
-                            ErrorData.class);
+            resetCursorAsync(topic, subName, messageId).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
+        } catch (ExecutionException e) {
+            throw (PulsarAdminException) e.getCause();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarAdminException(e);
+        } catch (TimeoutException e) {
+            throw new PulsarAdminException.TimeoutException(e);
+        } catch (Exception e) {
+            throw getApiException(e);
+        }
+    }
+
+    @Override
+    public void resetCursor(String topic, String subName, MessageId messageId
+            , boolean isExcluded) throws PulsarAdminException {
+        try {
+            resetCursorAsync(topic, subName, messageId, true).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);

Review comment:
       ```suggestion
               resetCursorAsync(topic, subName, messageId, isExcluded).get(this.readTimeoutMs, TimeUnit.MILLISECONDS);
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r508999350



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1216,15 +1217,17 @@ public void resetCursorOnPosition(
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
-            MessageIdImpl messageId) {
+                    ResetCursorData resetCursorData) {

Review comment:
       > How did you maintain backward compatibility on this endpoint?
   
   Now the properties of this object are consistent with the previous `MessageId`, and there is no problem with Json deserialization.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] codelipenghui commented on pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
codelipenghui commented on pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#issuecomment-714191535


   cherry-picked to branch-2.6(2.6.2)


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] 315157973 commented on a change in pull request #8306: Support exclude the message when reset cursor by message ID

Posted by GitBox <gi...@apache.org>.
315157973 commented on a change in pull request #8306:
URL: https://github.com/apache/pulsar/pull/8306#discussion_r509008037



##########
File path: pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
##########
@@ -1216,15 +1217,17 @@ public void resetCursorOnPosition(
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(name = "messageId", value = "messageId to reset back to (ledgerId:entryId)")
-            MessageIdImpl messageId) {
+                    ResetCursorData resetCursorData) {

Review comment:
       > Can you add an integration test for that?
   
   OK




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org