Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/04/28 15:53:52 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #10348: Reader support seek from separate messageId/time

eolivelli commented on a change in pull request #10348:
URL: https://github.com/apache/pulsar/pull/10348#discussion_r622316748



##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
##########
@@ -735,7 +735,7 @@ public boolean hasMessageAvailable() throws PulsarClientException {
             if (exception != null) {
                 completableFuture.completeExceptionally(exception);
             } else {
-                completableFuture.complete(hasMessageAvailable.get());
+                completableFuture.complete(hasMessageAvailable.get() || numMessagesInQueue() > 0);

Review comment:
       This change seems unrelated.

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
##########
@@ -155,6 +157,33 @@
      */
     void seek(long timestamp) throws PulsarClientException;
 
+    /**
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.
+     * <p>
+     * The Function input is topic+partition.
+     * <p>
+     * The return value is the seek position/timestamp of the current partition.
+     * <p>
+     * If returns null, the current partition will not do any processing.
+     * @param function
+     * @throws PulsarClientException
+     */
+    void seek(Function<String, Object> function) throws PulsarClientException;

Review comment:
       I am not sure that `Object` is a good return type.
   Just from reading the docs, which data type am I supposed to return?
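
One possible direction, given only as a rough sketch: return a small typed value instead of `Object`, so the signature itself says which data types are accepted. `SeekTarget`, its factory methods, and `SeekExample` are hypothetical names invented for illustration and are not part of the Pulsar API; a plain `String` stands in for `MessageId` to keep the snippet self-contained.

```java
import java.util.function.Function;

// Hypothetical typed result for the per-partition seek function.
// Nothing here is existing Pulsar API; it only illustrates the idea.
final class SeekTarget {
    enum Kind { MESSAGE_ID, TIMESTAMP }

    private final Kind kind;
    private final String messageId; // stand-in for a real MessageId
    private final long timestamp;   // publish time in epoch milliseconds

    private SeekTarget(Kind kind, String messageId, long timestamp) {
        this.kind = kind;
        this.messageId = messageId;
        this.timestamp = timestamp;
    }

    // Seek the partition to a specific message id.
    static SeekTarget ofMessageId(String messageId) {
        if (messageId == null) {
            throw new IllegalArgumentException("messageId must not be null");
        }
        return new SeekTarget(Kind.MESSAGE_ID, messageId, 0L);
    }

    // Seek the partition to a message publish time.
    static SeekTarget ofTimestamp(long timestamp) {
        return new SeekTarget(Kind.TIMESTAMP, null, timestamp);
    }

    Kind kind() { return kind; }
    String messageId() { return messageId; }
    long timestamp() { return timestamp; }
}

final class SeekExample {
    // Input is topic+partition; returning null means "do not seek this partition",
    // matching the behaviour described in the Javadoc under review.
    static final Function<String, SeekTarget> BY_PARTITION = topic -> {
        if (topic.endsWith("-partition-0")) {
            return SeekTarget.ofTimestamp(1619625232000L);
        }
        if (topic.endsWith("-partition-1")) {
            return SeekTarget.ofMessageId("12:34");
        }
        return null;
    };
}
```

With a type like this, the method could be declared as `seek(Function<String, SeekTarget> function)`, and a caller could not return an unsupported type by mistake. Whether a wrapper type or two separate overloads is the better fit is a design choice for the PR author.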

##########
File path: pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsReaderImpl.java
##########
@@ -140,7 +141,7 @@ public boolean hasReachedEndOfTopic() {
 
     @Override
     public boolean hasMessageAvailable() throws PulsarClientException {
-        return multiTopicsConsumer.hasMessageAvailable() || multiTopicsConsumer.numMessagesInQueue() > 0;
+        return multiTopicsConsumer.hasMessageAvailable();

Review comment:
       This change seems unrelated.

##########
File path: pulsar-client-api/src/main/java/org/apache/pulsar/client/api/Reader.java
##########
@@ -155,6 +157,33 @@
      */
     void seek(long timestamp) throws PulsarClientException;
 
+    /**
+     * Reset the subscription associated with this consumer to a specific message id or message publish time.
+     * <p>
+     * The Function input is topic+partition.
+     * <p>
+     * The return value is the seek position/timestamp of the current partition.
+     * <p>
+     * If returns null, the current partition will not do any processing.
+     * @param function
+     * @throws PulsarClientException
+     */
+    void seek(Function<String, Object> function) throws PulsarClientException;

Review comment:
       We should also describe what happens if the function throws a RuntimeException.
   1) Is the exception rethrown, wrapped in some subclass of PulsarClientException?
   2) If the exception is thrown for a topic other than the first one, what happens to the topics that the function has already processed?
   
   We should document this behaviour in the docs (and add test cases).
   
   (This comment applies to seekAsync as well.)
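
To make the two questions concrete, here is a minimal sketch of one possible answer, not a description of what the PR does. It evaluates the function for every topic before any seek is issued, so a failure on a later topic leaves no topic half-processed, and it wraps the RuntimeException in a checked exception. `SeekFunctionException` and `TwoPhaseSeek` are hypothetical names; in the real client the exception would presumably be a subclass of PulsarClientException.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical stand-in for a PulsarClientException subclass.
class SeekFunctionException extends Exception {
    SeekFunctionException(String topic, Throwable cause) {
        super("seek function failed for " + topic, cause);
    }
}

final class TwoPhaseSeek {
    // Phase 1: call the user function for every topic and collect the results.
    // No seek is performed here, so an exception cannot leave some topics
    // already repositioned and others untouched.
    static <T> Map<String, T> resolveAll(List<String> topics, Function<String, T> function)
            throws SeekFunctionException {
        Map<String, T> targets = new LinkedHashMap<>();
        for (String topic : topics) {
            try {
                T target = function.apply(topic);
                if (target != null) { // null means "skip this partition"
                    targets.put(topic, target);
                }
            } catch (RuntimeException e) {
                // Answer to question 1 in this sketch: wrap and rethrow,
                // naming the topic that triggered the failure.
                throw new SeekFunctionException(topic, e);
            }
        }
        // Phase 2 (not shown): the caller issues the actual seeks for each entry.
        return targets;
    }
}
```

Other policies are possible, for example seeking topic by topic and reporting which ones succeeded. Whichever is chosen, the Javadoc and the tests should state it explicitly.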
   



