Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/07 01:08:09 UTC

[GitHub] [pulsar] dlg99 opened a new pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

dlg99 opened a new pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498


   ### Motivation
   
   Allow a sink to rewind a topic to a given offset and to pause/resume the consumer for a given topic.
   This is needed for https://github.com/apache/pulsar/pull/9927/ (see https://github.com/apache/pulsar/pull/9927/files#r595722189).
   
   ### Modifications
   
   SinkContext API:
   New methods to seek/pause/resume
   Matching implementations in ContextImpl
   
   Added ExtendedSourceContext interface (not public) to provide access to the PulsarSource's consumers.
   Updated ContextImpl and PulsarSource's implementations to provide this functionality.
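
A minimal sketch of how a sink might use the new methods, under stated assumptions: `SeekableSinkContext`, `RecordingContext` and `RewindingSink` are hypothetical stand-ins, the message id is a plain `String` instead of a Pulsar `MessageId`, and the checked `PulsarClientException` is omitted to keep the example self-contained. The pause, seek, resume ordering is one plausible choice, not something the PR prescribes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the three methods this PR adds to SinkContext.
interface SeekableSinkContext {
    void seek(String topic, int partition, String messageId);
    void pause(String topic, int partition);
    void resume(String topic, int partition);
}

// Test double that records the calls it receives, in order.
class RecordingContext implements SeekableSinkContext {
    final List<String> calls = new ArrayList<>();

    public void seek(String topic, int partition, String messageId) {
        calls.add("seek " + topic + "#" + partition + " -> " + messageId);
    }

    public void pause(String topic, int partition) {
        calls.add("pause " + topic + "#" + partition);
    }

    public void resume(String topic, int partition) {
        calls.add("resume " + topic + "#" + partition);
    }
}

class RewindingSink {
    // After a failed write: stop delivery, rewind to the last id known
    // to be durable, then restart delivery.
    static void rewind(SeekableSinkContext ctx, String topic, int partition, String lastDurableId) {
        ctx.pause(topic, partition);
        ctx.seek(topic, partition, lastDurableId);
        ctx.resume(topic, partition);
    }
}
```

Calling `RewindingSink.rewind(ctx, topic, 0, id)` against the recording double yields exactly three calls in the order pause, seek, resume.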
   
   
   ### Verifying this change
   
   - [ ] Make sure that the change passes the CI checks.
   
   *(Please pick either of the following options)*
   
   This change added tests and can be verified as follows:
     - Added unit tests
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
     - The public API: yes
   
   New methods were added to SinkContext; default implementations are provided.
   
   ### Documentation
   
     - Does this pull request introduce a new feature? yes
     - If yes, how is the feature documented? JavaDocs
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [pulsar] dlg99 commented on pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#issuecomment-840821323


   @jerrypeng @eolivelli I made the changes that @jerrypeng requested, and also figured out that `MultiTopicsConsumerImpl` (and, transitively, `PatternMultiTopicsConsumerImpl`) needs special handling because `getTopic()` for such consumers does not return the topic.





[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r630431079



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
-            inputConsumers.add(consumer);
+            inputConsumers.put(TopicName.get(topic), consumer);
+        }
+        if (sourceContext instanceof ExtendedSourceContext) {
+            ((ExtendedSourceContext) sourceContext).setConsumerGetter(topicName -> {
+                try {
+                    TopicName req = TopicName.get(topicName);
+                    if (inputConsumers.containsKey(req)) {
+                        return inputConsumers.get(req);
+                    }
+                } catch (Exception e) {
+                    return null;

Review comment:
       `java.util.function.Function` only allows unchecked exceptions there, and I don't think an extra interface with a checked exception is justified or that the exception provides more value. The exception can come from `TopicName.get()`, which throws RuntimeException and IllegalArgumentException AFAICT. tbh I don't really see the benefit of figuring out which one happened.
   I'll add logging.
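
To illustrate the point about unchecked exceptions, here is a small sketch, not the PR's code. `parseTopic` is a hypothetical stand-in for `TopicName.get()`, and consumers are represented by plain strings. Because `Function.apply` declares no checked exceptions, the lambda catches the runtime failure itself, logs it, and returns `null`.

```java
import java.util.Map;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

class ConsumerGetterSketch {
    private static final Logger LOG = Logger.getLogger(ConsumerGetterSketch.class.getName());

    // Hypothetical stand-in for TopicName.get(): normalizes short names and
    // rejects malformed ones with an unchecked exception.
    static String parseTopic(String name) {
        if (name == null || name.isEmpty()) {
            throw new IllegalArgumentException("Invalid topic name: " + name);
        }
        return name.contains("://") ? name : "persistent://public/default/" + name;
    }

    // The lambda cannot propagate a checked exception, so failures are
    // handled inside it: log and report "no consumer".
    static Function<String, String> getter(Map<String, String> consumersByTopic) {
        return topicName -> {
            try {
                return consumersByTopic.get(parseTopic(topicName));
            } catch (RuntimeException e) {
                LOG.log(Level.WARNING, "Failed to resolve consumer for topic " + topicName, e);
                return null;
            }
        };
    }
}
```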

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
##########
@@ -71,6 +73,19 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
 
         ConsumerBuilder<T> cb = createConsumeBuilder(topic, pulsarSourceConsumerConfig);
         consumer = cb.subscribeAsync().join();
+        if (sourceContext instanceof ExtendedSourceContext) {
+            ((ExtendedSourceContext) sourceContext).setConsumerGetter(topicName -> {
+                try {
+                    TopicName src = TopicName.get(topic);
+                    if (src.equals(TopicName.get(topicName))) {
+                        return consumer;
+                    }
+                } catch (Exception e) {

Review comment:
       same as above







[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r632323145



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {
+            Consumer<?> consumer = topicConsumers.get(TopicName.get(topic).getPartition(partition));
+
+            if (consumer != null) {
+                return consumer;
+            }
+
+            if (partition == 0) {
+                consumer = topicConsumers.get(TopicName.get(topic));
+
+                if (consumer != null) {
+                    return consumer;
+                }
+            }
+
+            if (i == 0) {
+                // MultiTopicsConsumer's list of consumers could change
+                // if partitions changed or pattern(s) used to subscribe
+                inputConsumers.stream()

Review comment:
       It is not duplicating. It skips single-topic consumers (`Stream.empty()`) but reprocesses MultiTopicsConsumerImpl in case new consumers appeared (this happens on repartition, or when a new topic matches the provided pattern if a pattern is used).







[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r632325403



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {

Review comment:
       No override there.
   It is not used outside of JavaInstanceRunnable and is not defined in any interface.







[GitHub] [pulsar] dlg99 commented on pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#issuecomment-842499945


   @jerrypeng do you have any additional feedback or is it good to go?





[GitHub] [pulsar] jerrypeng commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r627917835



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
-            inputConsumers.add(consumer);
+            inputConsumers.put(TopicName.get(topic), consumer);
+        }
+        if (sourceContext instanceof ExtendedSourceContext) {

Review comment:
       I think this is a little more complicated than it needs to be.  After a PulsarSource is done opening, we can just get a list of consumers from the PulsarSource and pass it into the ContextImpl.  That approach will be much simpler and we don't need the additional interfaces.
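
The suggested wiring can be sketched roughly as follows. All three classes are hypothetical stand-ins (consumers are plain strings, and `getInputConsumers` on the source is an assumed accessor); only the `setInputConsumers` name comes from the later revision of the diff.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for a PulsarSource that creates one consumer per topic in open().
class SourceSketch {
    private final List<String> consumers = new ArrayList<>();

    void open(List<String> topics) {
        for (String topic : topics) {
            consumers.add("consumer-for-" + topic);
        }
    }

    // Assumed accessor: exposes the consumers created by open().
    List<String> getInputConsumers() {
        return consumers;
    }
}

// Stand-in for ContextImpl: it only receives the finished list.
class ContextSketch {
    private List<String> inputConsumers;

    void setInputConsumers(List<String> inputConsumers) {
        this.inputConsumers = inputConsumers;
    }

    int consumerCount() {
        return inputConsumers == null ? 0 : inputConsumers.size();
    }
}

// Stand-in for the runner that owns both objects and does the hand-off.
class RunnerSketch {
    static ContextSketch setup(List<String> topics) {
        SourceSketch source = new SourceSketch();
        ContextSketch context = new ContextSketch();
        source.open(topics);
        // Hand-off happens once, after the source has finished opening.
        context.setInputConsumers(source.getInputConsumers());
        return context;
    }
}
```

With this shape the source needs no knowledge of the context type, so no extra interface is required.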







[GitHub] [pulsar] eolivelli commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r632332932



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {

Review comment:
       got it







[GitHub] [pulsar] jerrypeng commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r633734533



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {

Review comment:
       This doesn't seem like the right way to do this.  Retrying twice seems arbitrary.  I would rather let the user implement how many retries they want.  
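
One way to read "let the user implement how many retries they want" is a caller-side helper like the sketch below. `CallerRetry` is hypothetical and not part of the PR; the lookup is modelled as a `Supplier` that returns `null` when nothing is found.

```java
import java.util.function.Supplier;

class CallerRetry {
    // Calls the lookup up to maxAttempts times and returns the first
    // non-null result, or null if every attempt came back empty.
    static <T> T retry(int maxAttempts, Supplier<T> lookup) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            T result = lookup.get();
            if (result != null) {
                return result;
            }
        }
        return null;
    }
}
```

The retry count then becomes the caller's decision rather than a constant inside the context.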







[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r630431285



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +710,41 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic);
+        final MessageId msgId;
+        if (partition == 0) {
+            msgId = messageId;
+        } else {
+            TopicName topicName = TopicName.get(topic);
+            msgId = new TopicMessageIdImpl(
+                    topicName.getPartition(partition).toString(), topicName.toString(), messageId);
+        }
+        consumer.seek(msgId);
+    }
+
+    @Override
+    public void pause(String topic) throws PulsarClientException {
+        getConsumer(topic).pause();
+    }
+
+    @Override
+    public void resume(String topic) throws PulsarClientException {
+        getConsumer(topic).resume();
+    }
+
+    @Override
+    public void setConsumerGetter(java.util.function.Function<String, Consumer<?>> getConsumerFunc) {
+        this.getConsumerFunc = getConsumerFunc;
+    }
+
+    private Consumer<?> getConsumer(String topic) throws PulsarClientException {
+        if (getConsumerFunc == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        return getConsumerFunc.apply(topic);

Review comment:
       I'll make it return Optional
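
A rough sketch of what an `Optional`-returning getter could look like from the caller's side. This is an assumption about the shape, not the PR's final code: consumers are plain strings, and `IllegalStateException` stands in for `PulsarClientException`.

```java
import java.util.Optional;
import java.util.function.Function;

class OptionalGetter {
    // Resolves a consumer or fails with a descriptive error; an empty
    // Optional replaces the null return of the earlier revision.
    static String requireConsumer(Function<String, Optional<String>> getter, String topic) {
        if (getter == null) {
            throw new IllegalStateException("Getting consumer is not supported");
        }
        return getter.apply(topic).orElseThrow(
                () -> new IllegalStateException("No consumer for topic " + topic));
    }
}
```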







[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r632324796



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {
+            Consumer<?> consumer = topicConsumers.get(TopicName.get(topic).getPartition(partition));
+
+            if (consumer != null) {
+                return consumer;
+            }
+
+            if (partition == 0) {
+                consumer = topicConsumers.get(TopicName.get(topic));
+
+                if (consumer != null) {
+                    return consumer;
+                }
+            }
+
+            if (i == 0) {

Review comment:
       Try to get a consumer. If not found, reprocess MultiTopicsConsumers in case new consumers appeared (this happens on repartition, or when a new topic matches the provided pattern if a pattern is used), then give it another try.
   The reprocessing step modifies `topicConsumers` (a map of topic to consumer).







[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r632659889



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {

Review comment:
       Two attempts to get the consumer:
   1. Try to get the consumer.
   2. If not found, reprocess MultiTopicsConsumers in case new consumers appeared (this happens on repartition, or when a new topic matches the provided pattern if a pattern is used).
   3. Give it another try.
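
The steps above can be sketched as follows. This is a simplified illustration, not the PR's code: `TwoPassLookup` is hypothetical, consumers are plain strings, and the `Supplier` stands in for asking a multi-topic consumer for its current per-topic children.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class TwoPassLookup {
    private final Map<String, String> topicConsumers = new ConcurrentHashMap<>();
    // Hypothetical: returns the topics currently covered by the multi-topic consumer.
    private final Supplier<List<String>> multiTopicChildren;

    TwoPassLookup(Supplier<List<String>> multiTopicChildren) {
        this.multiTopicChildren = multiTopicChildren;
        refresh();
    }

    // Adds entries for topics that appeared since the last refresh;
    // existing entries are left untouched.
    private void refresh() {
        for (String topic : multiTopicChildren.get()) {
            topicConsumers.putIfAbsent(topic, "consumer-for-" + topic);
        }
    }

    // First pass: plain lookup. On a miss, refresh once and look again.
    String getConsumer(String topic) {
        String consumer = topicConsumers.get(topic);
        if (consumer == null) {
            refresh();
            consumer = topicConsumers.get(topic);
        }
        return consumer;
    }
}
```

A topic that appears after the second lookup is still missed, so the refresh narrows the window without closing it.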
   
   
   







[GitHub] [pulsar] jerrypeng commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r633786565



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {

Review comment:
       @dlg99 if partitions / topics are added dynamically because of a topic pattern match or an increase in partition count, how would the function know when that happened, and why would trying to get the consumers twice help with that?  There is an inherent race condition there.  One can always argue that there can be a scenario where topics/partitions got created after you called this method.  If the goal of the method is to get new consumers for new topics / partitions then this API is not appropriate.







[GitHub] [pulsar] jerrypeng commented on pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#issuecomment-834058835


   @dlg99 can you also please explain why this is needed for the Pulsar IO Sink and Kafka Sink compatibility layer?





[GitHub] [pulsar] jerrypeng commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r630562488



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
-            inputConsumers.add(consumer);
+            inputConsumers.put(TopicName.get(topic), consumer);
+        }
+        if (sourceContext instanceof ExtendedSourceContext) {

Review comment:
       > I preferred the func to keep the topic comparison logic in the PulsarSource, plus in case PulsarSource ever decides to create consumers dynamically/after open()
   
   Currently that is not the case, nor is there a plan to do so.  We don't need to write code to prepare for a "what-if" scenario.







[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r629744063



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
-            inputConsumers.add(consumer);
+            inputConsumers.put(TopicName.get(topic), consumer);
+        }
+        if (sourceContext instanceof ExtendedSourceContext) {

Review comment:
       One way or another, the PulsarSource will need to set data on the context.
   In this case we either make the Pulsar source depend on the concrete ContextImpl, or extend SourceContext with a setter that is not needed in an actual Source, or add an interface that extends SourceContext so that the method does not become part of the public API.
   Whether it sets a map of consumers or a func that resolves it is a matter of choice. I preferred the func to keep the topic comparison logic in the PulsarSource, plus in case PulsarSource ever decides to create consumers dynamically/after open()







[GitHub] [pulsar] eolivelli commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r632333045



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {
+            Consumer<?> consumer = topicConsumers.get(TopicName.get(topic).getPartition(partition));
+
+            if (consumer != null) {
+                return consumer;
+            }
+
+            if (partition == 0) {
+                consumer = topicConsumers.get(TopicName.get(topic));
+
+                if (consumer != null) {
+                    return consumer;
+                }
+            }
+
+            if (i == 0) {
+                // MultiTopicsConsumer's list of consumers could change
+                // if partitions changed or pattern(s) used to subscribe
+                inputConsumers.stream()

Review comment:
       got it

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {
+            Consumer<?> consumer = topicConsumers.get(TopicName.get(topic).getPartition(partition));
+
+            if (consumer != null) {
+                return consumer;
+            }
+
+            if (partition == 0) {
+                consumer = topicConsumers.get(TopicName.get(topic));
+
+                if (consumer != null) {
+                    return consumer;
+                }
+            }
+
+            if (i == 0) {

Review comment:
       I see







[GitHub] [pulsar] eolivelli commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r632314094



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {
+            Consumer<?> consumer = topicConsumers.get(TopicName.get(topic).getPartition(partition));
+
+            if (consumer != null) {
+                return consumer;
+            }
+
+            if (partition == 0) {
+                consumer = topicConsumers.get(TopicName.get(topic));
+
+                if (consumer != null) {
+                    return consumer;
+                }
+            }
+
+            if (i == 0) {

Review comment:
       I am not sure I understand why in this "get" method we are modifying the internal `inputConsumers` variable.
   
   

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {

Review comment:
       Missing `@Override`?

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {
+            Consumer<?> consumer = topicConsumers.get(TopicName.get(topic).getPartition(partition));
+
+            if (consumer != null) {
+                return consumer;
+            }
+
+            if (partition == 0) {
+                consumer = topicConsumers.get(TopicName.get(topic));
+
+                if (consumer != null) {
+                    return consumer;
+                }
+            }
+
+            if (i == 0) {
+                // MultiTopicsConsumer's list of consumers could change
+                // if partitions changed or pattern(s) used to subscribe
+                inputConsumers.stream()

Review comment:
       What about moving this duplicated code into a common method?
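
   A minimal sketch of what such a common method could look like. It assumes hypothetical stand-in types (`SketchConsumer`, `CompositeSketchConsumer`, `ConsumerIndexSketch`) instead of the real Pulsar `Consumer` and `MultiTopicsConsumerImpl`, and keys the map by a plain topic string rather than `TopicName`:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a single-topic consumer (not the Pulsar Consumer interface).
interface SketchConsumer {
    String topic();
}

// Hypothetical stand-in for a consumer that wraps several per-topic consumers.
interface CompositeSketchConsumer extends SketchConsumer {
    List<SketchConsumer> children();
}

class ConsumerIndexSketch {
    private final Map<String, SketchConsumer> topicConsumers = new ConcurrentHashMap<>();
    private List<SketchConsumer> inputConsumers;

    void setInputConsumers(List<SketchConsumer> inputConsumers) {
        this.inputConsumers = inputConsumers;
        reindex();
    }

    // The single shared helper: flattens composite consumers into the map.
    // Both the setter and any later refresh can call it instead of repeating the logic.
    void reindex() {
        for (SketchConsumer consumer : inputConsumers) {
            if (consumer instanceof CompositeSketchConsumer) {
                for (SketchConsumer child : ((CompositeSketchConsumer) consumer).children()) {
                    topicConsumers.putIfAbsent(child.topic(), child);
                }
            } else {
                topicConsumers.putIfAbsent(consumer.topic(), consumer);
            }
        }
    }

    // Returns the cached consumer for the topic, or null when none is known.
    SketchConsumer lookup(String topic) {
        return topicConsumers.get(topic);
    }
}
```

   With a helper like this, the lookup path would only need to call `reindex()` on a cache miss rather than repeating the stream pipeline.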







[GitHub] [pulsar] eolivelli merged pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
eolivelli merged pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498


   





[GitHub] [pulsar] jerrypeng commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r630562488



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
-            inputConsumers.add(consumer);
+            inputConsumers.put(TopicName.get(topic), consumer);
+        }
+        if (sourceContext instanceof ExtendedSourceContext) {

Review comment:
       > I preferred the func to keep the topic comparison logic in the PulsarSource, plus in case PulsarSource ever decides to create consumers dynamically/after open()
   
   Currently that is not the case, nor is there a plan to do so. There is no point in writing code to prepare for a "what-if" scenario.







[GitHub] [pulsar] dlg99 commented on pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#issuecomment-842499945


   @jerrypeng do you have any additional feedback or is it good to go?





[GitHub] [pulsar] dlg99 commented on pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#issuecomment-834067465


   @jerrypeng kafka-connect-adaptor implements some of the Kafka interfaces. For the sink it is SinkTaskContext (https://kafka.apache.org/23/javadoc/org/apache/kafka/connect/sink/SinkTaskContext.html), which has offset/pause/resume methods that need seek/pause/resume from the SinkContext.
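
   A rough sketch of that mapping, under stated assumptions: every type below (`SeekableSinkSketch`, `TopicPartitionSketch`, `SinkTaskContextSketch`) is a hypothetical stand-in, not the Kafka or Pulsar class, and the seek position is a plain `long`, because how a Kafka offset is translated into a Pulsar `MessageId` is not covered in this thread:

```java
// Hypothetical stand-in for the seek/pause/resume methods this PR adds to SinkContext.
// Simplification: a long position is used here instead of a Pulsar MessageId.
interface SeekableSinkSketch {
    void seek(String topic, int partition, long position);
    void pause(String topic, int partition);
    void resume(String topic, int partition);
}

// Hypothetical stand-in for Kafka's TopicPartition (topic name plus partition index).
final class TopicPartitionSketch {
    final String topic;
    final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }
}

// Adapter exposing Kafka-style offset/pause/resume on top of the sink-side calls.
class SinkTaskContextSketch {
    private final SeekableSinkSketch sink;

    SinkTaskContextSketch(SeekableSinkSketch sink) {
        this.sink = sink;
    }

    // Kafka-style "reset offset" becomes a seek on the matching topic/partition.
    void offset(TopicPartitionSketch tp, long offset) {
        sink.seek(tp.topic, tp.partition, offset);
    }

    // Pausing several partitions delegates to one pause call per partition.
    void pause(TopicPartitionSketch... partitions) {
        for (TopicPartitionSketch tp : partitions) {
            sink.pause(tp.topic, tp.partition);
        }
    }

    // Resuming mirrors pause.
    void resume(TopicPartitionSketch... partitions) {
        for (TopicPartitionSketch tp : partitions) {
            sink.resume(tp.topic, tp.partition);
        }
    }
}
```

   The point is only that each Kafka-side call needs a per-topic, per-partition counterpart on the sink context; the real adaptor may look quite different.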





[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r633911631



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {

Review comment:
       We talked offline about the reasons it is needed.
   I expressed the retry more clearly and added comments. Please take a look.







[GitHub] [pulsar] eolivelli commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
eolivelli commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r630086497



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
-            inputConsumers.add(consumer);
+            inputConsumers.put(TopicName.get(topic), consumer);
+        }
+        if (sourceContext instanceof ExtendedSourceContext) {
+            ((ExtendedSourceContext) sourceContext).setConsumerGetter(topicName -> {
+                try {
+                    TopicName req = TopicName.get(topicName);
+                    if (inputConsumers.containsKey(req)) {
+                        return inputConsumers.get(req);
+                    }
+                } catch (Exception e) {
+                    return null;

Review comment:
       Can we rethrow the exception?
   Can we catch a specific exception type?

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
##########
@@ -71,6 +73,19 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
 
         ConsumerBuilder<T> cb = createConsumeBuilder(topic, pulsarSourceConsumerConfig);
         consumer = cb.subscribeAsync().join();
+        if (sourceContext instanceof ExtendedSourceContext) {
+            ((ExtendedSourceContext) sourceContext).setConsumerGetter(topicName -> {
+                try {
+                    TopicName src = TopicName.get(topic);
+                    if (src.equals(TopicName.get(topicName))) {
+                        return consumer;
+                    }
+                } catch (Exception e) {

Review comment:
       Can we rethrow the exception?
   Can we catch a specific exception type?

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +710,41 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic);
+        final MessageId msgId;
+        if (partition == 0) {
+            msgId = messageId;
+        } else {
+            TopicName topicName = TopicName.get(topic);
+            msgId = new TopicMessageIdImpl(
+                    topicName.getPartition(partition).toString(), topicName.toString(), messageId);
+        }
+        consumer.seek(msgId);
+    }
+
+    @Override
+    public void pause(String topic) throws PulsarClientException {
+        getConsumer(topic).pause();
+    }
+
+    @Override
+    public void resume(String topic) throws PulsarClientException {
+        getConsumer(topic).resume();
+    }
+
+    @Override
+    public void setConsumerGetter(java.util.function.Function<String, Consumer<?>> getConsumerFunc) {
+        this.getConsumerFunc = getConsumerFunc;
+    }
+
+    private Consumer<?> getConsumer(String topic) throws PulsarClientException {
+        if (getConsumerFunc == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        return getConsumerFunc.apply(topic);

Review comment:
       This function may return `null`; it is probably better to throw an exception here,
   otherwise we have to handle `null` in every other function: pause/resume...
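
   A small sketch of the suggested null check. `ConsumerResolverSketch` is a hypothetical stand-in for the lookup part of `ContextImpl`, and `IllegalStateException` is used only so the example compiles without the Pulsar client on the classpath (the code under review throws `PulsarClientException`):

```java
import java.util.function.Function;

// Hypothetical helper: resolves a consumer for a topic and never returns null.
class ConsumerResolverSketch<C> {
    private final Function<String, C> getConsumerFunc;

    ConsumerResolverSketch(Function<String, C> getConsumerFunc) {
        this.getConsumerFunc = getConsumerFunc;
    }

    C getConsumer(String topic) {
        if (getConsumerFunc == null) {
            throw new IllegalStateException("Getting consumer is not supported");
        }
        C consumer = getConsumerFunc.apply(topic);
        if (consumer == null) {
            // Fail here once, so pause/resume/seek callers do not each need a null check.
            throw new IllegalStateException("No consumer found for topic " + topic);
        }
        return consumer;
    }
}
```

   Callers such as `pause` and `resume` can then chain directly on the returned value.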







[GitHub] [pulsar] jerrypeng commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r633734533



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {

Review comment:
       This doesn't seem like the right way to do this. Retrying twice seems arbitrary. I would rather let the user implement how many retries they want.







[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r633740600



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {

Review comment:
       @jerrypeng this is not arbitrary.
   I have no access to callbacks/events fired when a MultiTopicsConsumer adds another partition or a topic (for pattern-based subscriptions); MultiTopicsConsumer only has those internally.
   So if the lookup in the previously cached topic/partition -> consumer map finds nothing, it means that:
   - either the consumer doesn't exist,
   - or the topic/partition (and its consumer) was added after the initial map was built.
   In that case it makes sense to check the MultiTopicsConsumers once to update the map and try again.
   I don't think blocking and polling is justified; at least I don't have such a use case.
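
   The "look up, refresh once on a miss, then give up" idea described above can be sketched as follows. This is an illustration only: `TopicConsumerSketch` and `RefreshOnMissLookup` are hypothetical names, the live consumer list is modelled as a `Supplier`, and `IllegalStateException` stands in for `PulsarClientException`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical stand-in for a per-topic consumer (not the Pulsar Consumer interface).
interface TopicConsumerSketch {
    String topic();
}

// Caches topic -> consumer and refreshes the cache at most once per lookup.
class RefreshOnMissLookup {
    private final Map<String, TopicConsumerSketch> cache = new ConcurrentHashMap<>();
    // Assumption: returns the current list of consumers, which may have grown since the last call.
    private final Supplier<List<TopicConsumerSketch>> currentConsumers;

    RefreshOnMissLookup(Supplier<List<TopicConsumerSketch>> currentConsumers) {
        this.currentConsumers = currentConsumers;
        refresh();
    }

    // Adds any consumers that are not cached yet; existing entries are kept.
    private void refresh() {
        for (TopicConsumerSketch consumer : currentConsumers.get()) {
            cache.putIfAbsent(consumer.topic(), consumer);
        }
    }

    TopicConsumerSketch get(String topic) {
        TopicConsumerSketch found = cache.get(topic);
        if (found == null) {
            // Miss: the consumer may have been added after the cache was built,
            // so rebuild once and look again. No blocking, no polling.
            refresh();
            found = cache.get(topic);
        }
        if (found == null) {
            throw new IllegalStateException("No consumer for topic " + topic);
        }
        return found;
    }
}
```

   Written this way, the single retry is visible as an explicit refresh step rather than as a two-iteration loop.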







[GitHub] [pulsar] jerrypeng commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r632656974



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +714,67 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic, partition);
+        consumer.seek(messageId);
+    }
+
+    @Override
+    public void pause(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).pause();
+    }
+
+    @Override
+    public void resume(String topic, int partition) throws PulsarClientException {
+        getConsumer(topic, partition).resume();
+    }
+
+    public void setInputConsumers(List<Consumer<?>> inputConsumers) {
+        this.inputConsumers = inputConsumers;
+        inputConsumers.stream()
+            .flatMap(consumer ->
+                    consumer instanceof MultiTopicsConsumerImpl
+                            ? ((MultiTopicsConsumerImpl<?>) consumer).getConsumers().stream()
+                            : Stream.of(consumer))
+            .forEach(consumer -> topicConsumers.putIfAbsent(TopicName.get(consumer.getTopic()), consumer));
+    }
+
+    @VisibleForTesting
+    Consumer<?> getConsumer(String topic, int partition) throws PulsarClientException {
+        if (inputConsumers == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        for (int i = 0; i < 2; i++) {

Review comment:
       Why is this for loop needed?







[GitHub] [pulsar] jerrypeng commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

Posted by GitBox <gi...@apache.org>.
jerrypeng commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r630560486



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
-            inputConsumers.add(consumer);
+            inputConsumers.put(TopicName.get(topic), consumer);
+        }
+        if (sourceContext instanceof ExtendedSourceContext) {

Review comment:
       @dlg99 In `JavaInstanceRunnable`, once you instantiate the PulsarSource and call `open` on it, just call another method, `getConsumers()`, to get the list of consumers and pass it into ContextImpl when it gets instantiated. It is much simpler that way.
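
   A sketch of that wiring, with hypothetical stand-in classes (`SketchSource`, `SketchContext`, `SketchRunner`, `InputConsumerSketch`) in place of `PulsarSource`, `ContextImpl` and `JavaInstanceRunnable`; only the order of calls is the point:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for a consumer (not the Pulsar Consumer interface).
interface InputConsumerSketch {
    String topic();
}

// Stand-in for the source: creates its consumers in open() and exposes them afterwards.
class SketchSource {
    private final List<InputConsumerSketch> consumers = new ArrayList<>();

    void open(List<String> topics) {
        for (String topic : topics) {
            consumers.add(() -> topic);
        }
    }

    List<InputConsumerSketch> getConsumers() {
        return Collections.unmodifiableList(consumers);
    }
}

// Stand-in for the context: receives the consumer list directly, no callback needed.
class SketchContext {
    private final List<InputConsumerSketch> inputConsumers;

    SketchContext(List<InputConsumerSketch> inputConsumers) {
        this.inputConsumers = inputConsumers;
    }

    boolean hasConsumerFor(String topic) {
        for (InputConsumerSketch consumer : inputConsumers) {
            if (consumer.topic().equals(topic)) {
                return true;
            }
        }
        return false;
    }
}

// Stand-in for the runner: open the source first, then hand its consumers to the context.
class SketchRunner {
    static SketchContext setup(List<String> topics) {
        SketchSource source = new SketchSource();
        source.open(topics);
        return new SketchContext(source.getConsumers());
    }
}
```

   Compared with the `ExtendedSourceContext` callback, the source here does not need to know anything about the context.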



