Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/24 10:17:07 UTC

[GitHub] [pulsar] 315157973 opened a new pull request #10348: Reader support seek from separate messageId/time

315157973 opened a new pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348


   Fixes #9301
   
   ### Motivation
   Currently, `ReaderConfigurationData` only allows `setStartMessageId` to take a single message ID, and that ID is applied to all consumers in `MultiTopicsReaderImpl`.
   
   It should be possible to set the start message per partition/topic.
   
   ### Modifications
   `Reader` can seek with a function that returns the seek position (message ID or publish timestamp) for each partition.
   
   ### Verifying this change
   1. For a partitioned topic, each partition can seek to its own time or message ID.
   2. For a non-partitioned topic, the reader can also seek with a function.
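A minimal caller-side sketch of the per-partition start positions described above. It assumes the function's `String` input is the full partition topic name (for example `persistent://public/default/my-topic-partition-0`), that a `Long` return value is read as a publish timestamp in milliseconds, and that `null` leaves a partition untouched. `FakeMessageId` and `fromStartPositions` are hypothetical names used only so the example runs without the Pulsar client on the classpath.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

public class PerPartitionStart {
    // Hypothetical stand-in for org.apache.pulsar.client.api.MessageId.
    static final class FakeMessageId {
        final long ledgerId;
        final long entryId;

        FakeMessageId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Builds a seek function from a per-partition map of start positions.
    // The map is copied, so later changes to the caller's map are not seen.
    // Partitions missing from the map yield null ("leave this partition alone").
    static Function<String, Object> fromStartPositions(Map<String, Object> starts) {
        Map<String, Object> copy = new HashMap<>(starts);
        return copy::get;
    }

    public static void main(String[] args) {
        String p0 = "persistent://public/default/my-topic-partition-0";
        String p1 = "persistent://public/default/my-topic-partition-1";
        String p2 = "persistent://public/default/my-topic-partition-2";

        Map<String, Object> starts = new HashMap<>();
        starts.put(p0, 1_619_000_000_000L);         // partition 0: seek by publish time (ms)
        starts.put(p1, new FakeMessageId(42L, 7L)); // partition 1: seek by message ID
        // partition 2 is absent, so the function returns null for it

        Function<String, Object> seekFn = fromStartPositions(starts);
        for (String p : new String[] {p0, p1, p2}) {
            System.out.println(p + " -> " + seekFn.apply(p));
        }
        // With the real client, seekFn would be passed to reader.seek(seekFn).
    }
}
```

Copying the map keeps the function's answers stable even if the caller keeps mutating `starts` after handing the function to the reader.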


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] lhotari commented on pull request #10348: Reader support seek from separate messageId/time

lhotari commented on pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348#issuecomment-826349801


   /pulsarbot run-failure-checks


[GitHub] [pulsar] Anonymitaet commented on a change in pull request #10348: Reader support seek from separate messageId/time

Anonymitaet commented on a change in pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348#discussion_r628640547



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
##########
@@ -600,26 +600,31 @@
     void seek(long timestamp) throws PulsarClientException;
 
     /**
-     * Reset the subscription associated with this consumer to a specific message id.
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.

Review comment:
       Please check and keep this consistent across all occurrences.




[GitHub] [pulsar] 315157973 commented on a change in pull request #10348: Reader support seek from separate messageId/time

315157973 commented on a change in pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348#discussion_r623736667



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
##########
@@ -155,6 +157,33 @@
      */
     void seek(long timestamp) throws PulsarClientException;
 
+    /**
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.
+     * <p>
+     * The Function input is topic+partition.
+     * <p>
+     * The return value is the seek position/timestamp of the current partition.
+     * <p>
+     * If returns null, the current partition will not do any processing.
+     * @param function
+     * @throws PulsarClientException
+     */
+    void seek(Function<String, Object> function) throws PulsarClientException;

Review comment:
       Thanks for the review.
   Splitting timestamp and MessageId into two separate interfaces would reduce flexibility: with a single Function, each partition can seek by either a timestamp or a MessageId.
   Therefore, I think the return type can be explained in the documentation. If the function returns an object other than a timestamp or a MessageId, an exception is thrown, and the error message states that only timestamp and MessageId are supported.
   
   In addition, I will document the exception behavior and add corresponding unit tests.
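One way the reader side could interpret the returned `Object`, written as a self-contained sketch of the behavior described in this comment: a timestamp or a MessageId is accepted, `null` skips the partition, and anything else is rejected with an error naming the supported types. `SeekDispatch`, `plan`, and `FakeMessageId` are hypothetical; the sketch records the chosen action instead of calling a real seek, and it is not the code merged in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SeekDispatch {
    // Hypothetical stand-in for org.apache.pulsar.client.api.MessageId.
    static final class FakeMessageId {
        final long ledgerId;
        final long entryId;

        FakeMessageId(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    // Applies the user function to every partition and records how each one
    // would be repositioned. Throws if the function returns an unsupported type.
    static List<String> plan(List<String> partitions, Function<String, Object> fn) {
        List<String> actions = new ArrayList<>();
        for (String partition : partitions) {
            Object target = fn.apply(partition);
            if (target == null) {
                continue; // null: leave this partition where it is
            }
            if (target instanceof Long) {
                actions.add(partition + " seek(timestamp=" + target + ")");
            } else if (target instanceof FakeMessageId) {
                actions.add(partition + " seek(messageId=" + target + ")");
            } else {
                throw new IllegalArgumentException("Only timestamp (Long) and MessageId are supported, but got "
                        + target.getClass().getName() + " for " + partition);
            }
        }
        return actions;
    }

    public static void main(String[] args) {
        List<String> parts = List.of("t-partition-0", "t-partition-1", "t-partition-2");
        Function<String, Object> fn = p -> {
            if (p.endsWith("-0")) {
                return 1_000L;                    // timestamp
            }
            if (p.endsWith("-1")) {
                return new FakeMessageId(3L, 9L); // message ID
            }
            return null;                          // skip partition 2
        };
        plan(parts, fn).forEach(System.out::println);
    }
}
```

In this sketch, returning an `int` literal such as `1000` boxes to `Integer` and is rejected; the thread does not say whether the real client accepts `Integer` timestamps, so returning a `long` is the safer assumption.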




[GitHub] [pulsar] Anonymitaet commented on a change in pull request #10348: Reader support seek from separate messageId/time

Anonymitaet commented on a change in pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348#discussion_r628640455



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
##########
@@ -600,26 +600,31 @@
     void seek(long timestamp) throws PulsarClientException;
 
     /**
-     * Reset the subscription associated with this consumer to a specific message id.
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.
      * <p>
-     * The Function input is topic+partition.
+     * The Function input is topic+partition, and can only return timestamp or MessageId.

Review comment:
       ```suggestion
        * The Function input is topic+partition. It returns only timestamp or MessageId.
   ```

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
##########
@@ -600,26 +600,31 @@
     void seek(long timestamp) throws PulsarClientException;
 
     /**
-     * Reset the subscription associated with this consumer to a specific message id.
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.

Review comment:
       ```suggestion
        * Reset the subscription associated with this consumer to a specific message ID or message publish time.
   ```

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
##########
@@ -600,26 +600,31 @@
     void seek(long timestamp) throws PulsarClientException;
 
     /**
-     * Reset the subscription associated with this consumer to a specific message id.
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.

Review comment:
       Please check and keep this consistent across all occurrences.

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
##########
@@ -600,26 +600,31 @@
     void seek(long timestamp) throws PulsarClientException;
 
     /**
-     * Reset the subscription associated with this consumer to a specific message id.
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.
      * <p>
-     * The Function input is topic+partition.
+     * The Function input is topic+partition, and can only return timestamp or MessageId.
      * <p>
      * The return value is the seek position/timestamp of the current partition.
+     * Exception will be thrown if other types of objects are returned.

Review comment:
       ```suggestion
        * Exception is thrown if other object types are returned.
   ```

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
##########
@@ -600,26 +600,31 @@
     void seek(long timestamp) throws PulsarClientException;
 
     /**
-     * Reset the subscription associated with this consumer to a specific message id.
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.
      * <p>
-     * The Function input is topic+partition.
+     * The Function input is topic+partition, and can only return timestamp or MessageId.
      * <p>
      * The return value is the seek position/timestamp of the current partition.
+     * Exception will be thrown if other types of objects are returned.
      * <p>
      * If returns null, the current partition will not do any processing.
+     * Exception in a partition may affect other partitions.
      * @param function
      * @throws PulsarClientException
      */
     void seek(Function<String, Object> function) throws PulsarClientException;
 
     /**
-     * Reset the subscription associated with this consumer to a specific message id asynchronously.
+     * Reset the subscription associated with this consumer to a specific message id

Review comment:
       ```suggestion
        * Reset the subscription associated with this consumer to a specific message ID
   ```




[GitHub] [pulsar] eolivelli commented on a change in pull request #10348: Reader support seek from separate messageId/time

eolivelli commented on a change in pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348#discussion_r622316748



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -735,7 +735,7 @@ public boolean hasMessageAvailable() throws PulsarClientException {
             if (exception != null) {
                 completableFuture.completeExceptionally(exception);
             } else {
-                completableFuture.complete(hasMessageAvailable.get());
+                completableFuture.complete(hasMessageAvailable.get() || numMessagesInQueue() > 0);

Review comment:
       this change seems unrelated

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
##########
@@ -155,6 +157,33 @@
      */
     void seek(long timestamp) throws PulsarClientException;
 
+    /**
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.
+     * <p>
+     * The Function input is topic+partition.
+     * <p>
+     * The return value is the seek position/timestamp of the current partition.
+     * <p>
+     * If returns null, the current partition will not do any processing.
+     * @param function
+     * @throws PulsarClientException
+     */
+    void seek(Function<String, Object> function) throws PulsarClientException;

Review comment:
       I am not sure that `Object` is a good return type.
   Just by reading the docs, which data type do I have to return?

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
##########
@@ -140,7 +141,7 @@ public boolean hasReachedEndOfTopic() {
 
     @Override
     public boolean hasMessageAvailable() throws PulsarClientException {
-        return multiTopicsConsumer.hasMessageAvailable() || multiTopicsConsumer.numMessagesInQueue() > 0;
+        return multiTopicsConsumer.hasMessageAvailable();

Review comment:
       this change seems unrelated

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
##########
@@ -155,6 +157,33 @@
      */
     void seek(long timestamp) throws PulsarClientException;
 
+    /**
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.
+     * <p>
+     * The Function input is topic+partition.
+     * <p>
+     * The return value is the seek position/timestamp of the current partition.
+     * <p>
+     * If returns null, the current partition will not do any processing.
+     * @param function
+     * @throws PulsarClientException
+     */
+    void seek(Function<String, Object> function) throws PulsarClientException;

Review comment:
       We should also describe what happens when the function throws a RuntimeException:
   1) Is the exception rethrown, wrapped in some subclass of PulsarClientException?
   2) If the exception is thrown for a topic other than the very first one, what happens to the topics that have already been processed by the function?
   
   We should document this behaviour (and have test cases).
   
   (This comment applies to seekAsync as well.)
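A sketch of one possible answer to both questions, assuming partitions are processed one at a time in order: a `RuntimeException` from the function is wrapped in a checked exception, and partitions handled before the failing one stay repositioned (nothing is rolled back). `SeekFailureDemo`, `seekAll`, and `SeekFailedException` are hypothetical; `SeekFailedException` only stands in for a `PulsarClientException` subclass. The real client may seek partitions concurrently, in which case the set of already-moved partitions is less predictable, which fits the Javadoc line "Exception in a partition may affect other partitions." added in this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class SeekFailureDemo {
    // Hypothetical checked exception standing in for a PulsarClientException subclass.
    static final class SeekFailedException extends Exception {
        final List<String> alreadySeeked;

        SeekFailedException(String message, Throwable cause, List<String> alreadySeeked) {
            super(message, cause);
            this.alreadySeeked = alreadySeeked;
        }
    }

    // Seeks partitions one by one. If the user function throws for partition k,
    // partitions before k have already been repositioned and are NOT rolled back.
    static List<String> seekAll(List<String> partitions, Function<String, Object> fn)
            throws SeekFailedException {
        List<String> done = new ArrayList<>();
        for (String partition : partitions) {
            Object target;
            try {
                target = fn.apply(partition);
            } catch (RuntimeException e) {
                // Wrap the user error and report the partial progress so far.
                throw new SeekFailedException("seek function failed for " + partition, e, List.copyOf(done));
            }
            if (target != null) {
                done.add(partition); // stands in for the actual per-partition seek call
            }
        }
        return done;
    }

    public static void main(String[] args) {
        List<String> parts = List.of("t-partition-0", "t-partition-1", "t-partition-2");
        try {
            seekAll(parts, p -> {
                if (p.endsWith("-1")) {
                    throw new IllegalStateException("no position for " + p);
                }
                return 0L;
            });
        } catch (SeekFailedException e) {
            System.out.println(e.getMessage() + "; already seeked: " + e.alreadySeeked);
        }
    }
}
```

For `seekAsync`, the same wrapped exception could complete the returned `CompletableFuture` exceptionally instead of being thrown; whichever behaviour is chosen, documenting the partial-progress case answers question 2.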
   




[GitHub] [pulsar] codelipenghui merged pull request #10348: Reader support seek from separate messageId/time

codelipenghui merged pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348


   


[GitHub] [pulsar] 315157973 commented on a change in pull request #10348: Reader support seek from separate messageId/time

315157973 commented on a change in pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348#discussion_r628715834



##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Consumer.java
##########
@@ -600,26 +600,31 @@
     void seek(long timestamp) throws PulsarClientException;
 
     /**
-     * Reset the subscription associated with this consumer to a specific message id.
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.
      * <p>
-     * The Function input is topic+partition.
+     * The Function input is topic+partition, and can only return timestamp or MessageId.
      * <p>
      * The return value is the seek position/timestamp of the current partition.
+     * Exception will be thrown if other types of objects are returned.
      * <p>
      * If returns null, the current partition will not do any processing.
+     * Exception in a partition may affect other partitions.
      * @param function
      * @throws PulsarClientException
      */
     void seek(Function<String, Object> function) throws PulsarClientException;
 
     /**
-     * Reset the subscription associated with this consumer to a specific message id asynchronously.
+     * Reset the subscription associated with this consumer to a specific message id

Review comment:
       Thanks, I will check the other places.




[GitHub] [pulsar] Anonymitaet commented on pull request #10348: Reader support seek from separate messageId/time

Anonymitaet commented on pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348#issuecomment-828920829


   @315157973 Thanks for your great work. Could you please add docs accordingly? Then I can help review them. Thanks!


[GitHub] [pulsar] sijie commented on pull request #10348: Reader support seek from separate messageId/time

sijie commented on pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348#issuecomment-834897049


   @merlimat @codelipenghui Can you review this PR?

