Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/10 23:23:52 UTC

[GitHub] [pulsar] dlg99 commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

dlg99 commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r629744063



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
-            inputConsumers.add(consumer);
+            inputConsumers.put(TopicName.get(topic), consumer);
+        }
+        if (sourceContext instanceof ExtendedSourceContext) {

Review comment:
   One way or another, the PulsarSource needs to pass data to the context.
   The options are: make the Pulsar source depend on the concrete ContextImpl; extend SourceContext with a setter that an actual Source does not need; or add an interface that extends SourceContext, so that the setter does not become part of the public API.
   Whether it sets a map of consumers or a function that resolves them is a matter of choice. I preferred the function because it keeps the topic comparison logic in the PulsarSource, and because it still works if PulsarSource ever decides to create consumers dynamically or after open().
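
   To make the third option concrete, here is a minimal, self-contained sketch of the shape I have in mind. It is not the code in this PR: `StubConsumer`, `StubSource`, `setConsumerResolver`, and `normalize` are hypothetical stand-ins, and `SourceContext`, `ExtendedSourceContext`, and `ContextImpl` are reduced to toy versions so the example compiles on its own. `normalize` only approximates the topic-name comparison that `TopicName.get(topic)` would provide.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

public class ContextSketch {

    /** Toy stand-in for the public SourceContext API (assumption: one method only). */
    interface SourceContext {
        String getSourceName();
    }

    /** Internal extension: holds the setter so it stays out of the public API. */
    interface ExtendedSourceContext extends SourceContext {
        // Hypothetical method name, chosen for this sketch.
        void setConsumerResolver(Function<String, Optional<StubConsumer>> resolver);
    }

    /** Stand-in for a Pulsar Consumer; it only records whether it is paused. */
    static final class StubConsumer {
        private volatile boolean paused;
        void pause() { paused = true; }
        void resume() { paused = false; }
        boolean isPaused() { return paused; }
    }

    /** Toy context: delegates every topic lookup to the resolver set by the source. */
    static final class ContextImpl implements ExtendedSourceContext {
        private volatile Function<String, Optional<StubConsumer>> resolver =
                topic -> Optional.empty();

        @Override
        public String getSourceName() { return "sketch"; }

        @Override
        public void setConsumerResolver(Function<String, Optional<StubConsumer>> resolver) {
            this.resolver = resolver;
        }

        /** Pauses the consumer for the topic; returns false if none is known. */
        boolean pause(String topic) {
            Optional<StubConsumer> consumer = resolver.apply(topic);
            consumer.ifPresent(StubConsumer::pause);
            return consumer.isPresent();
        }
    }

    /** Toy source: owns the consumers and the topic comparison logic. */
    static final class StubSource {
        private final Map<String, StubConsumer> consumers = new ConcurrentHashMap<>();

        void open(SourceContext ctx, String... topics) {
            for (String topic : topics) {
                addConsumer(topic);
            }
            // Same guard as in the diff: only the extended context gets the resolver.
            if (ctx instanceof ExtendedSourceContext) {
                ((ExtendedSourceContext) ctx).setConsumerResolver(this::resolve);
            }
        }

        /** May also be called after open(); the resolver sees the new consumer. */
        void addConsumer(String topic) {
            consumers.put(normalize(topic), new StubConsumer());
        }

        Optional<StubConsumer> resolve(String topic) {
            return Optional.ofNullable(consumers.get(normalize(topic)));
        }

        /** Simplified assumption: short names expand to persistent://public/default/. */
        static String normalize(String topic) {
            return topic.contains("://") ? topic : "persistent://public/default/" + topic;
        }
    }

    public static void main(String[] args) {
        ContextImpl ctx = new ContextImpl();
        StubSource source = new StubSource();
        source.open(ctx, "orders");
        System.out.println(ctx.pause("persistent://public/default/orders"));
        source.addConsumer("late-topic"); // created after open()
        System.out.println(ctx.pause("late-topic"));
        System.out.println(ctx.pause("unknown"));
    }
}
```

   Because the context holds a function rather than a snapshot of the map, a consumer added after open() is visible without calling the setter again, and both short and fully qualified topic names resolve through the source's own comparison logic.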



