You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/15 16:26:53 UTC

[incubator-pulsar] branch master updated: Standardize on input/output terminology for Pulsar Functions (#1378)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new dc34ab6  Standardize on input/output terminology for Pulsar Functions (#1378)
dc34ab6 is described below

commit dc34ab67b8656a92625d6d52a8c2d624802486ed
Author: Luc Perkins <lu...@gmail.com>
AuthorDate: Thu Mar 15 09:26:50 2018 -0700

    Standardize on input/output terminology for Pulsar Functions (#1378)
    
    * fix usage of 'sink' terminology
    
    * fix usages of 'source'
    
    * fix additional usages
---
 pulsar-client-cpp/python/functions/context.py      |   4 +-
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |  56 +++++------
 .../org/apache/pulsar/functions/api/Context.java   |  12 +--
 .../pulsar/functions/instance/ContextImpl.java     |  16 ++--
 .../pulsar/functions/instance/JavaInstance.java    |   4 +-
 .../functions/instance/JavaInstanceRunnable.java   |  96 +++++++++----------
 ...rs.java => AbstractOneOuputTopicProducers.java} |  10 +-
 ...a => MultiConsumersOneOuputTopicProducers.java} |  12 +--
 ...cers.java => SimpleOneOuputTopicProducers.java} |  10 +-
 .../instance/src/main/python/contextimpl.py        |   2 +-
 .../src/main/python/python_instance_main.py        |  32 +++----
 .../instance/JavaInstanceRunnableProcessTest.java  |  16 ++--
 ...MultiConsumersOneOutputTopicProducersTest.java} |  16 ++--
 ...java => SimpleOneOutputTopicProducersTest.java} |  16 ++--
 .../pulsar/functions/runtime/JavaInstanceMain.java |  34 +++----
 .../pulsar/functions/runtime/ProcessRuntime.java   |  26 ++---
 .../functions/runtime/ProcessRuntimeTest.java      |  14 +--
 pulsar-functions/submit-python-function.sh         |   4 +-
 .../rest/api/v2/FunctionApiV2ResourceTest.java     | 106 ++++++++++-----------
 19 files changed, 243 insertions(+), 243 deletions(-)

diff --git a/pulsar-client-cpp/python/functions/context.py b/pulsar-client-cpp/python/functions/context.py
index 8ffa2dc..4e1a969 100644
--- a/pulsar-client-cpp/python/functions/context.py
+++ b/pulsar-client-cpp/python/functions/context.py
@@ -100,8 +100,8 @@ class Context(object):
     pass
 
   @abstractmethod
-  def get_sink_topic(self):
-    '''Returns the sink topic of function'''
+  def get_output_topic(self):
+    '''Returns the output topic of function'''
     pass
 
   @abstractmethod
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index ca20cfe..e15d508 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -170,13 +170,13 @@ public class CmdFunctionsTest {
     @Test
     public void testCreateFunction() throws Exception {
         String fnName = TEST_NAME + "-function";
-        String sourceTopicName = TEST_NAME + "-source-topic";
-        String sinkTopicName = TEST_NAME + "-sink-topic";
+        String inputTopicName = TEST_NAME + "-input-topic";
+        String outputTopicName = TEST_NAME + "-output-topic";
         cmd.run(new String[] {
             "create",
             "--name", fnName,
-            "--inputs", sourceTopicName,
-            "--output", sinkTopicName,
+            "--inputs", inputTopicName,
+            "--output", outputTopicName,
             "--jar", "SomeJar.jar",
             "--tenant", "sample",
             "--namespace", "ns1",
@@ -185,8 +185,8 @@ public class CmdFunctionsTest {
 
         CreateFunction creater = cmd.getCreater();
         assertEquals(fnName, creater.getFunctionName());
-        assertEquals(sourceTopicName, creater.getInputs());
-        assertEquals(sinkTopicName, creater.getOutput());
+        assertEquals(inputTopicName, creater.getInputs());
+        assertEquals(outputTopicName, creater.getOutput());
 
         verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
 
@@ -195,13 +195,13 @@ public class CmdFunctionsTest {
     @Test
     public void testCreateWithoutTenant() throws Exception {
         String fnName = TEST_NAME + "-function";
-        String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic";
-        String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic";
+        String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
+        String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
         cmd.run(new String[] {
                 "create",
                 "--name", fnName,
-                "--inputs", sourceTopicName,
-                "--output", sinkTopicName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
                 "--jar", "SomeJar.jar",
                 "--namespace", "ns1",
                 "--className", DummyFunction.class.getName(),
@@ -215,13 +215,13 @@ public class CmdFunctionsTest {
     @Test
     public void testCreateWithoutNamespace() throws Exception {
         String fnName = TEST_NAME + "-function";
-        String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic";
-        String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic";
+        String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
+        String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
         cmd.run(new String[] {
                 "create",
                 "--name", fnName,
-                "--inputs", sourceTopicName,
-                "--output", sinkTopicName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
                 "--jar", "SomeJar.jar",
                 "--className", DummyFunction.class.getName(),
         });
@@ -234,12 +234,12 @@ public class CmdFunctionsTest {
 
     @Test
     public void testCreateWithoutFunctionName() throws Exception {
-        String sourceTopicName = TEST_NAME + "-source-topic";
-        String sinkTopicName = TEST_NAME + "-sink-topic";
+        String inputTopicName = TEST_NAME + "-input-topic";
+        String outputTopicName = TEST_NAME + "-output-topic";
         cmd.run(new String[] {
                 "create",
-                "--inputs", sourceTopicName,
-                "--output", sinkTopicName,
+                "--inputs", inputTopicName,
+                "--output", outputTopicName,
                 "--jar", "SomeJar.jar",
                 "--tenant", "sample",
                 "--namespace", "ns1",
@@ -252,11 +252,11 @@ public class CmdFunctionsTest {
     }
 
     @Test
-    public void testCreateWithoutSinkTopic() throws Exception {
-        String sourceTopicName = TEST_NAME + "-source-topic";
+    public void testCreateWithoutOutputTopic() throws Exception {
+        String inputTopicName = TEST_NAME + "-input-topic";
         cmd.run(new String[] {
                 "create",
-                "--inputs", sourceTopicName,
+                "--inputs", inputTopicName,
                 "--jar", "SomeJar.jar",
                 "--tenant", "sample",
                 "--namespace", "ns1",
@@ -264,7 +264,7 @@ public class CmdFunctionsTest {
         });
 
         CreateFunction creater = cmd.getCreater();
-        assertEquals(sourceTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
+        assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
         verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
     }
 
@@ -313,16 +313,16 @@ public class CmdFunctionsTest {
     @Test
     public void testUpdateFunction() throws Exception {
         String fnName = TEST_NAME + "-function";
-        String sourceTopicName = TEST_NAME + "-source-topic";
-        String sinkTopicName = TEST_NAME + "-sink-topic";
+        String inputTopicName = TEST_NAME + "-input-topic";
+        String outputTopicName = TEST_NAME + "-output-topic";
 
 
 
         cmd.run(new String[] {
             "update",
             "--name", fnName,
-            "--inputs", sourceTopicName,
-            "--output", sinkTopicName,
+            "--inputs", inputTopicName,
+            "--output", outputTopicName,
             "--jar", "SomeJar.jar",
             "--tenant", "sample",
             "--namespace", "ns1",
@@ -331,8 +331,8 @@ public class CmdFunctionsTest {
 
         UpdateFunction updater = cmd.getUpdater();
         assertEquals(fnName, updater.getFunctionName());
-        assertEquals(sourceTopicName, updater.getInputs());
-        assertEquals(sinkTopicName, updater.getOutput());
+        assertEquals(inputTopicName, updater.getInputs());
+        assertEquals(outputTopicName, updater.getOutput());
 
         verify(functions, times(1)).updateFunction(any(FunctionConfig.class), anyString());
     }
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index fb64141..8714962 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -44,16 +44,16 @@ public interface Context {
     String getTopicName();
 
     /**
-     * Get a list of all source topics
-     * @return a list of all source topics
+     * Get a list of all input topics
+     * @return a list of all input topics
      */
-    Collection<String> getSourceTopics();
+    Collection<String> getInputTopics();
 
     /**
-     * Get sink topic of function
-     * @return sink topic name
+     * Get the output topic of the function
+     * @return output topic name
      */
-    String getSinkTopic();
+    String getOutputTopic();
 
     /**
      * Get output Serde class
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index a3a8b49..378106e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -86,13 +86,13 @@ class ContextImpl implements Context {
     private ProducerConfiguration producerConfiguration;
     private PulsarClient pulsarClient;
     private ClassLoader classLoader;
-    private Map<String, Consumer> sourceConsumers;
+    private Map<String, Consumer> inputConsumers;
     @Getter
     @Setter
     private StateContextImpl stateContext;
 
     public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
-                       ClassLoader classLoader, Map<String, Consumer> sourceConsumers) {
+                       ClassLoader classLoader, Map<String, Consumer> inputConsumers) {
         this.config = config;
         this.logger = logger;
         this.pulsarClient = client;
@@ -100,7 +100,7 @@ class ContextImpl implements Context {
         this.accumulatedMetrics = new ConcurrentHashMap<>();
         this.publishProducers = new HashMap<>();
         this.publishSerializers = new HashMap<>();
-        this.sourceConsumers = sourceConsumers;
+        this.inputConsumers = inputConsumers;
         producerConfiguration = new ProducerConfiguration();
         producerConfiguration.setBlockIfQueueFull(true);
         producerConfiguration.setBatchingEnabled(true);
@@ -124,12 +124,12 @@ class ContextImpl implements Context {
     }
 
     @Override
-    public Collection<String> getSourceTopics() {
-        return sourceConsumers.keySet();
+    public Collection<String> getInputTopics() {
+        return inputConsumers.keySet();
     }
 
     @Override
-    public String getSinkTopic() {
+    public String getOutputTopic() {
         return config.getFunctionConfig().getOutput();
     }
 
@@ -236,7 +236,7 @@ class ContextImpl implements Context {
 
     @Override
     public CompletableFuture<Void> ack(byte[] messageId, String topic) {
-        if (!sourceConsumers.containsKey(topic)) {
+        if (!inputConsumers.containsKey(topic)) {
             throw new RuntimeException("No such input topic " + topic);
         }
 
@@ -246,7 +246,7 @@ class ContextImpl implements Context {
         } catch (IOException e) {
             throw new RuntimeException("Invalid message id to ack", e);
         }
-        return sourceConsumers.get(topic).acknowledgeAsync(actualMessageId);
+        return inputConsumers.get(topic).acknowledgeAsync(actualMessageId);
     }
 
     @Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 14f3171..0f1b3da 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -48,11 +48,11 @@ public class JavaInstance implements AutoCloseable {
     public JavaInstance(InstanceConfig config, Object userClassObject,
                  ClassLoader clsLoader,
                  PulsarClient pulsarClient,
-                 Map<String, Consumer> sourceConsumers) {
+                 Map<String, Consumer> inputConsumers) {
         // TODO: cache logger instances by functions?
         Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionConfig().getName());
 
-        this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, sourceConsumers);
+        this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, inputConsumers);
 
         // create the functions
         if (userClassObject instanceof Function) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fffe8f5..3a634d4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -60,9 +60,9 @@ import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuara
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.producers.MultiConsumersOneSinkTopicProducers;
+import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.producers.Producers;
-import org.apache.pulsar.functions.instance.producers.SimpleOneSinkTopicProducers;
+import org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.Reflections;
@@ -82,13 +82,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
     private final LinkedBlockingDeque<InputMessage> queue;
     private final String jarFile;
 
-    // source topic consumer & sink topic producer
+    // input topic consumer & output topic producer
     private final PulsarClientImpl client;
     @Getter(AccessLevel.PACKAGE)
-    private Producers sinkProducer;
+    private Producers outputProducer;
     @Getter(AccessLevel.PACKAGE)
-    private final Map<String, Consumer> sourceConsumers;
-    private LinkedList<String> sourceTopicsToResubscribe = null;
+    private final Map<String, Consumer> inputConsumers;
+    private LinkedList<String> inputTopicsToResubscribe = null;
 
     // provide tables for storing states
     private final String stateStorageServiceUrl;
@@ -137,7 +137,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
-        this.sourceConsumers = Maps.newConcurrentMap();
+        this.inputConsumers = Maps.newConcurrentMap();
     }
 
     private SubscriptionType getSubscriptionType() {
@@ -188,12 +188,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
 
         // start the state table
         setupStateTable();
-        // start the sink producer
-        startSinkProducer();
-        // start the source consumer
-        startSourceConsumers();
+        // start the output producer
+        startOutputProducer();
+        // start the input consumer
+        startInputConsumer();
 
-        return new JavaInstance(instanceConfig, object, clsLoader, client, sourceConsumers);
+        return new JavaInstance(instanceConfig, object, clsLoader, client, inputConsumers);
     }
 
     /**
@@ -222,23 +222,23 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
                 if (ProcessingGuarantees.EFFECTIVELY_ONCE == processingGuarantees) {
                     // if the messages are received from old consumers, we discard it since new consumer was
                     // re-created for the correctness of effectively-once
-                    if (msg.getConsumer() != sourceConsumers.get(msg.getTopicName())) {
+                    if (msg.getConsumer() != inputConsumers.get(msg.getTopicName())) {
                         continue;
                     }
                 }
 
-                if (null != sinkProducer) {
+                if (null != outputProducer) {
                     // before processing the message, we have a producer connection setup for producing results.
                     Producer producer = null;
                     while (null == producer) {
                         try {
-                            producer = sinkProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
+                            producer = outputProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
                         } catch (PulsarClientException e) {
                             // `ProducerBusy` is thrown when an producer with same name is still connected.
-                            // This can happen when a active consumer is changed for a given source topic partition
+                            // This can happen when a active consumer is changed for a given input topic partition
                             // so we need to wait until the old active consumer release the produce connection.
                             if (!(e instanceof ProducerBusyException)) {
-                                log.error("Failed to get a producer for producing results computed from source topic {}",
+                                log.error("Failed to get a producer for producing results computed from input topic {}",
                                     msg.getTopicName());
                             }
                             TimeUnit.MILLISECONDS.sleep(500);
@@ -318,9 +318,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
     public void becameActive(Consumer consumer, int partitionId) {
         // if the instance becomes active for a given topic partition,
         // open a producer for the results computed from this topic partition.
-        if (null != sinkProducer) {
+        if (null != outputProducer) {
             try {
-                this.sinkProducer.getProducer(consumer.getTopic(), partitionId);
+                this.outputProducer.getProducer(consumer.getTopic(), partitionId);
             } catch (PulsarClientException e) {
                 // this can be ignored, because producer can be lazily created when accessing it.
                 log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
@@ -331,10 +331,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
 
     @Override
     public void becameInactive(Consumer consumer, int partitionId) {
-        if (null != sinkProducer) {
+        if (null != outputProducer) {
             // if I lost the ownership of a partition, close its corresponding topic partition.
             // this is to allow the new active consumer be able to produce to the result topic.
-            this.sinkProducer.closeProducer(consumer.getTopic(), partitionId);
+            this.outputProducer.closeProducer(consumer.getTopic(), partitionId);
         }
     }
 
@@ -380,33 +380,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
         this.stateTable = result(storageClient.openTable(tableName));
     }
 
-    private void startSinkProducer() throws Exception {
+    private void startOutputProducer() throws Exception {
         if (instanceConfig.getFunctionConfig().getOutput() != null
                 && !instanceConfig.getFunctionConfig().getOutput().isEmpty()
                 && this.outputSerDe != null) {
-            log.info("Starting Producer for Sink Topic " + instanceConfig.getFunctionConfig().getOutput());
+            log.info("Starting producer for output topic " + instanceConfig.getFunctionConfig().getOutput());
 
             if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
-                this.sinkProducer = new MultiConsumersOneSinkTopicProducers(
+                this.outputProducer = new MultiConsumersOneOuputTopicProducers(
                     client, instanceConfig.getFunctionConfig().getOutput());
             } else {
-                this.sinkProducer = new SimpleOneSinkTopicProducers(
+                this.outputProducer = new SimpleOneOuputTopicProducers(
                     client, instanceConfig.getFunctionConfig().getOutput());
             }
-            this.sinkProducer.initialize();
+            this.outputProducer.initialize();
         }
     }
 
-    private void startSourceConsumers() throws Exception {
+    private void startInputConsumer() throws Exception {
         log.info("Consumer map {}", instanceConfig.getFunctionConfig());
         for (Map.Entry<String, String> entry : instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) {
             ConsumerConfiguration conf = createConsumerConfiguration(entry.getKey());
-            this.sourceConsumers.put(entry.getKey(), client.subscribe(entry.getKey(),
+            this.inputConsumers.put(entry.getKey(), client.subscribe(entry.getKey(),
                     FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), conf));
         }
         for (String topicName : instanceConfig.getFunctionConfig().getInputsList()) {
             ConsumerConfiguration conf = createConsumerConfiguration(topicName);
-            this.sourceConsumers.put(topicName, client.subscribe(topicName,
+            this.inputConsumers.put(topicName, client.subscribe(topicName,
                     FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), conf));
         }
     }
@@ -457,7 +457,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
             handleProcessException(msg.getTopicName());
         } else {
             stats.incrementSuccessfullyProcessed(endTime - startTime);
-            if (result.getResult() != null && sinkProducer != null) {
+            if (result.getResult() != null && outputProducer != null) {
                 byte[] output;
                 try {
                     output = outputSerDe.serialize(result.getResult());
@@ -504,9 +504,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
         }
         Producer producer;
         try {
-            producer = sinkProducer.getProducer(srcMsg.getTopicName(), srcMsg.getTopicPartition());
+            producer = outputProducer.getProducer(srcMsg.getTopicName(), srcMsg.getTopicPartition());
         } catch (PulsarClientException e) {
-            log.error("Failed to get a producer for producing results computed from source topic {}",
+            log.error("Failed to get a producer for producing results computed from input topic {}",
                 srcMsg.getTopicName());
 
             // if we fail to get a producer, put this message back to queue and reprocess it.
@@ -522,7 +522,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
                 }
             })
             .exceptionally(cause -> {
-                log.error("Failed to send the process result {} of message {} to sink topic {}",
+                log.error("Failed to send the process result {} of message {} to output topic {}",
                     result, srcMsg, instanceConfig.getFunctionConfig().getOutput(), cause);
                 handleProcessException(srcMsg.getTopicName());
                 return null;
@@ -549,17 +549,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
     }
 
     private synchronized void addTopicToResubscribeList(String topicName) {
-        if (null == sourceTopicsToResubscribe) {
-            sourceTopicsToResubscribe = new LinkedList<>();
+        if (null == inputTopicsToResubscribe) {
+            inputTopicsToResubscribe = new LinkedList<>();
         }
-        sourceTopicsToResubscribe.add(topicName);
+        inputTopicsToResubscribe.add(topicName);
     }
 
     private void resubscribeTopicsIfNeeded() {
         List<String> topicsToResubscribe;
         synchronized (this) {
-            topicsToResubscribe = sourceTopicsToResubscribe;
-            sourceTopicsToResubscribe = null;
+            topicsToResubscribe = inputTopicsToResubscribe;
+            inputTopicsToResubscribe = null;
         }
         if (null != topicsToResubscribe) {
             for (String topic : topicsToResubscribe) {
@@ -571,7 +571,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
     private void resubscribe(String srcTopic) {
         // if we can not produce a message to output topic, then close the consumer of the src topic
         // and retry to instantiate a consumer again.
-        Consumer consumer = sourceConsumers.remove(srcTopic);
+        Consumer consumer = inputConsumers.remove(srcTopic);
         if (consumer != null) {
             // TODO (sijie): currently we have to close the entire consumer for a given topic. However
             //               ideally we should do this in a finer granularity - we can close consumer
@@ -579,14 +579,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
             try {
                 consumer.close();
             } catch (PulsarClientException e) {
-                log.error("Failed to close consumer for source topic {} when handling produce exceptions",
+                log.error("Failed to close consumer for input topic {} when handling produce exceptions",
                     srcTopic, e);
             }
         }
         // subscribe to the src topic again
         ConsumerConfiguration conf = createConsumerConfiguration(srcTopic);
         try {
-            sourceConsumers.put(
+            inputConsumers.put(
                 srcTopic,
                 client.subscribe(
                     srcTopic,
@@ -594,7 +594,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
                     conf
                 ));
         } catch (PulsarClientException e) {
-            log.error("Failed to resubscribe to source topic {}. Added it to retry list and retry it later",
+            log.error("Failed to resubscribe to input topic {}. Added it to retry list and retry it later",
                 srcTopic, e);
             addTopicToResubscribeList(srcTopic);
         }
@@ -607,19 +607,19 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
         }
         running = false;
         // stop the consumer first, so no more messages are coming in
-        sourceConsumers.forEach((k, v) -> {
+        inputConsumers.forEach((k, v) -> {
             try {
                 v.close();
             } catch (PulsarClientException e) {
-                log.warn("Failed to close consumer to source topic {}", k, e);
+                log.warn("Failed to close consumer to input topic {}", k, e);
             }
         });
-        sourceConsumers.clear();
+        inputConsumers.clear();
 
         // kill the result producer
-        if (null != sinkProducer) {
-            sinkProducer.close();
-            sinkProducer = null;
+        if (null != outputProducer) {
+            outputProducer.close();
+            outputProducer = null;
         }
 
         // kill the state table
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneSinkTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
similarity index 91%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneSinkTopicProducers.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index f6dd83d..723529d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneSinkTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -28,17 +28,17 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.functions.instance.FunctionResultRouter;
 
-abstract class AbstractOneSinkTopicProducers implements Producers {
+abstract class AbstractOneOuputTopicProducers implements Producers {
 
     protected final PulsarClient client;
-    protected final String sinkTopic;
+    protected final String outputTopic;
     protected final ProducerConfiguration conf;
 
-    AbstractOneSinkTopicProducers(PulsarClient client,
-                                  String sinkTopic)
+    AbstractOneOuputTopicProducers(PulsarClient client,
+                                   String outputTopic)
             throws PulsarClientException {
         this.client = client;
-        this.sinkTopic = sinkTopic;
+        this.outputTopic = outputTopic;
         this.conf = newProducerConfiguration();
     }
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
similarity index 88%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducers.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 63eb51e..cf21a04 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -34,15 +34,15 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 
 @Slf4j
-public class MultiConsumersOneSinkTopicProducers extends AbstractOneSinkTopicProducers {
+public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers {
 
     @Getter(AccessLevel.PACKAGE)
     private final Map<String, IntObjectMap<Producer>> producers;
 
-    public MultiConsumersOneSinkTopicProducers(PulsarClient client,
-                                               String sinkTopic)
+    public MultiConsumersOneOuputTopicProducers(PulsarClient client,
+                                                String outputTopic)
             throws PulsarClientException {
-        super(client, sinkTopic);
+        super(client, outputTopic);
         this.producers = new ConcurrentHashMap<>();
     }
 
@@ -65,7 +65,7 @@ public class MultiConsumersOneSinkTopicProducers extends AbstractOneSinkTopicPro
 
         Producer producer = producerMap.get(srcTopicPartition);
         if (null == producer) {
-            producer = createProducer(sinkTopic, makeProducerName(srcTopicName, srcTopicPartition));
+            producer = createProducer(outputTopic, makeProducerName(srcTopicName, srcTopicPartition));
             producerMap.put(srcTopicPartition, producer);
         }
         return producer;
@@ -97,7 +97,7 @@ public class MultiConsumersOneSinkTopicProducers extends AbstractOneSinkTopicPro
         try {
             FutureUtils.result(FutureUtils.collect(closeFutures));
         } catch (Exception e) {
-            log.warn("Fail to close all the producers for sink topic {}", sinkTopic, e);
+            log.warn("Fail to close all the producers for output topic {}", outputTopic, e);
         }
     }
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
similarity index 82%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducers.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
index dc8b121..2fee37a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
@@ -27,18 +27,18 @@ import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 
 @Slf4j
-public class SimpleOneSinkTopicProducers extends AbstractOneSinkTopicProducers {
+public class SimpleOneOuputTopicProducers extends AbstractOneOuputTopicProducers {
 
     @Getter
     private Producer producer;
 
-    public SimpleOneSinkTopicProducers(PulsarClient client, String sinkTopic) throws PulsarClientException {
-        super(client, sinkTopic);
+    public SimpleOneOuputTopicProducers(PulsarClient client, String outputTopic) throws PulsarClientException {
+        super(client, outputTopic);
     }
 
     @Override
     public void initialize() throws PulsarClientException {
-        producer = createProducer(sinkTopic);
+        producer = createProducer(outputTopic);
     }
 
     @Override
@@ -57,7 +57,7 @@ public class SimpleOneSinkTopicProducers extends AbstractOneSinkTopicProducers {
         try {
             producer.close();
         } catch (PulsarClientException e) {
-            log.warn("Fail to close producer of topic {}", sinkTopic, e);
+            log.warn("Fail to close producer of topic {}", outputTopic, e);
         }
     }
 }
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 6a3a5da..9c77fd1 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -98,7 +98,7 @@ class ContextImpl(pulsar.Context):
       self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
     self.accumulated_metrics[metric_name].update(metric_value)
 
-  def get_sink_topic(self):
+  def get_output_topic(self):
     return self.instance_config.function_config.output
 
   def get_output_serde_class_name(self):
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 0bf02b0..87ba549 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -57,10 +57,10 @@ def main():
   parser.add_argument('--name', required=True, help='Function Name')
   parser.add_argument('--tenant', required=True, help='Tenant Name')
   parser.add_argument('--namespace', required=True, help='Namespace name')
-  parser.add_argument('--custom_serde_source_topics', required=False, help='Source Topics Requiring Custom Deserialization')
+  parser.add_argument('--custom_serde_input_topics', required=False, help='Input Topics Requiring Custom Deserialization')
   parser.add_argument('--custom_serde_classnames', required=False, help='Input Serde Classnames')
-  parser.add_argument('--source_topics', required=False, help='Input topics with default serde')
-  parser.add_argument('--sink_topic', required=False, help='Sink Topic')
+  parser.add_argument('--input_topics', required=False, help='Input topics with default serde')
+  parser.add_argument('--output_topic', required=False, help='Output Topic')
   parser.add_argument('--output_serde_classname', required=False, help='Output Serde Classnames')
   parser.add_argument('--instance_id', required=True, help='Instance Id')
   parser.add_argument('--function_id', required=True, help='Function Id')
@@ -87,22 +87,22 @@ def main():
   function_config.namespace = args.namespace
   function_config.name = args.name
   function_config.className = args.function_classname
-  if args.custom_serde_source_topics is None and args.source_topics is None:
-    Log.critical("Atleast one source topic must be present")
+  if args.custom_serde_input_topics is None and args.input_topics is None:
+    Log.critical("Atleast one input topic must be present")
     sys.exit(1)
-  if args.custom_serde_source_topics is not None and args.custom_serde_classnames is not None:
-    source_topics = args.custom_serde_source_topics.split(",")
-    source_serde = args.custom_serde_classnames.split(",")
-    if len(source_topics) != len(source_serde):
-      Log.critical("CustomSerde SourceTopics and Serde classnames should match")
+  if args.custom_serde_input_topics is not None and args.custom_serde_classnames is not None:
+    input_topics = args.custom_serde_input_topics.split(",")
+    input_serde = args.custom_serde_classnames.split(",")
+    if len(input_topics) != len(input_serde):
+      Log.critical("CustomSerde InputTopcis and Serde classnames should match")
       sys.exit(1)
-    for i in xrange(len(source_topics)):
-      function_config.customSerdeInputs[source_topics[i]] = source_serde[i]
-  if args.source_topics is not None:
-    for topic in args.source_topics.split(","):
+    for i in xrange(len(input_topics)):
+      function_config.customSerdeInputs[input_topics[i]] = input_serde[i]
+  if args.input_topics is not None:
+    for topic in args.input_topics.split(","):
       function_config.inputs.append(topic)
-  if args.sink_topic != None and len(args.sink_topic) != 0:
-    function_config.output = args.sink_topic
+  if args.output_topic != None and len(args.output_topic) != 0:
+    function_config.output = args.output_topic
   if args.output_serde_classname != None and len(args.output_serde_classname) != 0:
     function_config.outputSerdeClassName = args.output_serde_classname
   function_config.processingGuarantees = Function_pb2.FunctionConfig.ProcessingGuarantees.Value(args.processing_guarantees)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index e8e761f..0b2c0b6 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -81,7 +81,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuara
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.instance.producers.Producers;
-import org.apache.pulsar.functions.instance.producers.SimpleOneSinkTopicProducers;
+import org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.Utils;
 import org.powermock.api.mockito.PowerMockito;
@@ -231,7 +231,7 @@ public class JavaInstanceRunnableProcessTest {
             .setClassName(TestFunction.class.getName())
             .addInputs("test-src-topic")
             .setName("test-function")
-            .setOutput("test-sink-topic")
+            .setOutput("test-output-topic")
             .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
             .setTenant("test-tenant")
             .setNamespace("test-namespace")
@@ -430,15 +430,15 @@ public class JavaInstanceRunnableProcessTest {
         }
 
         // 3. verify producers and consumers are setup
-        Producers producers = runnable.getSinkProducer();
-        assertTrue(producers instanceof SimpleOneSinkTopicProducers);
+        Producers producers = runnable.getOutputProducer();
+        assertTrue(producers instanceof SimpleOneOuputTopicProducers);
         assertSame(mockProducers.get(Pair.of(
             fnConfig.getOutput(),
             null
-        )).getProducer(), ((SimpleOneSinkTopicProducers) producers).getProducer());
+        )).getProducer(), ((SimpleOneOuputTopicProducers) producers).getProducer());
 
-        assertEquals(mockConsumers.size(), runnable.getSourceConsumers().size());
-        for (Map.Entry<String, Consumer> consumerEntry : runnable.getSourceConsumers().entrySet()) {
+        assertEquals(mockConsumers.size(), runnable.getInputConsumers().size());
+        for (Map.Entry<String, Consumer> consumerEntry : runnable.getInputConsumers().entrySet()) {
             String topic = consumerEntry.getKey();
 
             Consumer mockConsumer = mockConsumers.get(Pair.of(
@@ -457,7 +457,7 @@ public class JavaInstanceRunnableProcessTest {
         for (ConsumerInstance consumer : mockConsumers.values()) {
             verify(consumer.getConsumer(), times(1)).close();
         }
-        assertTrue(runnable.getSourceConsumers().isEmpty());
+        assertTrue(runnable.getInputConsumers().isEmpty());
 
         for (ProducerInstance producer : mockProducers.values()) {
             verify(producer.getProducer(), times(1)).close();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
similarity index 90%
rename from pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducersTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 06de835..577015a 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.functions.instance.producers;
 
-import static org.apache.pulsar.functions.instance.producers.MultiConsumersOneSinkTopicProducers.makeProducerName;
+import static org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
@@ -41,15 +41,15 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
- * Unit test of {@link MultiConsumersOneSinkTopicProducers}.
+ * Unit test of {@link MultiConsumersOneOuputTopicProducers}.
  */
-public class MultiConsumersOneSinkTopicProducersTest {
+public class MultiConsumersOneOutputTopicProducersTest {
 
-    private static final String TEST_SINK_TOPIC = "test-sink-topic";
+    private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
 
     private PulsarClient mockClient;
     private final Map<String, Producer> mockProducers = new HashMap<>();
-    private MultiConsumersOneSinkTopicProducers producers;
+    private MultiConsumersOneOuputTopicProducers producers;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -70,7 +70,7 @@ public class MultiConsumersOneSinkTopicProducersTest {
                 }
             });
 
-        producers = new MultiConsumersOneSinkTopicProducers(mockClient, TEST_SINK_TOPIC);
+        producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC);
         producers.initialize();
     }
 
@@ -97,7 +97,7 @@ public class MultiConsumersOneSinkTopicProducersTest {
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
             .createProducer(
-                eq(TEST_SINK_TOPIC),
+                eq(TEST_OUTPUT_TOPIC),
                 any(ProducerConfiguration.class)
             );
         assertTrue(producers.getProducers().containsKey(srcTopic));
@@ -108,7 +108,7 @@ public class MultiConsumersOneSinkTopicProducersTest {
         assertSame(mockProducers.get(producerName), producer);
         verify(mockClient, times(1))
             .createProducer(
-                eq(TEST_SINK_TOPIC),
+                eq(TEST_OUTPUT_TOPIC),
                 any(ProducerConfiguration.class)
             );
         assertTrue(producers.getProducers().containsKey(srcTopic));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
similarity index 82%
rename from pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducersTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
index 84bc693..67337a1 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
@@ -34,15 +34,15 @@ import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 /**
- * Unit test of {@link SimpleOneSinkTopicProducers}.
+ * Unit test of {@link SimpleOneOuputTopicProducers}.
  */
-public class SimpleOneSinkTopicProducersTest {
+public class SimpleOneOutputTopicProducersTest {
 
-    private static final String TEST_SINK_TOPIC = "test-sink-topic";
+    private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
 
     private PulsarClient mockClient;
     private Producer mockProducer;
-    private SimpleOneSinkTopicProducers producers;
+    private SimpleOneOuputTopicProducers producers;
 
     @BeforeMethod
     public void setup() throws Exception {
@@ -52,7 +52,7 @@ public class SimpleOneSinkTopicProducersTest {
         when(mockClient.createProducer(anyString(), any(ProducerConfiguration.class)))
             .thenReturn(mockProducer);
 
-        this.producers = new SimpleOneSinkTopicProducers(mockClient, TEST_SINK_TOPIC);
+        this.producers = new SimpleOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC);
     }
 
     @Test
@@ -60,7 +60,7 @@ public class SimpleOneSinkTopicProducersTest {
         this.producers.initialize();
 
         verify(mockClient, times(1))
-            .createProducer(eq(TEST_SINK_TOPIC), any(ProducerConfiguration.class));
+            .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
 
         this.producers.close();
 
@@ -77,12 +77,12 @@ public class SimpleOneSinkTopicProducersTest {
         this.producers.initialize();
 
         verify(mockClient, times(1))
-            .createProducer(eq(TEST_SINK_TOPIC), any(ProducerConfiguration.class));
+            .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
 
         assertSame(mockProducer, this.producers.getProducer("test-src-topic", 0));
 
         verify(mockClient, times(1))
-            .createProducer(eq(TEST_SINK_TOPIC), any(ProducerConfiguration.class));
+            .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
 
         producers.closeProducer("test-src-topic", 0);
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index badee94..08bfbe4 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -56,15 +56,15 @@ public class JavaInstanceMain {
     @Parameter(names = "--namespace", description = "Namespace Name\n", required = true)
     protected String namespace;
 
-    @Parameter(names = "--sink_topic", description = "Output Topic Name\n")
-    protected String sinkTopicName;
+    @Parameter(names = "--output_topic", description = "Output Topic Name\n")
+    protected String outputTopicName;
 
-    @Parameter(names = "--custom_serde_source_topics", description = "Input Topics that need custom deserialization\n", required = false)
-    protected String customSerdeSourceTopics;
+    @Parameter(names = "--custom_serde_input_topics", description = "Input Topics that need custom deserialization\n", required = false)
+    protected String customSerdeInputTopics;
     @Parameter(names = "--custom_serde_classnames", description = "Input SerDe\n", required = false)
     protected String customSerdeClassnames;
-    @Parameter(names = "--source_topics", description = "Input Topics\n", required = false)
-    protected String defaultSerdeSourceTopics;
+    @Parameter(names = "--input_topics", description = "Input Topics\n", required = false)
+    protected String defaultSerdeInputTopics;
 
     @Parameter(names = "--output_serde_classname", description = "Output SerDe\n")
     protected String outputSerdeClassName;
@@ -118,27 +118,27 @@ public class JavaInstanceMain {
         functionConfigBuilder.setNamespace(namespace);
         functionConfigBuilder.setName(functionName);
         functionConfigBuilder.setClassName(className);
-        if (defaultSerdeSourceTopics != null) {
-            String[] sourceTopics = defaultSerdeSourceTopics.split(",");
-            for (String sourceTopic : sourceTopics) {
-                functionConfigBuilder.addInputs(sourceTopic);
+        if (defaultSerdeInputTopics != null) {
+            String[] inputTopics = defaultSerdeInputTopics.split(",");
+            for (String inputTopic : inputTopics) {
+                functionConfigBuilder.addInputs(inputTopic);
             }
         }
-        if (customSerdeSourceTopics != null && customSerdeClassnames != null) {
-            String[] sourceTopics = customSerdeSourceTopics.split(",");
+        if (customSerdeInputTopics != null && customSerdeClassnames != null) {
+            String[] inputTopics = customSerdeInputTopics.split(",");
             String[] inputSerdeClassNames = customSerdeClassnames.split(",");
-            if (sourceTopics.length != inputSerdeClassNames.length) {
+            if (inputTopics.length != inputSerdeClassNames.length) {
                 throw new RuntimeException("Error specifying inputs");
             }
-            for (int i = 0; i < sourceTopics.length; ++i) {
-                functionConfigBuilder.putCustomSerdeInputs(sourceTopics[i], inputSerdeClassNames[i]);
+            for (int i = 0; i < inputTopics.length; ++i) {
+                functionConfigBuilder.putCustomSerdeInputs(inputTopics[i], inputSerdeClassNames[i]);
             }
         }
         if (outputSerdeClassName != null) {
             functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
         }
-        if (sinkTopicName != null) {
-            functionConfigBuilder.setOutput(sinkTopicName);
+        if (outputTopicName != null) {
+            functionConfigBuilder.setOutput(outputTopicName);
         }
         if (logTopic != null) {
             functionConfigBuilder.setLogTopic(logTopic);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c1d7b7f..1bdedd7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -112,13 +112,13 @@ class ProcessRuntime implements Runtime {
             args.add(instanceConfig.getFunctionConfig().getLogTopic());
         }
         if (instanceConfig.getFunctionConfig().getCustomSerdeInputsCount() > 0) {
-            String sourceTopicString = "";
+            String inputTopicString = "";
             String inputSerdeClassNameString = "";
             for (Map.Entry<String, String> entry : instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) {
-                if (sourceTopicString.isEmpty()) {
-                    sourceTopicString = entry.getKey();
+                if (inputTopicString.isEmpty()) {
+                    inputTopicString = entry.getKey();
                 } else {
-                    sourceTopicString = sourceTopicString + "," + entry.getKey();
+                    inputTopicString = inputTopicString + "," + entry.getKey();
                 }
                 if (inputSerdeClassNameString.isEmpty()) {
                     inputSerdeClassNameString = entry.getValue();
@@ -126,22 +126,22 @@ class ProcessRuntime implements Runtime {
                     inputSerdeClassNameString = inputSerdeClassNameString + "," + entry.getValue();
                 }
             }
-            args.add("--custom_serde_source_topics");
-            args.add(sourceTopicString);
+            args.add("--custom_serde_input_topics");
+            args.add(inputTopicString);
             args.add("--custom_serde_classnames");
             args.add(inputSerdeClassNameString);
         }
         if (instanceConfig.getFunctionConfig().getInputsCount() > 0) {
-            String sourceTopicString = "";
+            String inputTopicString = "";
             for (String topicName : instanceConfig.getFunctionConfig().getInputsList()) {
-                if (sourceTopicString.isEmpty()) {
-                    sourceTopicString = topicName;
+                if (inputTopicString.isEmpty()) {
+                    inputTopicString = topicName;
                 } else {
-                    sourceTopicString = sourceTopicString + "," + topicName;
+                    inputTopicString = inputTopicString + "," + topicName;
                 }
             }
-            args.add("--source_topics");
-            args.add(sourceTopicString);
+            args.add("--input_topics");
+            args.add(inputTopicString);
         }
         args.add("--auto_ack");
         if (instanceConfig.getFunctionConfig().getAutoAck()) {
@@ -151,7 +151,7 @@ class ProcessRuntime implements Runtime {
         }
         if (instanceConfig.getFunctionConfig().getOutput() != null
                 && !instanceConfig.getFunctionConfig().getOutput().isEmpty()) {
-            args.add("--sink_topic");
+            args.add("--output_topic");
             args.add(instanceConfig.getFunctionConfig().getOutput());
         }
         if (instanceConfig.getFunctionConfig().getOutputSerdeClassName() != null
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 31e6422..e316e6e 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -75,9 +75,9 @@ public class ProcessRuntimeTest {
         functionConfigBuilder.setNamespace(TEST_NAMESPACE);
         functionConfigBuilder.setName(TEST_NAME);
         functionConfigBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction");
-        functionConfigBuilder.addInputs(TEST_NAME + "-source1");
-        functionConfigBuilder.addInputs(TEST_NAME + "-source2");
-        functionConfigBuilder.setOutput(TEST_NAME + "-sink");
+        functionConfigBuilder.addInputs(TEST_NAME + "-input1");
+        functionConfigBuilder.addInputs(TEST_NAME + "-input2");
+        functionConfigBuilder.setOutput(TEST_NAME + "-output");
         functionConfigBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer");
         functionConfigBuilder.setLogTopic(TEST_NAME + "-log");
         return functionConfigBuilder.build();
@@ -113,9 +113,9 @@ public class ProcessRuntimeTest {
                 + " --name " + config.getFunctionConfig().getName()
                 + " --function_classname " + config.getFunctionConfig().getClassName()
                 + " --log_topic " + config.getFunctionConfig().getLogTopic()
-                + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + "-source2"
+                + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2"
                 + " --auto_ack false"
-                + " --sink_topic " + config.getFunctionConfig().getOutput()
+                + " --output_topic " + config.getFunctionConfig().getOutput()
                 + " --output_serde_classname " + config.getFunctionConfig().getOutputSerdeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
@@ -140,9 +140,9 @@ public class ProcessRuntimeTest {
                 + " --name " + config.getFunctionConfig().getName()
                 + " --function_classname " + config.getFunctionConfig().getClassName()
                 + " --log_topic " + config.getFunctionConfig().getLogTopic()
-                + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + "-source2"
+                + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2"
                 + " --auto_ack false"
-                + " --sink_topic " + config.getFunctionConfig().getOutput()
+                + " --output_topic " + config.getFunctionConfig().getOutput()
                 + " --output_serde_classname " + config.getFunctionConfig().getOutputSerdeClassName()
                 + " --processing_guarantees ATLEAST_ONCE"
                 + " --pulsar_serviceurl " + pulsarServiceUrl
diff --git a/pulsar-functions/submit-python-function.sh b/pulsar-functions/submit-python-function.sh
index 7085047..ed625a5 100755
--- a/pulsar-functions/submit-python-function.sh
+++ b/pulsar-functions/submit-python-function.sh
@@ -21,8 +21,8 @@
 
 bin/pulsar-functions functions create \
     --function-config conf/example.yml \
-    --sink-topic persistent://sample/standalone/ns1/test_result \
-    --source-topics persistent://sample/standalone/ns1/test_src \
+    --output-topic persistent://sample/standalone/ns1/test_result \
+    --input-topics persistent://sample/standalone/ns1/test_src \
     --output-serde-classname pulsarfunction.serde.IdentitySerDe \
     --py python-examples/exclamation.py \
     --function-classname exclamation.Exclamation
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 7dabeed..0f182d1 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -86,8 +86,8 @@ public class FunctionApiV2ResourceTest {
     private static final String tenant = "test-tenant";
     private static final String namespace = "test-namespace";
     private static final String function = "test-function";
-    private static final String sinkTopic = "test-sink-topic";
-    private static final String sourceTopic = "test-source-topic";
+    private static final String outputTopic = "test-output-topic";
+    private static final String inputTopic = "test-input-topic";
     private static final String inputSerdeClassName = DefaultSerDe.class.getName();
     private static final String outputSerdeClassName = DefaultSerDe.class.getName();
     private static final String className = TestFunction.class.getName();
@@ -136,8 +136,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -153,8 +153,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -170,8 +170,8 @@ public class FunctionApiV2ResourceTest {
             null,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -187,8 +187,8 @@ public class FunctionApiV2ResourceTest {
             function,
             null,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -204,8 +204,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             null,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -214,14 +214,14 @@ public class FunctionApiV2ResourceTest {
     }
 
     @Test
-    public void testRegisterFunctionMissingSourceTopic() throws IOException {
+    public void testRegisterFunctionMissingInputTopic() throws IOException {
         testRegisterFunctionMissingArguments(
             tenant,
             namespace,
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
+            outputTopic,
             null,
             inputSerdeClassName,
             outputSerdeClassName,
@@ -238,8 +238,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             null,
             outputSerdeClassName,
             className,
@@ -255,8 +255,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             null,
@@ -272,8 +272,8 @@ public class FunctionApiV2ResourceTest {
                 function,
                 mockedInputStream,
                 mockedFormData,
-                sinkTopic,
-                sourceTopic,
+                outputTopic,
+                inputTopic,
                 inputSerdeClassName,
                 outputSerdeClassName,
                 className,
@@ -287,8 +287,8 @@ public class FunctionApiV2ResourceTest {
         String function,
         InputStream inputStream,
         FormDataContentDisposition details,
-        String sinkTopic,
-        String sourceTopic,
+        String outputTopic,
+        String inputTopic,
         String inputSerdeClassName,
         String outputSerdeClassName,
         String className,
@@ -305,11 +305,11 @@ public class FunctionApiV2ResourceTest {
         if (function != null) {
             functionConfigBuilder.setName(function);
         }
-        if (sinkTopic != null) {
-            functionConfigBuilder.setOutput(sinkTopic);
+        if (outputTopic != null) {
+            functionConfigBuilder.setOutput(outputTopic);
         }
-        if (sourceTopic != null && inputSerdeClassName != null) {
-            functionConfigBuilder.putCustomSerdeInputs(sourceTopic, inputSerdeClassName);
+        if (inputTopic != null && inputSerdeClassName != null) {
+            functionConfigBuilder.putCustomSerdeInputs(inputTopic, inputSerdeClassName);
         }
         if (outputSerdeClassName != null) {
             functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
@@ -341,7 +341,7 @@ public class FunctionApiV2ResourceTest {
     private Response registerDefaultFunction() throws IOException {
         FunctionConfig functionConfig = FunctionConfig.newBuilder()
                 .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setOutput(sinkTopic).putCustomSerdeInputs(sourceTopic, inputSerdeClassName)
+                .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, inputSerdeClassName)
                 .setOutputSerdeClassName(outputSerdeClassName)
                 .setClassName(className)
                 .setParallelism(parallelism).build();
@@ -456,8 +456,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -473,8 +473,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -490,8 +490,8 @@ public class FunctionApiV2ResourceTest {
             null,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -507,8 +507,8 @@ public class FunctionApiV2ResourceTest {
             function,
             null,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -524,8 +524,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             null,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             className,
@@ -541,7 +541,7 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
+            outputTopic,
             null,
             inputSerdeClassName,
             outputSerdeClassName,
@@ -558,8 +558,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             null,
             outputSerdeClassName,
             className,
@@ -575,8 +575,8 @@ public class FunctionApiV2ResourceTest {
             function,
             mockedInputStream,
             mockedFormData,
-            sinkTopic,
-            sourceTopic,
+            outputTopic,
+            inputTopic,
             inputSerdeClassName,
             outputSerdeClassName,
             null,
@@ -591,8 +591,8 @@ public class FunctionApiV2ResourceTest {
                 function,
                 mockedInputStream,
                 mockedFormData,
-                sinkTopic,
-                sourceTopic,
+                outputTopic,
+                inputTopic,
                 inputSerdeClassName,
                 outputSerdeClassName,
                 className,
@@ -607,8 +607,8 @@ public class FunctionApiV2ResourceTest {
         String function,
         InputStream inputStream,
         FormDataContentDisposition details,
-        String sinkTopic,
-        String sourceTopic,
+        String outputTopic,
+        String inputTopic,
         String inputSerdeClassName,
         String outputSerdeClassName,
         String className,
@@ -625,11 +625,11 @@ public class FunctionApiV2ResourceTest {
         if (function != null) {
             functionConfigBuilder.setName(function);
         }
-        if (sinkTopic != null) {
-            functionConfigBuilder.setOutput(sinkTopic);
+        if (outputTopic != null) {
+            functionConfigBuilder.setOutput(outputTopic);
         }
-        if (sourceTopic != null && inputSerdeClassName != null) {
-            functionConfigBuilder.putCustomSerdeInputs(sourceTopic, inputSerdeClassName);
+        if (inputTopic != null && inputSerdeClassName != null) {
+            functionConfigBuilder.putCustomSerdeInputs(inputTopic, inputSerdeClassName);
         }
         if (outputSerdeClassName != null) {
             functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
@@ -661,7 +661,7 @@ public class FunctionApiV2ResourceTest {
     private Response updateDefaultFunction() throws IOException {
         FunctionConfig functionConfig = FunctionConfig.newBuilder()
                 .setTenant(tenant).setNamespace(namespace).setName(function)
-                .setOutput(sinkTopic).putCustomSerdeInputs(sourceTopic, inputSerdeClassName)
+                .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, inputSerdeClassName)
                 .setOutputSerdeClassName(outputSerdeClassName)
                 .setClassName(className)
                 .setParallelism(parallelism).build();
@@ -935,12 +935,12 @@ public class FunctionApiV2ResourceTest {
 
         FunctionConfig functionConfig = FunctionConfig.newBuilder()
                 .setClassName(className)
-                .putCustomSerdeInputs(sourceTopic, inputSerdeClassName)
+                .putCustomSerdeInputs(inputTopic, inputSerdeClassName)
                 .setOutputSerdeClassName(outputSerdeClassName)
                 .setName(function)
                 .setNamespace(namespace)
                 .setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE)
-                .setOutput(sinkTopic)
+                .setOutput(outputTopic)
                 .setTenant(tenant)
                 .setParallelism(parallelism).build();
         FunctionMetaData metaData = FunctionMetaData.newBuilder()

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.