You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/03/15 16:26:52 UTC

[GitHub] merlimat closed pull request #1378: Standardize on input/output terminology for Pulsar Functions

merlimat closed pull request #1378: Standardize on input/output terminology for Pulsar Functions
URL: https://github.com/apache/incubator-pulsar/pull/1378
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request was opened from a fork, GitHub does not display
its diff once it has been merged, so the full diff is reproduced below:

diff --git a/pulsar-client-cpp/python/functions/context.py b/pulsar-client-cpp/python/functions/context.py
index 8ffa2dc01..4e1a96954 100644
--- a/pulsar-client-cpp/python/functions/context.py
+++ b/pulsar-client-cpp/python/functions/context.py
@@ -100,8 +100,8 @@ def publish(self, topic_name, message):
     pass
 
   @abstractmethod
-  def get_sink_topic(self):
-    '''Returns the sink topic of function'''
+  def get_output_topic(self):
+    '''Returns the output topic of function'''
     pass
 
   @abstractmethod
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index ca20cfe10..e15d50889 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -170,13 +170,13 @@ public void testLocalRunnerCmdYaml() throws Exception {
     @Test
     public void testCreateFunction() throws Exception {
         String fnName = TEST_NAME + "-function";
-        String sourceTopicName = TEST_NAME + "-source-topic";
-        String sinkTopicName = TEST_NAME + "-sink-topic";
+        String inputTopicName = TEST_NAME + "-input-topic";
+        String outputTopicName = TEST_NAME + "-output-topic";
         cmd.run(new String[] {
             "create",
             "--name", fnName,
-            "--inputs", sourceTopicName,
-            "--output", sinkTopicName,
+            "--inputs", inputTopicName,
+            "--output", outputTopicName,
             "--jar", "SomeJar.jar",
             "--tenant", "sample",
             "--namespace", "ns1",
@@ -185,8 +185,8 @@ public void testCreateFunction() throws Exception {
 
         CreateFunction creater = cmd.getCreater();
         assertEquals(fnName, creater.getFunctionName());
-        assertEquals(sourceTopicName, creater.getInputs());
-        assertEquals(sinkTopicName, creater.getOutput());
+        assertEquals(inputTopicName, creater.getInputs());
+        assertEquals(outputTopicName, creater.getOutput());
 
         verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
 
@@ -195,13 +195,13 @@ public void testCreateFunction() throws Exception {
     @Test
     public void testCreateWithoutTenant() throws Exception {
         String fnName = TEST_NAME + "-function";
-        String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic";
-        String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic";
+        String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
+        String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
         cmd.run(new String[] {
                 "create",
                 "--name", fnName,
-                "--inputs", sourceTopicName,
-                "--output", sinkTopicName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
                 "--jar", "SomeJar.jar",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
@@ -215,13 +215,13 @@ public void testCreateWithoutTenant() throws Exception {
     @Test
     public void testCreateWithoutNamespace() throws Exception {
         String fnName = TEST_NAME + "-function";
-        String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic";
-        String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic";
+        String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
+        String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
         cmd.run(new String[] {
                 "create",
                 "--name", fnName,
-                "--inputs", sourceTopicName,
-                "--output", sinkTopicName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
                 "--jar", "SomeJar.jar",
                 "--className", DummyFunction.class.getName(),
         });
@@ -234,12 +234,12 @@ public void testCreateWithoutNamespace() throws Exception {
 
     @Test
     public void testCreateWithoutFunctionName() throws Exception {
-        String sourceTopicName = TEST_NAME + "-source-topic";
-        String sinkTopicName = TEST_NAME + "-sink-topic";
+        String inputTopicName = TEST_NAME + "-input-topic";
+        String outputTopicName = TEST_NAME + "-output-topic";
         cmd.run(new String[] {
                 "create",
-                "--inputs", sourceTopicName,
-                "--output", sinkTopicName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
                 "--jar", "SomeJar.jar",
                 "--tenant", "sample",
                 "--namespace", "ns1",
@@ -252,11 +252,11 @@ public void testCreateWithoutFunctionName() throws Exception {
     }
 
     @Test
-    public void testCreateWithoutSinkTopic() throws Exception {
-        String sourceTopicName = TEST_NAME + "-source-topic";
+    public void testCreateWithoutOutputTopic() throws Exception {
+        String inputTopicName = TEST_NAME + "-input-topic";
         cmd.run(new String[] {
                 "create",
-                "--inputs", sourceTopicName,
+                "--inputs", inputTopicName,
                 "--jar", "SomeJar.jar",
                 "--tenant", "sample",
                 "--namespace", "ns1",
@@ -264,7 +264,7 @@ public void testCreateWithoutSinkTopic() throws Exception {
         });
 
         CreateFunction creater = cmd.getCreater();
-        assertEquals(sourceTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
+        assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
         verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
     }
 
@@ -313,16 +313,16 @@ public void testDeleteFunction() throws Exception {
     @Test
     public void testUpdateFunction() throws Exception {
         String fnName = TEST_NAME + "-function";
-        String sourceTopicName = TEST_NAME + "-source-topic";
-        String sinkTopicName = TEST_NAME + "-sink-topic";
+        String inputTopicName = TEST_NAME + "-input-topic";
+        String outputTopicName = TEST_NAME + "-output-topic";
 
 
 
         cmd.run(new String[] {
             "update",
             "--name", fnName,
-            "--inputs", sourceTopicName,
-            "--output", sinkTopicName,
+            "--inputs", inputTopicName,
+            "--output", outputTopicName,
             "--jar", "SomeJar.jar",
             "--tenant", "sample",
             "--namespace", "ns1",
@@ -331,8 +331,8 @@ public void testUpdateFunction() throws Exception {
 
         UpdateFunction updater = cmd.getUpdater();
         assertEquals(fnName, updater.getFunctionName());
-        assertEquals(sourceTopicName, updater.getInputs());
-        assertEquals(sinkTopicName, updater.getOutput());
+        assertEquals(inputTopicName, updater.getInputs());
+        assertEquals(outputTopicName, updater.getOutput());
 
         verify(functions, times(1)).updateFunction(any(FunctionConfig.class), anyString());
     }
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index fb641412c..871496223 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -44,16 +44,16 @@
     String getTopicName();
 
     /**
-     * Get a list of all source topics
-     * @return a list of all source topics
+     * Get a list of all input topics
+     * @return a list of all input topics
      */
-    Collection<String> getSourceTopics();
+    Collection<String> getInputTopics();
 
     /**
-     * Get sink topic of function
-     * @return sink topic name
+     * Get the output topic of the function
+     * @return output topic name
      */
-    String getSinkTopic();
+    String getOutputTopic();
 
     /**
      * Get output Serde class
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index a3a8b4978..378106ed4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -86,13 +86,13 @@ public void update(double value) {
     private ProducerConfiguration producerConfiguration;
     private PulsarClient pulsarClient;
     private ClassLoader classLoader;
-    private Map<String, Consumer> sourceConsumers;
+    private Map<String, Consumer> inputConsumers;
     @Getter
     @Setter
     private StateContextImpl stateContext;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
-                       ClassLoader classLoader, Map<String, Consumer> sourceConsumers) {
+                       ClassLoader classLoader, Map<String, Consumer> inputConsumers) {
         this.config = config;
         this.logger = logger;
         this.pulsarClient = client;
@@ -100,7 +100,7 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
         this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
         this.publishSerializers = new HashMap<>();
-        this.sourceConsumers = sourceConsumers;
+        this.inputConsumers = inputConsumers;
         producerConfiguration = new ProducerConfiguration();
         producerConfiguration.setBlockIfQueueFull(true);
         producerConfiguration.setBatchingEnabled(true);
@@ -124,12 +124,12 @@ public String getTopicName() {
     }
 
     @Override
-    public Collection<String> getSourceTopics() {
-        return sourceConsumers.keySet();
+    public Collection<String> getInputTopics() {
+        return inputConsumers.keySet();
     }
 
     @Override
-    public String getSinkTopic() {
+    public String getOutputTopic() {
         return config.getFunctionConfig().getOutput();
     }
 
@@ -236,7 +236,7 @@ public void incrCounter(String key, long amount) {
 
     @Override
     public CompletableFuture<Void> ack(byte[] messageId, String topic) {
-        if (!sourceConsumers.containsKey(topic)) {
+        if (!inputConsumers.containsKey(topic)) {
             throw new RuntimeException("No such input topic " + topic);
         }
 
@@ -246,7 +246,7 @@ public void incrCounter(String key, long amount) {
         } catch (IOException e) {
             throw new RuntimeException("Invalid message id to ack", e);
         }
-        return sourceConsumers.get(topic).acknowledgeAsync(actualMessageId);
+        return inputConsumers.get(topic).acknowledgeAsync(actualMessageId);
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 14f317123..0f1b3dad7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -48,11 +48,11 @@
     public JavaInstance(InstanceConfig config, Object userClassObject,
                  ClassLoader clsLoader,
                  PulsarClient pulsarClient,
-                 Map<String, Consumer> sourceConsumers) {
+                 Map<String, Consumer> inputConsumers) {
         // TODO: cache logger instances by functions?
         Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionConfig().getName());
 
-        this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, sourceConsumers);
+        this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, inputConsumers);
 
         // create the functions
         if (userClassObject instanceof Function) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fffe8f56d..3a634d4e4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -60,9 +60,9 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.producers.MultiConsumersOneSinkTopicProducers;
+import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.producers.Producers;
-import org.apache.pulsar.functions.instance.producers.SimpleOneSinkTopicProducers;
+import org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -82,13 +82,13 @@
     private final LinkedBlockingDeque<InputMessage> queue;
     private final String jarFile;
 
-    // source topic consumer & sink topic producer
+    // input topic consumer & output topic producer
     private final PulsarClientImpl client;
     @Getter(AccessLevel.PACKAGE)
-    private Producers sinkProducer;
+    private Producers outputProducer;
     @Getter(AccessLevel.PACKAGE)
-    private final Map<String, Consumer> sourceConsumers;
-    private LinkedList<String> sourceTopicsToResubscribe = null;
+    private final Map<String, Consumer> inputConsumers;
+    private LinkedList<String> inputTopicsToResubscribe = null;
 
     // provide tables for storing states
     private final String stateStorageServiceUrl;
@@ -137,7 +137,7 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
-        this.sourceConsumers = Maps.newConcurrentMap();
+        this.inputConsumers = Maps.newConcurrentMap();
     }
 
     private SubscriptionType getSubscriptionType() {
@@ -188,12 +188,12 @@ JavaInstance setupJavaInstance() throws Exception {
 
         // start the state table
         setupStateTable();
-        // start the sink producer
-        startSinkProducer();
-        // start the source consumer
-        startSourceConsumers();
+        // start the output producer
+        startOutputProducer();
+        // start the input consumer
+        startInputConsumer();
 
-        return new JavaInstance(instanceConfig, object, clsLoader, client, sourceConsumers);
+        return new JavaInstance(instanceConfig, object, clsLoader, client, inputConsumers);
     }
 
     /**
@@ -222,23 +222,23 @@ public void run() {
                 if (ProcessingGuarantees.EFFECTIVELY_ONCE == processingGuarantees) {
                     // if the messages are received from old consumers, we discard it since new consumer was
                     // re-created for the correctness of effectively-once
-                    if (msg.getConsumer() != sourceConsumers.get(msg.getTopicName())) {
+                    if (msg.getConsumer() != inputConsumers.get(msg.getTopicName())) {
                         continue;
                     }
                 }
 
-                if (null != sinkProducer) {
+                if (null != outputProducer) {
                     // before processing the message, we have a producer connection setup for producing results.
                     Producer producer = null;
                     while (null == producer) {
                         try {
-                            producer = sinkProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
+                            producer = outputProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
                         } catch (PulsarClientException e) {
                             // `ProducerBusy` is thrown when an producer with same name is still connected.
-                            // This can happen when a active consumer is changed for a given source topic partition
+                            // This can happen when a active consumer is changed for a given input topic partition
                             // so we need to wait until the old active consumer release the produce connection.
                             if (!(e instanceof ProducerBusyException)) {
-                                log.error("Failed to get a producer for producing results computed from source topic {}",
+                                log.error("Failed to get a producer for producing results computed from input topic {}",
                                     msg.getTopicName());
                             }
                             TimeUnit.MILLISECONDS.sleep(500);
@@ -318,9 +318,9 @@ private void loadJars() throws Exception {
     public void becameActive(Consumer consumer, int partitionId) {
         // if the instance becomes active for a given topic partition,
         // open a producer for the results computed from this topic partition.
-        if (null != sinkProducer) {
+        if (null != outputProducer) {
             try {
-                this.sinkProducer.getProducer(consumer.getTopic(), partitionId);
+                this.outputProducer.getProducer(consumer.getTopic(), partitionId);
             } catch (PulsarClientException e) {
                 // this can be ignored, because producer can be lazily created when accessing it.
                 log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
@@ -331,10 +331,10 @@ public void becameActive(Consumer consumer, int partitionId) {
 
     @Override
     public void becameInactive(Consumer consumer, int partitionId) {
-        if (null != sinkProducer) {
+        if (null != outputProducer) {
             // if I lost the ownership of a partition, close its corresponding topic partition.
             // this is to allow the new active consumer be able to produce to the result topic.
-            this.sinkProducer.closeProducer(consumer.getTopic(), partitionId);
+            this.outputProducer.closeProducer(consumer.getTopic(), partitionId);
         }
     }
 
@@ -380,33 +380,33 @@ private void setupStateTable() throws Exception {
         this.stateTable = result(storageClient.openTable(tableName));
     }
 
-    private void startSinkProducer() throws Exception {
+    private void startOutputProducer() throws Exception {
         if (instanceConfig.getFunctionConfig().getOutput() != null
                 && !instanceConfig.getFunctionConfig().getOutput().isEmpty()
                 && this.outputSerDe != null) {
-            log.info("Starting Producer for Sink Topic " + instanceConfig.getFunctionConfig().getOutput());
+            log.info("Starting producer for output topic " + instanceConfig.getFunctionConfig().getOutput());
 
             if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                this.sinkProducer = new MultiConsumersOneSinkTopicProducers(
+                this.outputProducer = new MultiConsumersOneOuputTopicProducers(
                     client, instanceConfig.getFunctionConfig().getOutput());
             } else {
-                this.sinkProducer = new SimpleOneSinkTopicProducers(
+                this.outputProducer = new SimpleOneOuputTopicProducers(
                     client, instanceConfig.getFunctionConfig().getOutput());
             }
-            this.sinkProducer.initialize();
+            this.outputProducer.initialize();
         }
     }
 
-    private void startSourceConsumers() throws Exception {
+    private void startInputConsumer() throws Exception {
         log.info("Consumer map {}", instanceConfig.getFunctionConfig());
         for (Map.Entry<String, String> entry : instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) {
             ConsumerConfiguration conf = createConsumerConfiguration(entry.getKey());
-            this.sourceConsumers.put(entry.getKey(), client.subscribe(entry.getKey(),
+            this.inputConsumers.put(entry.getKey(), client.subscribe(entry.getKey(),
                     FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), conf));
         }
         for (String topicName : instanceConfig.getFunctionConfig().getInputsList()) {
             ConsumerConfiguration conf = createConsumerConfiguration(topicName);
-            this.sourceConsumers.put(topicName, client.subscribe(topicName,
+            this.inputConsumers.put(topicName, client.subscribe(topicName,
                     FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), conf));
         }
     }
@@ -457,7 +457,7 @@ private void processResult(InputMessage msg,
             handleProcessException(msg.getTopicName());
         } else {
             stats.incrementSuccessfullyProcessed(endTime - startTime);
-            if (result.getResult() != null && sinkProducer != null) {
+            if (result.getResult() != null && outputProducer != null) {
                 byte[] output;
                 try {
                     output = outputSerDe.serialize(result.getResult());
@@ -504,9 +504,9 @@ private void sendOutputMessage(InputMessage srcMsg,
         }
         Producer producer;
         try {
-            producer = sinkProducer.getProducer(srcMsg.getTopicName(), srcMsg.getTopicPartition());
+            producer = outputProducer.getProducer(srcMsg.getTopicName(), srcMsg.getTopicPartition());
         } catch (PulsarClientException e) {
-            log.error("Failed to get a producer for producing results computed from source topic {}",
+            log.error("Failed to get a producer for producing results computed from input topic {}",
                 srcMsg.getTopicName());
 
             // if we fail to get a producer, put this message back to queue and reprocess it.
@@ -522,7 +522,7 @@ private void sendOutputMessage(InputMessage srcMsg,
                 }
             })
             .exceptionally(cause -> {
-                log.error("Failed to send the process result {} of message {} to sink topic {}",
+                log.error("Failed to send the process result {} of message {} to output topic {}",
                     result, srcMsg, instanceConfig.getFunctionConfig().getOutput(), cause);
                 handleProcessException(srcMsg.getTopicName());
                 return null;
@@ -549,17 +549,17 @@ private void handleProcessException(String srcTopic) {
     }
 
     private synchronized void addTopicToResubscribeList(String topicName) {
-        if (null == sourceTopicsToResubscribe) {
-            sourceTopicsToResubscribe = new LinkedList<>();
+        if (null == inputTopicsToResubscribe) {
+            inputTopicsToResubscribe = new LinkedList<>();
         }
-        sourceTopicsToResubscribe.add(topicName);
+        inputTopicsToResubscribe.add(topicName);
     }
 
     private void resubscribeTopicsIfNeeded() {
         List<String> topicsToResubscribe;
         synchronized (this) {
-            topicsToResubscribe = sourceTopicsToResubscribe;
-            sourceTopicsToResubscribe = null;
+            topicsToResubscribe = inputTopicsToResubscribe;
+            inputTopicsToResubscribe = null;
         }
         if (null != topicsToResubscribe) {
             for (String topic : topicsToResubscribe) {
@@ -571,7 +571,7 @@ private void resubscribeTopicsIfNeeded() {
     private void resubscribe(String srcTopic) {
         // if we can not produce a message to output topic, then close the consumer of the src topic
         // and retry to instantiate a consumer again.
-        Consumer consumer = sourceConsumers.remove(srcTopic);
+        Consumer consumer = inputConsumers.remove(srcTopic);
         if (consumer != null) {
             // TODO (sijie): currently we have to close the entire consumer for a given topic. However
             //               ideally we should do this in a finer granularity - we can close consumer
@@ -579,14 +579,14 @@ private void resubscribe(String srcTopic) {
             try {
                 consumer.close();
             } catch (PulsarClientException e) {
-                log.error("Failed to close consumer for source topic {} when handling produce exceptions",
+                log.error("Failed to close consumer for input topic {} when handling produce exceptions",
                     srcTopic, e);
             }
         }
         // subscribe to the src topic again
         ConsumerConfiguration conf = createConsumerConfiguration(srcTopic);
         try {
-            sourceConsumers.put(
+            inputConsumers.put(
                 srcTopic,
                 client.subscribe(
                     srcTopic,
@@ -594,7 +594,7 @@ private void resubscribe(String srcTopic) {
                     conf
                 ));
         } catch (PulsarClientException e) {
-            log.error("Failed to resubscribe to source topic {}. Added it to retry list and retry it later",
+            log.error("Failed to resubscribe to input topic {}. Added it to retry list and retry it later",
                 srcTopic, e);
             addTopicToResubscribeList(srcTopic);
         }
@@ -607,19 +607,19 @@ public void close() {
         }
         running = false;
         // stop the consumer first, so no more messages are coming in
-        sourceConsumers.forEach((k, v) -> {
+        inputConsumers.forEach((k, v) -> {
             try {
                 v.close();
             } catch (PulsarClientException e) {
-                log.warn("Failed to close consumer to source topic {}", k, e);
+                log.warn("Failed to close consumer to input topic {}", k, e);
             }
         });
-        sourceConsumers.clear();
+        inputConsumers.clear();
 
         // kill the result producer
-        if (null != sinkProducer) {
-            sinkProducer.close();
-            sinkProducer = null;
+        if (null != outputProducer) {
+            outputProducer.close();
+            outputProducer = null;
         }
 
         // kill the state table
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneSinkTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
similarity index 91%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneSinkTopicProducers.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index f6dd83d87..723529d51 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneSinkTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -28,17 +28,17 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 
-abstract class AbstractOneSinkTopicProducers implements Producers {
+abstract class AbstractOneOuputTopicProducers implements Producers {
 
     protected final PulsarClient client;
-    protected final String sinkTopic;
+    protected final String outputTopic;
     protected final ProducerConfiguration conf;
 
-    AbstractOneSinkTopicProducers(PulsarClient client,
-                                  String sinkTopic)
+    AbstractOneOuputTopicProducers(PulsarClient client,
+                                   String outputTopic)
             throws PulsarClientException {
         this.client = client;
-        this.sinkTopic = sinkTopic;
+        this.outputTopic = outputTopic;
         this.conf = newProducerConfiguration();
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
similarity index 88%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducers.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 63eb51e57..cf21a042b 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -34,15 +34,15 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 
 @Slf4j
-public class MultiConsumersOneSinkTopicProducers extends AbstractOneSinkTopicProducers {
+public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers {
 
     @Getter(AccessLevel.PACKAGE)
     private final Map<String, IntObjectMap<Producer>> producers;
 
-    public MultiConsumersOneSinkTopicProducers(PulsarClient client,
-                                               String sinkTopic)
+    public MultiConsumersOneOuputTopicProducers(PulsarClient client,
+                                                String outputTopic)
             throws PulsarClientException {
-        super(client, sinkTopic);
+        super(client, outputTopic);
         this.producers = new ConcurrentHashMap<>();
     }
 
@@ -65,7 +65,7 @@ public synchronized Producer getProducer(String srcTopicName, int srcTopicPartit
 
         Producer producer = producerMap.get(srcTopicPartition);
         if (null == producer) {
-            producer = createProducer(sinkTopic, makeProducerName(srcTopicName, srcTopicPartition));
+            producer = createProducer(outputTopic, makeProducerName(srcTopicName, srcTopicPartition));
             producerMap.put(srcTopicPartition, producer);
         }
         return producer;
@@ -97,7 +97,7 @@ public synchronized void close() {
         try {
             FutureUtils.result(FutureUtils.collect(closeFutures));
         } catch (Exception e) {
-            log.warn("Fail to close all the producers for sink topic {}", sinkTopic, e);
+            log.warn("Fail to close all the producers for output topic {}", outputTopic, e);
         }
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
similarity index 82%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducers.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
index dc8b121ae..2fee37a4f 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
@@ -27,18 +27,18 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 
 @Slf4j
-public class SimpleOneSinkTopicProducers extends AbstractOneSinkTopicProducers {
+public class SimpleOneOuputTopicProducers extends AbstractOneOuputTopicProducers {
 
     @Getter
     private Producer producer;
 
-    public SimpleOneSinkTopicProducers(PulsarClient client, String sinkTopic) throws PulsarClientException {
-        super(client, sinkTopic);
+    public SimpleOneOuputTopicProducers(PulsarClient client, String outputTopic) throws PulsarClientException {
+        super(client, outputTopic);
     }
 
     @Override
     public void initialize() throws PulsarClientException {
-        producer = createProducer(sinkTopic);
+        producer = createProducer(outputTopic);
     }
 
     @Override
@@ -57,7 +57,7 @@ public void close() {
         try {
             producer.close();
         } catch (PulsarClientException e) {
-            log.warn("Fail to close producer of topic {}", sinkTopic, e);
+            log.warn("Fail to close producer of topic {}", outputTopic, e);
         }
     }
 }
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 6a3a5dae2..9c77fd1c3 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -98,7 +98,7 @@ def record_metric(self, metric_name, metric_value):
       self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
     self.accumulated_metrics[metric_name].update(metric_value)
 
-  def get_sink_topic(self):
+  def get_output_topic(self):
     return self.instance_config.function_config.output
 
   def get_output_serde_class_name(self):
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 0bf02b072..87ba54910 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -57,10 +57,10 @@ def main():
   parser.add_argument('--name', required=True, help='Function Name')
   parser.add_argument('--tenant', required=True, help='Tenant Name')
   parser.add_argument('--namespace', required=True, help='Namespace name')
-  parser.add_argument('--custom_serde_source_topics', required=False, help='Source Topics Requiring Custom Deserialization')
+  parser.add_argument('--custom_serde_input_topics', required=False, help='Input Topics Requiring Custom Deserialization')
   parser.add_argument('--custom_serde_classnames', required=False, help='Input Serde Classnames')
-  parser.add_argument('--source_topics', required=False, help='Input topics with default serde')
-  parser.add_argument('--sink_topic', required=False, help='Sink Topic')
+  parser.add_argument('--input_topics', required=False, help='Input topics with default serde')
+  parser.add_argument('--output_topic', required=False, help='Output Topic')
   parser.add_argument('--output_serde_classname', required=False, help='Output Serde Classnames')
   parser.add_argument('--instance_id', required=True, help='Instance Id')
   parser.add_argument('--function_id', required=True, help='Function Id')
@@ -87,22 +87,22 @@ def main():
   function_config.namespace = args.namespace
   function_config.name = args.name
   function_config.className = args.function_classname
-  if args.custom_serde_source_topics is None and args.source_topics is None:
-    Log.critical("Atleast one source topic must be present")
+  if args.custom_serde_input_topics is None and args.input_topics is None:
+    Log.critical("Atleast one input topic must be present")
     sys.exit(1)
-  if args.custom_serde_source_topics is not None and args.custom_serde_classnames is not None:
-    source_topics = args.custom_serde_source_topics.split(",")
-    source_serde = args.custom_serde_classnames.split(",")
-    if len(source_topics) != len(source_serde):
-      Log.critical("CustomSerde SourceTopics and Serde classnames should match")
+  if args.custom_serde_input_topics is not None and args.custom_serde_classnames is not None:
+    input_topics = args.custom_serde_input_topics.split(",")
+    input_serde = args.custom_serde_classnames.split(",")
+    if len(input_topics) != len(input_serde):
+      Log.critical("CustomSerde InputTopcis and Serde classnames should match")
       sys.exit(1)
-    for i in xrange(len(source_topics)):
-      function_config.customSerdeInputs[source_topics[i]] = source_serde[i]
-  if args.source_topics is not None:
-    for topic in args.source_topics.split(","):
+    for i in xrange(len(input_topics)):
+      function_config.customSerdeInputs[input_topics[i]] = input_serde[i]
+  if args.input_topics is not None:
+    for topic in args.input_topics.split(","):
       function_config.inputs.append(topic)
-  if args.sink_topic != None and len(args.sink_topic) != 0:
-    function_config.output = args.sink_topic
+  if args.output_topic != None and len(args.output_topic) != 0:
+    function_config.output = args.output_topic
   if args.output_serde_classname != None and len(args.output_serde_classname) != 0:
     function_config.outputSerdeClassName = args.output_serde_classname
   function_config.processingGuarantees = Function_pb2.FunctionConfig.ProcessingGuarantees.Value(args.processing_guarantees)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index e8e761f13..0b2c0b6d4 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -81,7 +81,7 @@
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.instance.producers.Producers;
-import org.apache.pulsar.functions.instance.producers.SimpleOneSinkTopicProducers;
+import org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.Utils;
 import org.powermock.api.mockito.PowerMockito;
@@ -231,7 +231,7 @@ public void setup() throws Exception {
             .setClassName(TestFunction.class.getName())
             .addInputs("test-src-topic")
             .setName("test-function")
-            .setOutput("test-sink-topic")
+            .setOutput("test-output-topic")
             .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
             .setTenant("test-tenant")
             .setNamespace("test-namespace")
@@ -430,15 +430,15 @@ public void testSetupJavaInstance() throws Exception {
         }
 
         // 3. verify producers and consumers are setup
-        Producers producers = runnable.getSinkProducer();
-        assertTrue(producers instanceof SimpleOneSinkTopicProducers);
+        Producers producers = runnable.getOutputProducer();
+        assertTrue(producers instanceof SimpleOneOuputTopicProducers);
         assertSame(mockProducers.get(Pair.of(
             fnConfig.getOutput(),
             null
-        )).getProducer(), ((SimpleOneSinkTopicProducers) producers).getProducer());
+        )).getProducer(), ((SimpleOneOuputTopicProducers) producers).getProducer());
 
-        assertEquals(mockConsumers.size(), runnable.getSourceConsumers().size());
-        for (Map.Entry<String, Consumer> consumerEntry : runnable.getSourceConsumers().entrySet()) {
+        assertEquals(mockConsumers.size(), runnable.getInputConsumers().size());
+        for (Map.Entry<String, Consumer> consumerEntry : runnable.getInputConsumers().entrySet()) {
             String topic = consumerEntry.getKey();
 
             Consumer mockConsumer = mockConsumers.get(Pair.of(
@@ -457,7 +457,7 @@ public void testSetupJavaInstance() throws Exception {
         for (ConsumerInstance consumer : mockConsumers.values()) {
             verify(consumer.getConsumer(), times(1)).close();
         }
-        assertTrue(runnable.getSourceConsumers().isEmpty());
+        assertTrue(runnable.getInputConsumers().isEmpty());
 
         for (ProducerInstance producer : mockProducers.values()) {
             verify(producer.getProducer(), times(1)).close();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
similarity index 90%
rename from pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducersTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 06de83537..577015a2c 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.functions.instance.producers;
 
-import static org.apache.pulsar.functions.instance.producers.MultiConsumersOneSinkTopicProducers.makeProducerName;
+import static org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -41,15 +41,15 @@
 import org.testng.annotations.Test;
 
 /**
- * Unit test of {@link MultiConsumersOneSinkTopicProducers}.
+ * Unit test of {@link MultiConsumersOneOuputTopicProducers}.
  */
-public class MultiConsumersOneSinkTopicProducersTest {
+public class MultiConsumersOneOutputTopicProducersTest {
 
-    private static final String TEST_SINK_TOPIC = "test-sink-topic";
+    private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
 
     private PulsarClient mockClient;
     private final Map<String, Producer> mockProducers = new HashMap<>();
-    private MultiConsumersOneSinkTopicProducers producers;
+    private MultiConsumersOneOuputTopicProducers producers;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -70,7 +70,7 @@ public void setup() throws Exception {
                 }
             });
 
-        producers = new MultiConsumersOneSinkTopicProducers(mockClient, TEST_SINK_TOPIC);
+        producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC);
         producers.initialize();
     }
 
@@ -97,7 +97,7 @@ public void testGetCloseProducer() throws Exception {
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
             .createProducer(
-                eq(TEST_SINK_TOPIC),
+                eq(TEST_OUTPUT_TOPIC),
                 any(ProducerConfiguration.class)
             );
         assertTrue(producers.getProducers().containsKey(srcTopic));
@@ -108,7 +108,7 @@ public void testGetCloseProducer() throws Exception {
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
             .createProducer(
-                eq(TEST_SINK_TOPIC),
+                eq(TEST_OUTPUT_TOPIC),
                 any(ProducerConfiguration.class)
             );
         assertTrue(producers.getProducers().containsKey(srcTopic));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
similarity index 82%
rename from pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducersTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
index 84bc69368..67337a15f 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
@@ -34,15 +34,15 @@
 import org.testng.annotations.Test;
 
 /**
- * Unit test of {@link SimpleOneSinkTopicProducers}.
+ * Unit test of {@link SimpleOneOuputTopicProducers}.
  */
-public class SimpleOneSinkTopicProducersTest {
+public class SimpleOneOutputTopicProducersTest {
 
-    private static final String TEST_SINK_TOPIC = "test-sink-topic";
+    private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
 
     private PulsarClient mockClient;
     private Producer mockProducer;
-    private SimpleOneSinkTopicProducers producers;
+    private SimpleOneOuputTopicProducers producers;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -52,7 +52,7 @@ public void setup() throws Exception {
         when(mockClient.createProducer(anyString(), any(ProducerConfiguration.class)))
             .thenReturn(mockProducer);
 
-        this.producers = new SimpleOneSinkTopicProducers(mockClient, TEST_SINK_TOPIC);
+        this.producers = new SimpleOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC);
     }
 
     @Test
@@ -60,7 +60,7 @@ public void testInitializeClose() throws Exception {
         this.producers.initialize();
 
         verify(mockClient, times(1))
-            .createProducer(eq(TEST_SINK_TOPIC), any(ProducerConfiguration.class));
+            .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
 
         this.producers.close();
 
@@ -77,12 +77,12 @@ public void testGetAndCloseProducer() throws Exception {
         this.producers.initialize();
 
         verify(mockClient, times(1))
-            .createProducer(eq(TEST_SINK_TOPIC), any(ProducerConfiguration.class));
+            .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
 
         assertSame(mockProducer, this.producers.getProducer("test-src-topic", 0));
 
         verify(mockClient, times(1))
-            .createProducer(eq(TEST_SINK_TOPIC), any(ProducerConfiguration.class));
+            .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
 
         producers.closeProducer("test-src-topic", 0);
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index badee9461..08bfbe4d4 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -56,15 +56,15 @@
     @Parameter(names = "--namespace", description = "Namespace Name\n", required = true)
     protected String namespace;
 
-    @Parameter(names = "--sink_topic", description = "Output Topic Name\n")
-    protected String sinkTopicName;
+    @Parameter(names = "--output_topic", description = "Output Topic Name\n")
+    protected String outputTopicName;
 
-    @Parameter(names = "--custom_serde_source_topics", description = "Input Topics that need custom deserialization\n", required = false)
-    protected String customSerdeSourceTopics;
+    @Parameter(names = "--custom_serde_input_topics", description = "Input Topics that need custom deserialization\n", required = false)
+    protected String customSerdeInputTopics;
     @Parameter(names = "--custom_serde_classnames", description = "Input SerDe\n", required = false)
     protected String customSerdeClassnames;
-    @Parameter(names = "--source_topics", description = "Input Topics\n", required = false)
-    protected String defaultSerdeSourceTopics;
+    @Parameter(names = "--input_topics", description = "Input Topics\n", required = false)
+    protected String defaultSerdeInputTopics;
 
     @Parameter(names = "--output_serde_classname", description = "Output SerDe\n")
     protected String outputSerdeClassName;
@@ -118,27 +118,27 @@ public void start() throws Exception {
         functionConfigBuilder.setNamespace(namespace);
         functionConfigBuilder.setName(functionName);
         functionConfigBuilder.setClassName(className);
-        if (defaultSerdeSourceTopics != null) {
-            String[] sourceTopics = defaultSerdeSourceTopics.split(",");
-            for (String sourceTopic : sourceTopics) {
-                functionConfigBuilder.addInputs(sourceTopic);
+        if (defaultSerdeInputTopics != null) {
+            String[] inputTopics = defaultSerdeInputTopics.split(",");
+            for (String inputTopic : inputTopics) {
+                functionConfigBuilder.addInputs(inputTopic);
             }
         }
-        if (customSerdeSourceTopics != null && customSerdeClassnames != null) {
-            String[] sourceTopics = customSerdeSourceTopics.split(",");
+        if (customSerdeInputTopics != null && customSerdeClassnames != null) {
+            String[] inputTopics = customSerdeInputTopics.split(",");
             String[] inputSerdeClassNames = customSerdeClassnames.split(",");
-            if (sourceTopics.length != inputSerdeClassNames.length) {
+            if (inputTopics.length != inputSerdeClassNames.length) {
                 throw new RuntimeException("Error specifying inputs");
             }
-            for (int i = 0; i < sourceTopics.length; ++i) {
-                functionConfigBuilder.putCustomSerdeInputs(sourceTopics[i], inputSerdeClassNames[i]);
+            for (int i = 0; i < inputTopics.length; ++i) {
+                functionConfigBuilder.putCustomSerdeInputs(inputTopics[i], inputSerdeClassNames[i]);
             }
         }
         if (outputSerdeClassName != null) {
             functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
         }
-        if (sinkTopicName != null) {
-            functionConfigBuilder.setOutput(sinkTopicName);
+        if (outputTopicName != null) {
+            functionConfigBuilder.setOutput(outputTopicName);
         }
         if (logTopic != null) {
             functionConfigBuilder.setLogTopic(logTopic);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c1d7b7f8f..1bdedd7c4 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -112,13 +112,13 @@
             args.add(instanceConfig.getFunctionConfig().getLogTopic());
         }
         if (instanceConfig.getFunctionConfig().getCustomSerdeInputsCount() > 0) {
-            String sourceTopicString = "";
+            String inputTopicString = "";
             String inputSerdeClassNameString = "";
             for (Map.Entry<String, String> entry : instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) {
-                if (sourceTopicString.isEmpty()) {
-                    sourceTopicString = entry.getKey();
+                if (inputTopicString.isEmpty()) {
+                    inputTopicString = entry.getKey();
                 } else {
-                    sourceTopicString = sourceTopicString + "," + entry.getKey();
+                    inputTopicString = inputTopicString + "," + entry.getKey();
                 }
                 if (inputSerdeClassNameString.isEmpty()) {
                     inputSerdeClassNameString = entry.getValue();
@@ -126,22 +126,22 @@
                     inputSerdeClassNameString = inputSerdeClassNameString + "," + entry.getValue();
                 }
             }
-            args.add("--custom_serde_source_topics");
-            args.add(sourceTopicString);
+            args.add("--custom_serde_input_topics");
+            args.add(inputTopicString);
             args.add("--custom_serde_classnames");
             args.add(inputSerdeClassNameString);
         }
         if (instanceConfig.getFunctionConfig().getInputsCount() > 0) {
-            String sourceTopicString = "";
+            String inputTopicString = "";
             for (String topicName : instanceConfig.getFunctionConfig().getInputsList()) {
-                if (sourceTopicString.isEmpty()) {
-                    sourceTopicString = topicName;
+                if (inputTopicString.isEmpty()) {
+                    inputTopicString = topicName;
                 } else {
-                    sourceTopicString = sourceTopicString + "," + topicName;
+                    inputTopicString = inputTopicString + "," + topicName;
                 }
             }
-            args.add("--source_topics");
-            args.add(sourceTopicString);
+            args.add("--input_topics");
+            args.add(inputTopicString);
         }
         args.add("--auto_ack");
         if (instanceConfig.getFunctionConfig().getAutoAck()) {
@@ -151,7 +151,7 @@
         }
         if (instanceConfig.getFunctionConfig().getOutput() != null
                 && !instanceConfig.getFunctionConfig().getOutput().isEmpty()) {
-            args.add("--sink_topic");
+            args.add("--output_topic");
             args.add(instanceConfig.getFunctionConfig().getOutput());
         }
         if (instanceConfig.getFunctionConfig().getOutputSerdeClassName() != null
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 31e6422c1..e316e6e3c 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -75,9 +75,9 @@ FunctionConfig createFunctionConfig(FunctionConfig.Runtime runtime) {
         functionConfigBuilder.setNamespace(TEST_NAMESPACE);
         functionConfigBuilder.setName(TEST_NAME);
         functionConfigBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction");
-        functionConfigBuilder.addInputs(TEST_NAME + "-source1");
-        functionConfigBuilder.addInputs(TEST_NAME + "-source2");
-        functionConfigBuilder.setOutput(TEST_NAME + "-sink");
+        functionConfigBuilder.addInputs(TEST_NAME + "-input1");
+        functionConfigBuilder.addInputs(TEST_NAME + "-input2");
+        functionConfigBuilder.setOutput(TEST_NAME + "-output");
         functionConfigBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer");
         functionConfigBuilder.setLogTopic(TEST_NAME + "-log");
         return functionConfigBuilder.build();
@@ -113,9 +113,9 @@ public void testJavaConstructor() {
                 + " --name " + config.getFunctionConfig().getName()
                 + " --function_classname " + config.getFunctionConfig().getClassName()
                 + " --log_topic " + config.getFunctionConfig().getLogTopic()
-                + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + "-source2"
+                + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2"
                 + " --auto_ack false"
-                + " --sink_topic " + config.getFunctionConfig().getOutput()
+                + " --output_topic " + config.getFunctionConfig().getOutput()
                 + " --output_serde_classname " + config.getFunctionConfig().getOutputSerdeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
@@ -140,9 +140,9 @@ public void testPythonConstructor() {
                 + " --name " + config.getFunctionConfig().getName()
                 + " --function_classname " + config.getFunctionConfig().getClassName()
                 + " --log_topic " + config.getFunctionConfig().getLogTopic()
-                + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + "-source2"
+                + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2"
                 + " --auto_ack false"
-                + " --sink_topic " + config.getFunctionConfig().getOutput()
+                + " --output_topic " + config.getFunctionConfig().getOutput()
                 + " --output_serde_classname " + config.getFunctionConfig().getOutputSerdeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
diff --git a/pulsar-functions/submit-python-function.sh b/pulsar-functions/submit-python-function.sh
index 708504762..ed625a5a1 100755
--- a/pulsar-functions/submit-python-function.sh
+++ b/pulsar-functions/submit-python-function.sh
@@ -21,8 +21,8 @@
 
 bin/pulsar-functions functions create \
     --function-config conf/example.yml \
-    --sink-topic persistent://sample/standalone/ns1/test_result \
-    --source-topics persistent://sample/standalone/ns1/test_src \
+    --output-topic persistent://sample/standalone/ns1/test_result \
+    --input-topics persistent://sample/standalone/ns1/test_src \
     --output-serde-classname pulsarfunction.serde.IdentitySerDe \
     --py python-examples/exclamation.py \
     --function-classname exclamation.Exclamation
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 7dabeedbe..0f182d1cc 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -86,8 +86,8 @@ public String process(String input, Context context) throws Exception {
     private static final String tenant = "test-tenant";
     private static final String namespace = "test-namespace";
     private static final String function = "test-function";
-    private static final String sinkTopic = "test-sink-topic";
-    private static final String sourceTopic = "test-source-topic";
+    private static final String outputTopic = "test-output-topic";
+    private static final String inputTopic = "test-input-topic";
     private static final String inputSerdeClassName = DefaultSerDe.class.getName();
     private static final String outputSerdeClassName = DefaultSerDe.class.getName();
     private static final String className = TestFunction.class.getName();
@@ -136,8 +136,8 @@ public void testRegisterFunctionMissingTenant() throws IOException {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -153,8 +153,8 @@ public void testRegisterFunctionMissingNamespace() throws IOException {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -170,8 +170,8 @@ public void testRegisterFunctionMissingFunctionName() throws IOException {
             null,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -187,8 +187,8 @@ public void testRegisterFunctionMissingPackage() throws IOException {
             function,
             null,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -204,8 +204,8 @@ public void testRegisterFunctionMissingPackageDetails() throws IOException {
             function,
             mockedInputStream,
             null,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -214,14 +214,14 @@ public void testRegisterFunctionMissingPackageDetails() throws IOException {
     }
 
     @Test
-    public void testRegisterFunctionMissingSourceTopic() throws IOException {
+    public void testRegisterFunctionMissingInputTopic() throws IOException {
         testRegisterFunctionMissingArguments(
             tenant,
             namespace,
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
+            outputTopic,
             null,
             inputSerdeClassName,
             outputSerdeClassName,
@@ -238,8 +238,8 @@ public void testRegisterFunctionMissingInputSerde() throws IOException {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             null,
             outputSerdeClassName,
             className,
@@ -255,8 +255,8 @@ public void testRegisterFunctionMissingClassName() throws IOException {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             null,
@@ -272,8 +272,8 @@ public void testRegisterFunctionMissingParallelism() throws IOException {
                 function,
                 mockedInputStream,
                 mockedFormData,
-                sinkTopic,
-                sourceTopic,
+                outputTopic,
+                inputTopic,
                 inputSerdeClassName,
                 outputSerdeClassName,
                 className,
@@ -287,8 +287,8 @@ private void testRegisterFunctionMissingArguments(
         String function,
         InputStream inputStream,
         FormDataContentDisposition details,
-        String sinkTopic,
-        String sourceTopic,
+        String outputTopic,
+        String inputTopic,
         String inputSerdeClassName,
         String outputSerdeClassName,
         String className,
@@ -305,11 +305,11 @@ private void testRegisterFunctionMissingArguments(
         if (function != null) {
             functionConfigBuilder.setName(function);
         }
-        if (sinkTopic != null) {
-            functionConfigBuilder.setOutput(sinkTopic);
+        if (outputTopic != null) {
+            functionConfigBuilder.setOutput(outputTopic);
         }
-        if (sourceTopic != null && inputSerdeClassName != null) {
-            functionConfigBuilder.putCustomSerdeInputs(sourceTopic, inputSerdeClassName);
+        if (inputTopic != null && inputSerdeClassName != null) {
+            functionConfigBuilder.putCustomSerdeInputs(inputTopic, inputSerdeClassName);
         }
         if (outputSerdeClassName != null) {
             functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
@@ -341,7 +341,7 @@ private void testRegisterFunctionMissingArguments(
     private Response registerDefaultFunction() throws IOException {
         FunctionConfig functionConfig = FunctionConfig.newBuilder()
                 .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setOutput(sinkTopic).putCustomSerdeInputs(sourceTopic, inputSerdeClassName)
+                .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, inputSerdeClassName)
                 .setOutputSerdeClassName(outputSerdeClassName)
                 .setClassName(className)
                 .setParallelism(parallelism).build();
@@ -456,8 +456,8 @@ public void testUpdateFunctionMissingTenant() throws IOException {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -473,8 +473,8 @@ public void testUpdateFunctionMissingNamespace() throws IOException {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -490,8 +490,8 @@ public void testUpdateFunctionMissingFunctionName() throws IOException {
             null,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -507,8 +507,8 @@ public void testUpdateFunctionMissingPackage() throws IOException {
             function,
             null,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -524,8 +524,8 @@ public void testUpdateFunctionMissingPackageDetails() throws IOException {
             function,
             mockedInputStream,
             null,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -541,7 +541,7 @@ public void testUpdateFunctionMissingSourceTopic() throws IOException {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
+            outputTopic,
             null,
             inputSerdeClassName,
             outputSerdeClassName,
@@ -558,8 +558,8 @@ public void testUpdateFunctionMissingInputSerde() throws IOException {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             null,
             outputSerdeClassName,
             className,
@@ -575,8 +575,8 @@ public void testUpdateFunctionMissingClassName() throws IOException {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             null,
@@ -591,8 +591,8 @@ public void testUpdateFunctionMissingParallelism() throws IOException {
                 function,
                 mockedInputStream,
                 mockedFormData,
-                sinkTopic,
-                sourceTopic,
+                outputTopic,
+                inputTopic,
                 inputSerdeClassName,
                 outputSerdeClassName,
                 className,
@@ -607,8 +607,8 @@ private void testUpdateFunctionMissingArguments(
         String function,
         InputStream inputStream,
         FormDataContentDisposition details,
-        String sinkTopic,
-        String sourceTopic,
+        String outputTopic,
+        String inputTopic,
         String inputSerdeClassName,
         String outputSerdeClassName,
         String className,
@@ -625,11 +625,11 @@ private void testUpdateFunctionMissingArguments(
         if (function != null) {
             functionConfigBuilder.setName(function);
         }
-        if (sinkTopic != null) {
-            functionConfigBuilder.setOutput(sinkTopic);
+        if (outputTopic != null) {
+            functionConfigBuilder.setOutput(outputTopic);
         }
-        if (sourceTopic != null && inputSerdeClassName != null) {
-            functionConfigBuilder.putCustomSerdeInputs(sourceTopic, inputSerdeClassName);
+        if (inputTopic != null && inputSerdeClassName != null) {
+            functionConfigBuilder.putCustomSerdeInputs(inputTopic, inputSerdeClassName);
         }
         if (outputSerdeClassName != null) {
             functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
@@ -661,7 +661,7 @@ private void testUpdateFunctionMissingArguments(
     private Response updateDefaultFunction() throws IOException {
         FunctionConfig functionConfig = FunctionConfig.newBuilder()
                 .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setOutput(sinkTopic).putCustomSerdeInputs(sourceTopic, inputSerdeClassName)
+                .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, inputSerdeClassName)
                 .setOutputSerdeClassName(outputSerdeClassName)
                 .setClassName(className)
                 .setParallelism(parallelism).build();
@@ -935,12 +935,12 @@ public void testGetFunctionSuccess() throws Exception {
 
         FunctionConfig functionConfig = FunctionConfig.newBuilder()
                 .setClassName(className)
-                .putCustomSerdeInputs(sourceTopic, inputSerdeClassName)
+                .putCustomSerdeInputs(inputTopic, inputSerdeClassName)
                 .setOutputSerdeClassName(outputSerdeClassName)
                 .setName(function)
                 .setNamespace(namespace)
                 .setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE)
-                .setOutput(sinkTopic)
+                .setOutput(outputTopic)
                 .setTenant(tenant)
                 .setParallelism(parallelism).build();
         FunctionMetaData metaData = FunctionMetaData.newBuilder()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services