You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2021/05/11 11:40:20 UTC

[GitHub] [pulsar] eolivelli commented on a change in pull request #10498: SinkContext: ability to seek/pause/resume consumer for a topic

eolivelli commented on a change in pull request #10498:
URL: https://github.com/apache/pulsar/pull/10498#discussion_r630086497



##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/MultiConsumerPulsarSource.java
##########
@@ -69,7 +72,20 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
             cb.messageListener(this);
 
             Consumer<T> consumer = cb.subscribeAsync().join();
-            inputConsumers.add(consumer);
+            inputConsumers.put(TopicName.get(topic), consumer);
+        }
+        if (sourceContext instanceof ExtendedSourceContext) {
+            ((ExtendedSourceContext) sourceContext).setConsumerGetter(topicName -> {
+                try {
+                    TopicName req = TopicName.get(topicName);
+                    if (inputConsumers.containsKey(req)) {
+                        return inputConsumers.get(req);
+                    }
+                } catch (Exception e) {
+                    return null;

Review comment:
       can we rethrow the exception  ?
   can we catch a specific exception type ?

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/SingleConsumerPulsarSource.java
##########
@@ -71,6 +73,19 @@ public void open(Map<String, Object> config, SourceContext sourceContext) throws
 
         ConsumerBuilder<T> cb = createConsumeBuilder(topic, pulsarSourceConsumerConfig);
         consumer = cb.subscribeAsync().join();
+        if (sourceContext instanceof ExtendedSourceContext) {
+            ((ExtendedSourceContext) sourceContext).setConsumerGetter(topicName -> {
+                try {
+                    TopicName src = TopicName.get(topic);
+                    if (src.equals(TopicName.get(topicName))) {
+                        return consumer;
+                    }
+                } catch (Exception e) {

Review comment:
       can we rethrow the exception ?
   can we catch a specific exception type ?

##########
File path: pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
##########
@@ -706,4 +710,41 @@ public void close() {
             logger.warn("Failed to close producers", e);
         }
     }
+
+    @Override
+    public void seek(String topic, int partition, MessageId messageId) throws PulsarClientException {
+        Consumer<?> consumer = getConsumer(topic);
+        final MessageId msgId;
+        if (partition == 0) {
+            msgId = messageId;
+        } else {
+            TopicName topicName = TopicName.get(topic);
+            msgId = new TopicMessageIdImpl(
+                    topicName.getPartition(partition).toString(), topicName.toString(), messageId);
+        }
+        consumer.seek(msgId);
+    }
+
+    @Override
+    public void pause(String topic) throws PulsarClientException {
+        getConsumer(topic).pause();
+    }
+
+    @Override
+    public void resume(String topic) throws PulsarClientException {
+        getConsumer(topic).resume();
+    }
+
+    @Override
+    public void setConsumerGetter(java.util.function.Function<String, Consumer<?>> getConsumerFunc) {
+        this.getConsumerFunc = getConsumerFunc;
+    }
+
+    private Consumer<?> getConsumer(String topic) throws PulsarClientException {
+        if (getConsumerFunc == null) {
+            throw new PulsarClientException("Getting consumer is not supported");
+        }
+        return getConsumerFunc.apply(topic);

Review comment:
       this function may return `null`, probably it is better to throw an Exception here,
   otherwise we have to handle null` `in every other function: pause/resume...




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org