You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/03/15 16:26:53 UTC
[incubator-pulsar] branch master updated: Standardize on
input/output terminology for Pulsar Functions (#1378)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new dc34ab6 Standardize on input/output terminology for Pulsar Functions (#1378)
dc34ab6 is described below
commit dc34ab67b8656a92625d6d52a8c2d624802486ed
Author: Luc Perkins <lu...@gmail.com>
AuthorDate: Thu Mar 15 09:26:50 2018 -0700
Standardize on input/output terminology for Pulsar Functions (#1378)
* fix usage of 'sink' terminology
* fix usages of 'source'
* fix additional usages
---
pulsar-client-cpp/python/functions/context.py | 4 +-
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 56 +++++------
.../org/apache/pulsar/functions/api/Context.java | 12 +--
.../pulsar/functions/instance/ContextImpl.java | 16 ++--
.../pulsar/functions/instance/JavaInstance.java | 4 +-
.../functions/instance/JavaInstanceRunnable.java | 96 +++++++++----------
...rs.java => AbstractOneOuputTopicProducers.java} | 10 +-
...a => MultiConsumersOneOuputTopicProducers.java} | 12 +--
...cers.java => SimpleOneOuputTopicProducers.java} | 10 +-
.../instance/src/main/python/contextimpl.py | 2 +-
.../src/main/python/python_instance_main.py | 32 +++----
.../instance/JavaInstanceRunnableProcessTest.java | 16 ++--
...MultiConsumersOneOutputTopicProducersTest.java} | 16 ++--
...java => SimpleOneOutputTopicProducersTest.java} | 16 ++--
.../pulsar/functions/runtime/JavaInstanceMain.java | 34 +++----
.../pulsar/functions/runtime/ProcessRuntime.java | 26 ++---
.../functions/runtime/ProcessRuntimeTest.java | 14 +--
pulsar-functions/submit-python-function.sh | 4 +-
.../rest/api/v2/FunctionApiV2ResourceTest.java | 106 ++++++++++-----------
19 files changed, 243 insertions(+), 243 deletions(-)
diff --git a/pulsar-client-cpp/python/functions/context.py b/pulsar-client-cpp/python/functions/context.py
index 8ffa2dc..4e1a969 100644
--- a/pulsar-client-cpp/python/functions/context.py
+++ b/pulsar-client-cpp/python/functions/context.py
@@ -100,8 +100,8 @@ class Context(object):
pass
@abstractmethod
- def get_sink_topic(self):
- '''Returns the sink topic of function'''
+ def get_output_topic(self):
+ '''Returns the output topic of function'''
pass
@abstractmethod
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index ca20cfe..e15d508 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -170,13 +170,13 @@ public class CmdFunctionsTest {
@Test
public void testCreateFunction() throws Exception {
String fnName = TEST_NAME + "-function";
- String sourceTopicName = TEST_NAME + "-source-topic";
- String sinkTopicName = TEST_NAME + "-sink-topic";
+ String inputTopicName = TEST_NAME + "-input-topic";
+ String outputTopicName = TEST_NAME + "-output-topic";
cmd.run(new String[] {
"create",
"--name", fnName,
- "--inputs", sourceTopicName,
- "--output", sinkTopicName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
@@ -185,8 +185,8 @@ public class CmdFunctionsTest {
CreateFunction creater = cmd.getCreater();
assertEquals(fnName, creater.getFunctionName());
- assertEquals(sourceTopicName, creater.getInputs());
- assertEquals(sinkTopicName, creater.getOutput());
+ assertEquals(inputTopicName, creater.getInputs());
+ assertEquals(outputTopicName, creater.getOutput());
verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
@@ -195,13 +195,13 @@ public class CmdFunctionsTest {
@Test
public void testCreateWithoutTenant() throws Exception {
String fnName = TEST_NAME + "-function";
- String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic";
- String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic";
+ String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
+ String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
cmd.run(new String[] {
"create",
"--name", fnName,
- "--inputs", sourceTopicName,
- "--output", sinkTopicName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
"--jar", "SomeJar.jar",
"--namespace", "ns1",
"--className", DummyFunction.class.getName(),
@@ -215,13 +215,13 @@ public class CmdFunctionsTest {
@Test
public void testCreateWithoutNamespace() throws Exception {
String fnName = TEST_NAME + "-function";
- String sourceTopicName = "persistent://tenant/standalone/namespace/source-topic";
- String sinkTopicName = "persistent://tenant/standalone/namespace/sink-topic";
+ String inputTopicName = "persistent://tenant/standalone/namespace/input-topic";
+ String outputTopicName = "persistent://tenant/standalone/namespace/output-topic";
cmd.run(new String[] {
"create",
"--name", fnName,
- "--inputs", sourceTopicName,
- "--output", sinkTopicName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
"--jar", "SomeJar.jar",
"--className", DummyFunction.class.getName(),
});
@@ -234,12 +234,12 @@ public class CmdFunctionsTest {
@Test
public void testCreateWithoutFunctionName() throws Exception {
- String sourceTopicName = TEST_NAME + "-source-topic";
- String sinkTopicName = TEST_NAME + "-sink-topic";
+ String inputTopicName = TEST_NAME + "-input-topic";
+ String outputTopicName = TEST_NAME + "-output-topic";
cmd.run(new String[] {
"create",
- "--inputs", sourceTopicName,
- "--output", sinkTopicName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
@@ -252,11 +252,11 @@ public class CmdFunctionsTest {
}
@Test
- public void testCreateWithoutSinkTopic() throws Exception {
- String sourceTopicName = TEST_NAME + "-source-topic";
+ public void testCreateWithoutOutputTopic() throws Exception {
+ String inputTopicName = TEST_NAME + "-input-topic";
cmd.run(new String[] {
"create",
- "--inputs", sourceTopicName,
+ "--inputs", inputTopicName,
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
@@ -264,7 +264,7 @@ public class CmdFunctionsTest {
});
CreateFunction creater = cmd.getCreater();
- assertEquals(sourceTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
+ assertEquals(inputTopicName + "-" + "CmdFunctionsTest$DummyFunction" + "-output", creater.getFunctionConfig().getOutput());
verify(functions, times(1)).createFunction(any(FunctionConfig.class), anyString());
}
@@ -313,16 +313,16 @@ public class CmdFunctionsTest {
@Test
public void testUpdateFunction() throws Exception {
String fnName = TEST_NAME + "-function";
- String sourceTopicName = TEST_NAME + "-source-topic";
- String sinkTopicName = TEST_NAME + "-sink-topic";
+ String inputTopicName = TEST_NAME + "-input-topic";
+ String outputTopicName = TEST_NAME + "-output-topic";
cmd.run(new String[] {
"update",
"--name", fnName,
- "--inputs", sourceTopicName,
- "--output", sinkTopicName,
+ "--inputs", inputTopicName,
+ "--output", outputTopicName,
"--jar", "SomeJar.jar",
"--tenant", "sample",
"--namespace", "ns1",
@@ -331,8 +331,8 @@ public class CmdFunctionsTest {
UpdateFunction updater = cmd.getUpdater();
assertEquals(fnName, updater.getFunctionName());
- assertEquals(sourceTopicName, updater.getInputs());
- assertEquals(sinkTopicName, updater.getOutput());
+ assertEquals(inputTopicName, updater.getInputs());
+ assertEquals(outputTopicName, updater.getOutput());
verify(functions, times(1)).updateFunction(any(FunctionConfig.class), anyString());
}
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index fb64141..8714962 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -44,16 +44,16 @@ public interface Context {
String getTopicName();
/**
- * Get a list of all source topics
- * @return a list of all source topics
+ * Get a list of all input topics
+ * @return a list of all input topics
*/
- Collection<String> getSourceTopics();
+ Collection<String> getInputTopics();
/**
- * Get sink topic of function
- * @return sink topic name
+ * Get the output topic of the function
+ * @return output topic name
*/
- String getSinkTopic();
+ String getOutputTopic();
/**
* Get output Serde class
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index a3a8b49..378106e 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -86,13 +86,13 @@ class ContextImpl implements Context {
private ProducerConfiguration producerConfiguration;
private PulsarClient pulsarClient;
private ClassLoader classLoader;
- private Map<String, Consumer> sourceConsumers;
+ private Map<String, Consumer> inputConsumers;
@Getter
@Setter
private StateContextImpl stateContext;
public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client,
- ClassLoader classLoader, Map<String, Consumer> sourceConsumers) {
+ ClassLoader classLoader, Map<String, Consumer> inputConsumers) {
this.config = config;
this.logger = logger;
this.pulsarClient = client;
@@ -100,7 +100,7 @@ class ContextImpl implements Context {
this.accumulatedMetrics = new ConcurrentHashMap<>();
this.publishProducers = new HashMap<>();
this.publishSerializers = new HashMap<>();
- this.sourceConsumers = sourceConsumers;
+ this.inputConsumers = inputConsumers;
producerConfiguration = new ProducerConfiguration();
producerConfiguration.setBlockIfQueueFull(true);
producerConfiguration.setBatchingEnabled(true);
@@ -124,12 +124,12 @@ class ContextImpl implements Context {
}
@Override
- public Collection<String> getSourceTopics() {
- return sourceConsumers.keySet();
+ public Collection<String> getInputTopics() {
+ return inputConsumers.keySet();
}
@Override
- public String getSinkTopic() {
+ public String getOutputTopic() {
return config.getFunctionConfig().getOutput();
}
@@ -236,7 +236,7 @@ class ContextImpl implements Context {
@Override
public CompletableFuture<Void> ack(byte[] messageId, String topic) {
- if (!sourceConsumers.containsKey(topic)) {
+ if (!inputConsumers.containsKey(topic)) {
throw new RuntimeException("No such input topic " + topic);
}
@@ -246,7 +246,7 @@ class ContextImpl implements Context {
} catch (IOException e) {
throw new RuntimeException("Invalid message id to ack", e);
}
- return sourceConsumers.get(topic).acknowledgeAsync(actualMessageId);
+ return inputConsumers.get(topic).acknowledgeAsync(actualMessageId);
}
@Override
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
index 14f3171..0f1b3da 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstance.java
@@ -48,11 +48,11 @@ public class JavaInstance implements AutoCloseable {
public JavaInstance(InstanceConfig config, Object userClassObject,
ClassLoader clsLoader,
PulsarClient pulsarClient,
- Map<String, Consumer> sourceConsumers) {
+ Map<String, Consumer> inputConsumers) {
// TODO: cache logger instances by functions?
Logger instanceLog = LoggerFactory.getLogger("function-" + config.getFunctionConfig().getName());
- this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, sourceConsumers);
+ this.context = new ContextImpl(config, instanceLog, pulsarClient, clsLoader, inputConsumers);
// create the functions
if (userClassObject instanceof Function) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index fffe8f5..3a634d4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -60,9 +60,9 @@ import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuara
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.api.SerDe;
-import org.apache.pulsar.functions.instance.producers.MultiConsumersOneSinkTopicProducers;
+import org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers;
import org.apache.pulsar.functions.instance.producers.Producers;
-import org.apache.pulsar.functions.instance.producers.SimpleOneSinkTopicProducers;
+import org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.Reflections;
@@ -82,13 +82,13 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
private final LinkedBlockingDeque<InputMessage> queue;
private final String jarFile;
- // source topic consumer & sink topic producer
+ // input topic consumer & output topic producer
private final PulsarClientImpl client;
@Getter(AccessLevel.PACKAGE)
- private Producers sinkProducer;
+ private Producers outputProducer;
@Getter(AccessLevel.PACKAGE)
- private final Map<String, Consumer> sourceConsumers;
- private LinkedList<String> sourceTopicsToResubscribe = null;
+ private final Map<String, Consumer> inputConsumers;
+ private LinkedList<String> inputTopicsToResubscribe = null;
// provide tables for storing states
private final String stateStorageServiceUrl;
@@ -137,7 +137,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
this.client = (PulsarClientImpl) pulsarClient;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.stats = new FunctionStats();
- this.sourceConsumers = Maps.newConcurrentMap();
+ this.inputConsumers = Maps.newConcurrentMap();
}
private SubscriptionType getSubscriptionType() {
@@ -188,12 +188,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
// start the state table
setupStateTable();
- // start the sink producer
- startSinkProducer();
- // start the source consumer
- startSourceConsumers();
+ // start the output producer
+ startOutputProducer();
+ // start the input consumer
+ startInputConsumer();
- return new JavaInstance(instanceConfig, object, clsLoader, client, sourceConsumers);
+ return new JavaInstance(instanceConfig, object, clsLoader, client, inputConsumers);
}
/**
@@ -222,23 +222,23 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
if (ProcessingGuarantees.EFFECTIVELY_ONCE == processingGuarantees) {
// if the messages are received from old consumers, we discard it since new consumer was
// re-created for the correctness of effectively-once
- if (msg.getConsumer() != sourceConsumers.get(msg.getTopicName())) {
+ if (msg.getConsumer() != inputConsumers.get(msg.getTopicName())) {
continue;
}
}
- if (null != sinkProducer) {
+ if (null != outputProducer) {
// before processing the message, we have a producer connection setup for producing results.
Producer producer = null;
while (null == producer) {
try {
- producer = sinkProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
+ producer = outputProducer.getProducer(msg.getTopicName(), msg.getTopicPartition());
} catch (PulsarClientException e) {
// `ProducerBusy` is thrown when an producer with same name is still connected.
- // This can happen when a active consumer is changed for a given source topic partition
+ // This can happen when a active consumer is changed for a given input topic partition
// so we need to wait until the old active consumer release the produce connection.
if (!(e instanceof ProducerBusyException)) {
- log.error("Failed to get a producer for producing results computed from source topic {}",
+ log.error("Failed to get a producer for producing results computed from input topic {}",
msg.getTopicName());
}
TimeUnit.MILLISECONDS.sleep(500);
@@ -318,9 +318,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
public void becameActive(Consumer consumer, int partitionId) {
// if the instance becomes active for a given topic partition,
// open a producer for the results computed from this topic partition.
- if (null != sinkProducer) {
+ if (null != outputProducer) {
try {
- this.sinkProducer.getProducer(consumer.getTopic(), partitionId);
+ this.outputProducer.getProducer(consumer.getTopic(), partitionId);
} catch (PulsarClientException e) {
// this can be ignored, because producer can be lazily created when accessing it.
log.warn("Fail to create a producer for results computed from messages of topic: {}, partition: {}",
@@ -331,10 +331,10 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
@Override
public void becameInactive(Consumer consumer, int partitionId) {
- if (null != sinkProducer) {
+ if (null != outputProducer) {
// if I lost the ownership of a partition, close its corresponding topic partition.
// this is to allow the new active consumer be able to produce to the result topic.
- this.sinkProducer.closeProducer(consumer.getTopic(), partitionId);
+ this.outputProducer.closeProducer(consumer.getTopic(), partitionId);
}
}
@@ -380,33 +380,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
this.stateTable = result(storageClient.openTable(tableName));
}
- private void startSinkProducer() throws Exception {
+ private void startOutputProducer() throws Exception {
if (instanceConfig.getFunctionConfig().getOutput() != null
&& !instanceConfig.getFunctionConfig().getOutput().isEmpty()
&& this.outputSerDe != null) {
- log.info("Starting Producer for Sink Topic " + instanceConfig.getFunctionConfig().getOutput());
+ log.info("Starting producer for output topic " + instanceConfig.getFunctionConfig().getOutput());
if (processingGuarantees == ProcessingGuarantees.EFFECTIVELY_ONCE) {
- this.sinkProducer = new MultiConsumersOneSinkTopicProducers(
+ this.outputProducer = new MultiConsumersOneOuputTopicProducers(
client, instanceConfig.getFunctionConfig().getOutput());
} else {
- this.sinkProducer = new SimpleOneSinkTopicProducers(
+ this.outputProducer = new SimpleOneOuputTopicProducers(
client, instanceConfig.getFunctionConfig().getOutput());
}
- this.sinkProducer.initialize();
+ this.outputProducer.initialize();
}
}
- private void startSourceConsumers() throws Exception {
+ private void startInputConsumer() throws Exception {
log.info("Consumer map {}", instanceConfig.getFunctionConfig());
for (Map.Entry<String, String> entry : instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) {
ConsumerConfiguration conf = createConsumerConfiguration(entry.getKey());
- this.sourceConsumers.put(entry.getKey(), client.subscribe(entry.getKey(),
+ this.inputConsumers.put(entry.getKey(), client.subscribe(entry.getKey(),
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), conf));
}
for (String topicName : instanceConfig.getFunctionConfig().getInputsList()) {
ConsumerConfiguration conf = createConsumerConfiguration(topicName);
- this.sourceConsumers.put(topicName, client.subscribe(topicName,
+ this.inputConsumers.put(topicName, client.subscribe(topicName,
FunctionConfigUtils.getFullyQualifiedName(instanceConfig.getFunctionConfig()), conf));
}
}
@@ -457,7 +457,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
handleProcessException(msg.getTopicName());
} else {
stats.incrementSuccessfullyProcessed(endTime - startTime);
- if (result.getResult() != null && sinkProducer != null) {
+ if (result.getResult() != null && outputProducer != null) {
byte[] output;
try {
output = outputSerDe.serialize(result.getResult());
@@ -504,9 +504,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
}
Producer producer;
try {
- producer = sinkProducer.getProducer(srcMsg.getTopicName(), srcMsg.getTopicPartition());
+ producer = outputProducer.getProducer(srcMsg.getTopicName(), srcMsg.getTopicPartition());
} catch (PulsarClientException e) {
- log.error("Failed to get a producer for producing results computed from source topic {}",
+ log.error("Failed to get a producer for producing results computed from input topic {}",
srcMsg.getTopicName());
// if we fail to get a producer, put this message back to queue and reprocess it.
@@ -522,7 +522,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
}
})
.exceptionally(cause -> {
- log.error("Failed to send the process result {} of message {} to sink topic {}",
+ log.error("Failed to send the process result {} of message {} to output topic {}",
result, srcMsg, instanceConfig.getFunctionConfig().getOutput(), cause);
handleProcessException(srcMsg.getTopicName());
return null;
@@ -549,17 +549,17 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
}
private synchronized void addTopicToResubscribeList(String topicName) {
- if (null == sourceTopicsToResubscribe) {
- sourceTopicsToResubscribe = new LinkedList<>();
+ if (null == inputTopicsToResubscribe) {
+ inputTopicsToResubscribe = new LinkedList<>();
}
- sourceTopicsToResubscribe.add(topicName);
+ inputTopicsToResubscribe.add(topicName);
}
private void resubscribeTopicsIfNeeded() {
List<String> topicsToResubscribe;
synchronized (this) {
- topicsToResubscribe = sourceTopicsToResubscribe;
- sourceTopicsToResubscribe = null;
+ topicsToResubscribe = inputTopicsToResubscribe;
+ inputTopicsToResubscribe = null;
}
if (null != topicsToResubscribe) {
for (String topic : topicsToResubscribe) {
@@ -571,7 +571,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
private void resubscribe(String srcTopic) {
// if we can not produce a message to output topic, then close the consumer of the src topic
// and retry to instantiate a consumer again.
- Consumer consumer = sourceConsumers.remove(srcTopic);
+ Consumer consumer = inputConsumers.remove(srcTopic);
if (consumer != null) {
// TODO (sijie): currently we have to close the entire consumer for a given topic. However
// ideally we should do this in a finer granularity - we can close consumer
@@ -579,14 +579,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
try {
consumer.close();
} catch (PulsarClientException e) {
- log.error("Failed to close consumer for source topic {} when handling produce exceptions",
+ log.error("Failed to close consumer for input topic {} when handling produce exceptions",
srcTopic, e);
}
}
// subscribe to the src topic again
ConsumerConfiguration conf = createConsumerConfiguration(srcTopic);
try {
- sourceConsumers.put(
+ inputConsumers.put(
srcTopic,
client.subscribe(
srcTopic,
@@ -594,7 +594,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
conf
));
} catch (PulsarClientException e) {
- log.error("Failed to resubscribe to source topic {}. Added it to retry list and retry it later",
+ log.error("Failed to resubscribe to input topic {}. Added it to retry list and retry it later",
srcTopic, e);
addTopicToResubscribeList(srcTopic);
}
@@ -607,19 +607,19 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable, ConsumerEv
}
running = false;
// stop the consumer first, so no more messages are coming in
- sourceConsumers.forEach((k, v) -> {
+ inputConsumers.forEach((k, v) -> {
try {
v.close();
} catch (PulsarClientException e) {
- log.warn("Failed to close consumer to source topic {}", k, e);
+ log.warn("Failed to close consumer to input topic {}", k, e);
}
});
- sourceConsumers.clear();
+ inputConsumers.clear();
// kill the result producer
- if (null != sinkProducer) {
- sinkProducer.close();
- sinkProducer = null;
+ if (null != outputProducer) {
+ outputProducer.close();
+ outputProducer = null;
}
// kill the state table
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneSinkTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
similarity index 91%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneSinkTopicProducers.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
index f6dd83d..723529d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneSinkTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/AbstractOneOuputTopicProducers.java
@@ -28,17 +28,17 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.instance.FunctionResultRouter;
-abstract class AbstractOneSinkTopicProducers implements Producers {
+abstract class AbstractOneOuputTopicProducers implements Producers {
protected final PulsarClient client;
- protected final String sinkTopic;
+ protected final String outputTopic;
protected final ProducerConfiguration conf;
- AbstractOneSinkTopicProducers(PulsarClient client,
- String sinkTopic)
+ AbstractOneOuputTopicProducers(PulsarClient client,
+ String outputTopic)
throws PulsarClientException {
this.client = client;
- this.sinkTopic = sinkTopic;
+ this.outputTopic = outputTopic;
this.conf = newProducerConfiguration();
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
similarity index 88%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducers.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
index 63eb51e..cf21a04 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOuputTopicProducers.java
@@ -34,15 +34,15 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@Slf4j
-public class MultiConsumersOneSinkTopicProducers extends AbstractOneSinkTopicProducers {
+public class MultiConsumersOneOuputTopicProducers extends AbstractOneOuputTopicProducers {
@Getter(AccessLevel.PACKAGE)
private final Map<String, IntObjectMap<Producer>> producers;
- public MultiConsumersOneSinkTopicProducers(PulsarClient client,
- String sinkTopic)
+ public MultiConsumersOneOuputTopicProducers(PulsarClient client,
+ String outputTopic)
throws PulsarClientException {
- super(client, sinkTopic);
+ super(client, outputTopic);
this.producers = new ConcurrentHashMap<>();
}
@@ -65,7 +65,7 @@ public class MultiConsumersOneSinkTopicProducers extends AbstractOneSinkTopicPro
Producer producer = producerMap.get(srcTopicPartition);
if (null == producer) {
- producer = createProducer(sinkTopic, makeProducerName(srcTopicName, srcTopicPartition));
+ producer = createProducer(outputTopic, makeProducerName(srcTopicName, srcTopicPartition));
producerMap.put(srcTopicPartition, producer);
}
return producer;
@@ -97,7 +97,7 @@ public class MultiConsumersOneSinkTopicProducers extends AbstractOneSinkTopicPro
try {
FutureUtils.result(FutureUtils.collect(closeFutures));
} catch (Exception e) {
- log.warn("Fail to close all the producers for sink topic {}", sinkTopic, e);
+ log.warn("Fail to close all the producers for output topic {}", outputTopic, e);
}
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducers.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
similarity index 82%
rename from pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducers.java
rename to pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
index dc8b121..2fee37a 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducers.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/producers/SimpleOneOuputTopicProducers.java
@@ -27,18 +27,18 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@Slf4j
-public class SimpleOneSinkTopicProducers extends AbstractOneSinkTopicProducers {
+public class SimpleOneOuputTopicProducers extends AbstractOneOuputTopicProducers {
@Getter
private Producer producer;
- public SimpleOneSinkTopicProducers(PulsarClient client, String sinkTopic) throws PulsarClientException {
- super(client, sinkTopic);
+ public SimpleOneOuputTopicProducers(PulsarClient client, String outputTopic) throws PulsarClientException {
+ super(client, outputTopic);
}
@Override
public void initialize() throws PulsarClientException {
- producer = createProducer(sinkTopic);
+ producer = createProducer(outputTopic);
}
@Override
@@ -57,7 +57,7 @@ public class SimpleOneSinkTopicProducers extends AbstractOneSinkTopicProducers {
try {
producer.close();
} catch (PulsarClientException e) {
- log.warn("Fail to close producer of topic {}", sinkTopic, e);
+ log.warn("Fail to close producer of topic {}", outputTopic, e);
}
}
}
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 6a3a5da..9c77fd1 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -98,7 +98,7 @@ class ContextImpl(pulsar.Context):
self.accumulated_metrics[metric_name] = AccumulatedMetricDatum()
self.accumulated_metrics[metric_name].update(metric_value)
- def get_sink_topic(self):
+ def get_output_topic(self):
return self.instance_config.function_config.output
def get_output_serde_class_name(self):
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 0bf02b0..87ba549 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -57,10 +57,10 @@ def main():
parser.add_argument('--name', required=True, help='Function Name')
parser.add_argument('--tenant', required=True, help='Tenant Name')
parser.add_argument('--namespace', required=True, help='Namespace name')
- parser.add_argument('--custom_serde_source_topics', required=False, help='Source Topics Requiring Custom Deserialization')
+ parser.add_argument('--custom_serde_input_topics', required=False, help='Input Topics Requiring Custom Deserialization')
parser.add_argument('--custom_serde_classnames', required=False, help='Input Serde Classnames')
- parser.add_argument('--source_topics', required=False, help='Input topics with default serde')
- parser.add_argument('--sink_topic', required=False, help='Sink Topic')
+ parser.add_argument('--input_topics', required=False, help='Input topics with default serde')
+ parser.add_argument('--output_topic', required=False, help='Output Topic')
parser.add_argument('--output_serde_classname', required=False, help='Output Serde Classnames')
parser.add_argument('--instance_id', required=True, help='Instance Id')
parser.add_argument('--function_id', required=True, help='Function Id')
@@ -87,22 +87,22 @@ def main():
function_config.namespace = args.namespace
function_config.name = args.name
function_config.className = args.function_classname
- if args.custom_serde_source_topics is None and args.source_topics is None:
- Log.critical("Atleast one source topic must be present")
+ if args.custom_serde_input_topics is None and args.input_topics is None:
+ Log.critical("Atleast one input topic must be present")
sys.exit(1)
- if args.custom_serde_source_topics is not None and args.custom_serde_classnames is not None:
- source_topics = args.custom_serde_source_topics.split(",")
- source_serde = args.custom_serde_classnames.split(",")
- if len(source_topics) != len(source_serde):
- Log.critical("CustomSerde SourceTopics and Serde classnames should match")
+ if args.custom_serde_input_topics is not None and args.custom_serde_classnames is not None:
+ input_topics = args.custom_serde_input_topics.split(",")
+ input_serde = args.custom_serde_classnames.split(",")
+ if len(input_topics) != len(input_serde):
+ Log.critical("CustomSerde InputTopcis and Serde classnames should match")
sys.exit(1)
- for i in xrange(len(source_topics)):
- function_config.customSerdeInputs[source_topics[i]] = source_serde[i]
- if args.source_topics is not None:
- for topic in args.source_topics.split(","):
+ for i in xrange(len(input_topics)):
+ function_config.customSerdeInputs[input_topics[i]] = input_serde[i]
+ if args.input_topics is not None:
+ for topic in args.input_topics.split(","):
function_config.inputs.append(topic)
- if args.sink_topic != None and len(args.sink_topic) != 0:
- function_config.output = args.sink_topic
+ if args.output_topic != None and len(args.output_topic) != 0:
+ function_config.output = args.output_topic
if args.output_serde_classname != None and len(args.output_serde_classname) != 0:
function_config.outputSerdeClassName = args.output_serde_classname
function_config.processingGuarantees = Function_pb2.FunctionConfig.ProcessingGuarantees.Value(args.processing_guarantees)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
index e8e761f..0b2c0b6 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableProcessTest.java
@@ -81,7 +81,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionConfig.ProcessingGuara
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.instance.producers.Producers;
-import org.apache.pulsar.functions.instance.producers.SimpleOneSinkTopicProducers;
+import org.apache.pulsar.functions.instance.producers.SimpleOneOuputTopicProducers;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.Utils;
import org.powermock.api.mockito.PowerMockito;
@@ -231,7 +231,7 @@ public class JavaInstanceRunnableProcessTest {
.setClassName(TestFunction.class.getName())
.addInputs("test-src-topic")
.setName("test-function")
- .setOutput("test-sink-topic")
+ .setOutput("test-output-topic")
.setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
.setTenant("test-tenant")
.setNamespace("test-namespace")
@@ -430,15 +430,15 @@ public class JavaInstanceRunnableProcessTest {
}
// 3. verify producers and consumers are setup
- Producers producers = runnable.getSinkProducer();
- assertTrue(producers instanceof SimpleOneSinkTopicProducers);
+ Producers producers = runnable.getOutputProducer();
+ assertTrue(producers instanceof SimpleOneOuputTopicProducers);
assertSame(mockProducers.get(Pair.of(
fnConfig.getOutput(),
null
- )).getProducer(), ((SimpleOneSinkTopicProducers) producers).getProducer());
+ )).getProducer(), ((SimpleOneOuputTopicProducers) producers).getProducer());
- assertEquals(mockConsumers.size(), runnable.getSourceConsumers().size());
- for (Map.Entry<String, Consumer> consumerEntry : runnable.getSourceConsumers().entrySet()) {
+ assertEquals(mockConsumers.size(), runnable.getInputConsumers().size());
+ for (Map.Entry<String, Consumer> consumerEntry : runnable.getInputConsumers().entrySet()) {
String topic = consumerEntry.getKey();
Consumer mockConsumer = mockConsumers.get(Pair.of(
@@ -457,7 +457,7 @@ public class JavaInstanceRunnableProcessTest {
for (ConsumerInstance consumer : mockConsumers.values()) {
verify(consumer.getConsumer(), times(1)).close();
}
- assertTrue(runnable.getSourceConsumers().isEmpty());
+ assertTrue(runnable.getInputConsumers().isEmpty());
for (ProducerInstance producer : mockProducers.values()) {
verify(producer.getProducer(), times(1)).close();
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
similarity index 90%
rename from pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducersTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
index 06de835..577015a 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneSinkTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/MultiConsumersOneOutputTopicProducersTest.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.functions.instance.producers;
-import static org.apache.pulsar.functions.instance.producers.MultiConsumersOneSinkTopicProducers.makeProducerName;
+import static org.apache.pulsar.functions.instance.producers.MultiConsumersOneOuputTopicProducers.makeProducerName;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -41,15 +41,15 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
- * Unit test of {@link MultiConsumersOneSinkTopicProducers}.
+ * Unit test of {@link MultiConsumersOneOuputTopicProducers}.
*/
-public class MultiConsumersOneSinkTopicProducersTest {
+public class MultiConsumersOneOutputTopicProducersTest {
- private static final String TEST_SINK_TOPIC = "test-sink-topic";
+ private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
private PulsarClient mockClient;
private final Map<String, Producer> mockProducers = new HashMap<>();
- private MultiConsumersOneSinkTopicProducers producers;
+ private MultiConsumersOneOuputTopicProducers producers;
@BeforeMethod
public void setup() throws Exception {
@@ -70,7 +70,7 @@ public class MultiConsumersOneSinkTopicProducersTest {
}
});
- producers = new MultiConsumersOneSinkTopicProducers(mockClient, TEST_SINK_TOPIC);
+ producers = new MultiConsumersOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC);
producers.initialize();
}
@@ -97,7 +97,7 @@ public class MultiConsumersOneSinkTopicProducersTest {
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
.createProducer(
- eq(TEST_SINK_TOPIC),
+ eq(TEST_OUTPUT_TOPIC),
any(ProducerConfiguration.class)
);
assertTrue(producers.getProducers().containsKey(srcTopic));
@@ -108,7 +108,7 @@ public class MultiConsumersOneSinkTopicProducersTest {
assertSame(mockProducers.get(producerName), producer);
verify(mockClient, times(1))
.createProducer(
- eq(TEST_SINK_TOPIC),
+ eq(TEST_OUTPUT_TOPIC),
any(ProducerConfiguration.class)
);
assertTrue(producers.getProducers().containsKey(srcTopic));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducersTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
similarity index 82%
rename from pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducersTest.java
rename to pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
index 84bc693..67337a1 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneSinkTopicProducersTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/producers/SimpleOneOutputTopicProducersTest.java
@@ -34,15 +34,15 @@ import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
/**
- * Unit test of {@link SimpleOneSinkTopicProducers}.
+ * Unit test of {@link SimpleOneOuputTopicProducers}.
*/
-public class SimpleOneSinkTopicProducersTest {
+public class SimpleOneOutputTopicProducersTest {
- private static final String TEST_SINK_TOPIC = "test-sink-topic";
+ private static final String TEST_OUTPUT_TOPIC = "test-output-topic";
private PulsarClient mockClient;
private Producer mockProducer;
- private SimpleOneSinkTopicProducers producers;
+ private SimpleOneOuputTopicProducers producers;
@BeforeMethod
public void setup() throws Exception {
@@ -52,7 +52,7 @@ public class SimpleOneSinkTopicProducersTest {
when(mockClient.createProducer(anyString(), any(ProducerConfiguration.class)))
.thenReturn(mockProducer);
- this.producers = new SimpleOneSinkTopicProducers(mockClient, TEST_SINK_TOPIC);
+ this.producers = new SimpleOneOuputTopicProducers(mockClient, TEST_OUTPUT_TOPIC);
}
@Test
@@ -60,7 +60,7 @@ public class SimpleOneSinkTopicProducersTest {
this.producers.initialize();
verify(mockClient, times(1))
- .createProducer(eq(TEST_SINK_TOPIC), any(ProducerConfiguration.class));
+ .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
this.producers.close();
@@ -77,12 +77,12 @@ public class SimpleOneSinkTopicProducersTest {
this.producers.initialize();
verify(mockClient, times(1))
- .createProducer(eq(TEST_SINK_TOPIC), any(ProducerConfiguration.class));
+ .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
assertSame(mockProducer, this.producers.getProducer("test-src-topic", 0));
verify(mockClient, times(1))
- .createProducer(eq(TEST_SINK_TOPIC), any(ProducerConfiguration.class));
+ .createProducer(eq(TEST_OUTPUT_TOPIC), any(ProducerConfiguration.class));
producers.closeProducer("test-src-topic", 0);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index badee94..08bfbe4 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -56,15 +56,15 @@ public class JavaInstanceMain {
@Parameter(names = "--namespace", description = "Namespace Name\n", required = true)
protected String namespace;
- @Parameter(names = "--sink_topic", description = "Output Topic Name\n")
- protected String sinkTopicName;
+ @Parameter(names = "--output_topic", description = "Output Topic Name\n")
+ protected String outputTopicName;
- @Parameter(names = "--custom_serde_source_topics", description = "Input Topics that need custom deserialization\n", required = false)
- protected String customSerdeSourceTopics;
+ @Parameter(names = "--custom_serde_input_topics", description = "Input Topics that need custom deserialization\n", required = false)
+ protected String customSerdeInputTopics;
@Parameter(names = "--custom_serde_classnames", description = "Input SerDe\n", required = false)
protected String customSerdeClassnames;
- @Parameter(names = "--source_topics", description = "Input Topics\n", required = false)
- protected String defaultSerdeSourceTopics;
+ @Parameter(names = "--input_topics", description = "Input Topics\n", required = false)
+ protected String defaultSerdeInputTopics;
@Parameter(names = "--output_serde_classname", description = "Output SerDe\n")
protected String outputSerdeClassName;
@@ -118,27 +118,27 @@ public class JavaInstanceMain {
functionConfigBuilder.setNamespace(namespace);
functionConfigBuilder.setName(functionName);
functionConfigBuilder.setClassName(className);
- if (defaultSerdeSourceTopics != null) {
- String[] sourceTopics = defaultSerdeSourceTopics.split(",");
- for (String sourceTopic : sourceTopics) {
- functionConfigBuilder.addInputs(sourceTopic);
+ if (defaultSerdeInputTopics != null) {
+ String[] inputTopics = defaultSerdeInputTopics.split(",");
+ for (String inputTopic : inputTopics) {
+ functionConfigBuilder.addInputs(inputTopic);
}
}
- if (customSerdeSourceTopics != null && customSerdeClassnames != null) {
- String[] sourceTopics = customSerdeSourceTopics.split(",");
+ if (customSerdeInputTopics != null && customSerdeClassnames != null) {
+ String[] inputTopics = customSerdeInputTopics.split(",");
String[] inputSerdeClassNames = customSerdeClassnames.split(",");
- if (sourceTopics.length != inputSerdeClassNames.length) {
+ if (inputTopics.length != inputSerdeClassNames.length) {
throw new RuntimeException("Error specifying inputs");
}
- for (int i = 0; i < sourceTopics.length; ++i) {
- functionConfigBuilder.putCustomSerdeInputs(sourceTopics[i], inputSerdeClassNames[i]);
+ for (int i = 0; i < inputTopics.length; ++i) {
+ functionConfigBuilder.putCustomSerdeInputs(inputTopics[i], inputSerdeClassNames[i]);
}
}
if (outputSerdeClassName != null) {
functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
}
- if (sinkTopicName != null) {
- functionConfigBuilder.setOutput(sinkTopicName);
+ if (outputTopicName != null) {
+ functionConfigBuilder.setOutput(outputTopicName);
}
if (logTopic != null) {
functionConfigBuilder.setLogTopic(logTopic);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index c1d7b7f..1bdedd7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -112,13 +112,13 @@ class ProcessRuntime implements Runtime {
args.add(instanceConfig.getFunctionConfig().getLogTopic());
}
if (instanceConfig.getFunctionConfig().getCustomSerdeInputsCount() > 0) {
- String sourceTopicString = "";
+ String inputTopicString = "";
String inputSerdeClassNameString = "";
for (Map.Entry<String, String> entry : instanceConfig.getFunctionConfig().getCustomSerdeInputsMap().entrySet()) {
- if (sourceTopicString.isEmpty()) {
- sourceTopicString = entry.getKey();
+ if (inputTopicString.isEmpty()) {
+ inputTopicString = entry.getKey();
} else {
- sourceTopicString = sourceTopicString + "," + entry.getKey();
+ inputTopicString = inputTopicString + "," + entry.getKey();
}
if (inputSerdeClassNameString.isEmpty()) {
inputSerdeClassNameString = entry.getValue();
@@ -126,22 +126,22 @@ class ProcessRuntime implements Runtime {
inputSerdeClassNameString = inputSerdeClassNameString + "," + entry.getValue();
}
}
- args.add("--custom_serde_source_topics");
- args.add(sourceTopicString);
+ args.add("--custom_serde_input_topics");
+ args.add(inputTopicString);
args.add("--custom_serde_classnames");
args.add(inputSerdeClassNameString);
}
if (instanceConfig.getFunctionConfig().getInputsCount() > 0) {
- String sourceTopicString = "";
+ String inputTopicString = "";
for (String topicName : instanceConfig.getFunctionConfig().getInputsList()) {
- if (sourceTopicString.isEmpty()) {
- sourceTopicString = topicName;
+ if (inputTopicString.isEmpty()) {
+ inputTopicString = topicName;
} else {
- sourceTopicString = sourceTopicString + "," + topicName;
+ inputTopicString = inputTopicString + "," + topicName;
}
}
- args.add("--source_topics");
- args.add(sourceTopicString);
+ args.add("--input_topics");
+ args.add(inputTopicString);
}
args.add("--auto_ack");
if (instanceConfig.getFunctionConfig().getAutoAck()) {
@@ -151,7 +151,7 @@ class ProcessRuntime implements Runtime {
}
if (instanceConfig.getFunctionConfig().getOutput() != null
&& !instanceConfig.getFunctionConfig().getOutput().isEmpty()) {
- args.add("--sink_topic");
+ args.add("--output_topic");
args.add(instanceConfig.getFunctionConfig().getOutput());
}
if (instanceConfig.getFunctionConfig().getOutputSerdeClassName() != null
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 31e6422..e316e6e 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -75,9 +75,9 @@ public class ProcessRuntimeTest {
functionConfigBuilder.setNamespace(TEST_NAMESPACE);
functionConfigBuilder.setName(TEST_NAME);
functionConfigBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction");
- functionConfigBuilder.addInputs(TEST_NAME + "-source1");
- functionConfigBuilder.addInputs(TEST_NAME + "-source2");
- functionConfigBuilder.setOutput(TEST_NAME + "-sink");
+ functionConfigBuilder.addInputs(TEST_NAME + "-input1");
+ functionConfigBuilder.addInputs(TEST_NAME + "-input2");
+ functionConfigBuilder.setOutput(TEST_NAME + "-output");
functionConfigBuilder.setOutputSerdeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer");
functionConfigBuilder.setLogTopic(TEST_NAME + "-log");
return functionConfigBuilder.build();
@@ -113,9 +113,9 @@ public class ProcessRuntimeTest {
+ " --name " + config.getFunctionConfig().getName()
+ " --function_classname " + config.getFunctionConfig().getClassName()
+ " --log_topic " + config.getFunctionConfig().getLogTopic()
- + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + "-source2"
+ + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2"
+ " --auto_ack false"
- + " --sink_topic " + config.getFunctionConfig().getOutput()
+ + " --output_topic " + config.getFunctionConfig().getOutput()
+ " --output_serde_classname " + config.getFunctionConfig().getOutputSerdeClassName()
+ " --processing_guarantees ATLEAST_ONCE"
+ " --pulsar_serviceurl " + pulsarServiceUrl
@@ -140,9 +140,9 @@ public class ProcessRuntimeTest {
+ " --name " + config.getFunctionConfig().getName()
+ " --function_classname " + config.getFunctionConfig().getClassName()
+ " --log_topic " + config.getFunctionConfig().getLogTopic()
- + " --source_topics " + TEST_NAME + "-source1," + TEST_NAME + "-source2"
+ + " --input_topics " + TEST_NAME + "-input1," + TEST_NAME + "-input2"
+ " --auto_ack false"
- + " --sink_topic " + config.getFunctionConfig().getOutput()
+ + " --output_topic " + config.getFunctionConfig().getOutput()
+ " --output_serde_classname " + config.getFunctionConfig().getOutputSerdeClassName()
+ " --processing_guarantees ATLEAST_ONCE"
+ " --pulsar_serviceurl " + pulsarServiceUrl
diff --git a/pulsar-functions/submit-python-function.sh b/pulsar-functions/submit-python-function.sh
index 7085047..ed625a5 100755
--- a/pulsar-functions/submit-python-function.sh
+++ b/pulsar-functions/submit-python-function.sh
@@ -21,8 +21,8 @@
bin/pulsar-functions functions create \
--function-config conf/example.yml \
- --sink-topic persistent://sample/standalone/ns1/test_result \
- --source-topics persistent://sample/standalone/ns1/test_src \
+ --output-topic persistent://sample/standalone/ns1/test_result \
+ --input-topics persistent://sample/standalone/ns1/test_src \
--output-serde-classname pulsarfunction.serde.IdentitySerDe \
--py python-examples/exclamation.py \
--function-classname exclamation.Exclamation
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 7dabeed..0f182d1 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -86,8 +86,8 @@ public class FunctionApiV2ResourceTest {
private static final String tenant = "test-tenant";
private static final String namespace = "test-namespace";
private static final String function = "test-function";
- private static final String sinkTopic = "test-sink-topic";
- private static final String sourceTopic = "test-source-topic";
+ private static final String outputTopic = "test-output-topic";
+ private static final String inputTopic = "test-input-topic";
private static final String inputSerdeClassName = DefaultSerDe.class.getName();
private static final String outputSerdeClassName = DefaultSerDe.class.getName();
private static final String className = TestFunction.class.getName();
@@ -136,8 +136,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -153,8 +153,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -170,8 +170,8 @@ public class FunctionApiV2ResourceTest {
null,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -187,8 +187,8 @@ public class FunctionApiV2ResourceTest {
function,
null,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -204,8 +204,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
null,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -214,14 +214,14 @@ public class FunctionApiV2ResourceTest {
}
@Test
- public void testRegisterFunctionMissingSourceTopic() throws IOException {
+ public void testRegisterFunctionMissingInputTopic() throws IOException {
testRegisterFunctionMissingArguments(
tenant,
namespace,
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
+ outputTopic,
null,
inputSerdeClassName,
outputSerdeClassName,
@@ -238,8 +238,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
null,
outputSerdeClassName,
className,
@@ -255,8 +255,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
null,
@@ -272,8 +272,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -287,8 +287,8 @@ public class FunctionApiV2ResourceTest {
String function,
InputStream inputStream,
FormDataContentDisposition details,
- String sinkTopic,
- String sourceTopic,
+ String outputTopic,
+ String inputTopic,
String inputSerdeClassName,
String outputSerdeClassName,
String className,
@@ -305,11 +305,11 @@ public class FunctionApiV2ResourceTest {
if (function != null) {
functionConfigBuilder.setName(function);
}
- if (sinkTopic != null) {
- functionConfigBuilder.setOutput(sinkTopic);
+ if (outputTopic != null) {
+ functionConfigBuilder.setOutput(outputTopic);
}
- if (sourceTopic != null && inputSerdeClassName != null) {
- functionConfigBuilder.putCustomSerdeInputs(sourceTopic, inputSerdeClassName);
+ if (inputTopic != null && inputSerdeClassName != null) {
+ functionConfigBuilder.putCustomSerdeInputs(inputTopic, inputSerdeClassName);
}
if (outputSerdeClassName != null) {
functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
@@ -341,7 +341,7 @@ public class FunctionApiV2ResourceTest {
private Response registerDefaultFunction() throws IOException {
FunctionConfig functionConfig = FunctionConfig.newBuilder()
.setTenant(tenant).setNamespace(namespace).setName(function)
- .setOutput(sinkTopic).putCustomSerdeInputs(sourceTopic, inputSerdeClassName)
+ .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, inputSerdeClassName)
.setOutputSerdeClassName(outputSerdeClassName)
.setClassName(className)
.setParallelism(parallelism).build();
@@ -456,8 +456,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -473,8 +473,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -490,8 +490,8 @@ public class FunctionApiV2ResourceTest {
null,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -507,8 +507,8 @@ public class FunctionApiV2ResourceTest {
function,
null,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -524,8 +524,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
null,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -541,7 +541,7 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
+ outputTopic,
null,
inputSerdeClassName,
outputSerdeClassName,
@@ -558,8 +558,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
null,
outputSerdeClassName,
className,
@@ -575,8 +575,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
null,
@@ -591,8 +591,8 @@ public class FunctionApiV2ResourceTest {
function,
mockedInputStream,
mockedFormData,
- sinkTopic,
- sourceTopic,
+ outputTopic,
+ inputTopic,
inputSerdeClassName,
outputSerdeClassName,
className,
@@ -607,8 +607,8 @@ public class FunctionApiV2ResourceTest {
String function,
InputStream inputStream,
FormDataContentDisposition details,
- String sinkTopic,
- String sourceTopic,
+ String outputTopic,
+ String inputTopic,
String inputSerdeClassName,
String outputSerdeClassName,
String className,
@@ -625,11 +625,11 @@ public class FunctionApiV2ResourceTest {
if (function != null) {
functionConfigBuilder.setName(function);
}
- if (sinkTopic != null) {
- functionConfigBuilder.setOutput(sinkTopic);
+ if (outputTopic != null) {
+ functionConfigBuilder.setOutput(outputTopic);
}
- if (sourceTopic != null && inputSerdeClassName != null) {
- functionConfigBuilder.putCustomSerdeInputs(sourceTopic, inputSerdeClassName);
+ if (inputTopic != null && inputSerdeClassName != null) {
+ functionConfigBuilder.putCustomSerdeInputs(inputTopic, inputSerdeClassName);
}
if (outputSerdeClassName != null) {
functionConfigBuilder.setOutputSerdeClassName(outputSerdeClassName);
@@ -661,7 +661,7 @@ public class FunctionApiV2ResourceTest {
private Response updateDefaultFunction() throws IOException {
FunctionConfig functionConfig = FunctionConfig.newBuilder()
.setTenant(tenant).setNamespace(namespace).setName(function)
- .setOutput(sinkTopic).putCustomSerdeInputs(sourceTopic, inputSerdeClassName)
+ .setOutput(outputTopic).putCustomSerdeInputs(inputTopic, inputSerdeClassName)
.setOutputSerdeClassName(outputSerdeClassName)
.setClassName(className)
.setParallelism(parallelism).build();
@@ -935,12 +935,12 @@ public class FunctionApiV2ResourceTest {
FunctionConfig functionConfig = FunctionConfig.newBuilder()
.setClassName(className)
- .putCustomSerdeInputs(sourceTopic, inputSerdeClassName)
+ .putCustomSerdeInputs(inputTopic, inputSerdeClassName)
.setOutputSerdeClassName(outputSerdeClassName)
.setName(function)
.setNamespace(namespace)
.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE)
- .setOutput(sinkTopic)
+ .setOutput(outputTopic)
.setTenant(tenant)
.setParallelism(parallelism).build();
FunctionMetaData metaData = FunctionMetaData.newBuilder()
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.