You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/16 05:42:05 UTC
[incubator-pulsar] branch master updated: fixing behavior when
configs are empty (#1781)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 1478d07 fixing behavior when configs are empty (#1781)
1478d07 is described below
commit 1478d072712329db33d9744ab1856071f52ea907
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue May 15 22:42:01 2018 -0700
fixing behavior when configs are empty (#1781)
* fixing empty configs
* refactoring code
---
.../java/org/apache/pulsar/admin/cli/CmdFunctions.java | 17 +++++++++++++++--
.../apache/pulsar/functions/instance/ContextImpl.java | 8 ++++++--
.../pulsar/functions/instance/JavaInstanceRunnable.java | 16 +++++++++++++---
.../instance/src/main/python/contextimpl.py | 4 +++-
.../apache/pulsar/functions/utils/FunctionConfig.java | 6 +++---
5 files changed, 40 insertions(+), 11 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 65a1db2..dc311ba 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -306,6 +306,15 @@ public class CmdFunctions extends CmdBase {
Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
functionConfig.setUserConfig(userConfigMap);
}
+ if (functionConfig.getInputs() == null) {
+ functionConfig.setInputs(new LinkedList<>());
+ }
+ if (functionConfig.getCustomSerdeInputs() == null) {
+ functionConfig.setCustomSerdeInputs(new HashMap<>());
+ }
+ if (functionConfig.getUserConfig() == null) {
+ functionConfig.setUserConfig(new HashMap<>());
+ }
if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) {
throw new RuntimeException("No input topic(s) specified for the function");
@@ -652,11 +661,12 @@ public class CmdFunctions extends CmdBase {
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
// Setup source
+ SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
Map<String, String> topicToSerDeClassNameMap = new HashMap<>();
topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs());
- SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
functionConfig.getInputs().forEach(v -> topicToSerDeClassNameMap.put(v, ""));
sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap);
+
if (functionConfig.getSubscriptionType() != null) {
sourceSpecBuilder
.setSubscriptionType(convertSubscriptionType(functionConfig.getSubscriptionType()));
@@ -701,6 +711,7 @@ public class CmdFunctions extends CmdBase {
Map<String, Object> configs = new HashMap<>();
configs.putAll(functionConfig.getUserConfig());
+
// windowing related
WindowConfig windowConfig = functionConfig.getWindowConfig();
if (windowConfig != null) {
@@ -714,7 +725,9 @@ public class CmdFunctions extends CmdBase {
functionDetailsBuilder.setClassName(functionConfig.getClassName());
}
}
- functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
+ if (!configs.isEmpty()) {
+ functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
+ }
functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index d1971d0..9eddc69 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -113,8 +113,12 @@ class ContextImpl implements Context {
producerConfiguration.setBatchingEnabled(true);
producerConfiguration.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
producerConfiguration.setMaxPendingMessages(1000000);
- userConfigs = new Gson().fromJson(config.getFunctionDetails().getUserConfig(),
- new TypeToken<Map<String, Object>>(){}.getType());
+ if (config.getFunctionDetails().getUserConfig().isEmpty()) {
+ userConfigs = new HashMap<>();
+ } else {
+ userConfigs = new Gson().fromJson(config.getFunctionDetails().getUserConfig(),
+ new TypeToken<Map<String, Object>>(){}.getType());
+ }
}
public void setCurrentMessageContext(MessageId messageId, String topicName) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e712dab..4aaed5b 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -490,8 +491,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
this.source = (Source) object;
- this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
- new TypeToken<Map<String, Object>>(){}.getType()));
+ if (sourceSpec.getConfigs().isEmpty()) {
+ this.source.open(new HashMap<>());
+ } else {
+ this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
+ new TypeToken<Map<String, Object>>(){}.getType()));
+ }
}
public void setupOutput() throws Exception {
@@ -526,6 +531,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
} else {
throw new RuntimeException("Sink does not implement correct interface");
}
- this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(), new TypeToken<Map<String, Object>>(){}.getType()));
+ if (sinkSpec.getConfigs().isEmpty()) {
+ this.sink.open(new HashMap<>());
+ } else {
+ this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
+ new TypeToken<Map<String, Object>>() {}.getType()));
+ }
}
}
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index e17b296..3463d7b 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -60,7 +60,9 @@ class ContextImpl(pulsar.Context):
self.current_message_id = None
self.current_input_topic_name = None
self.current_start_time = None
- self.user_config = json.loads(instance_config.function_details.userConfig);
+ self.user_config = json.loads(instance_config.function_details.userConfig) \
+ if instance_config.function_details.userConfig \
+ else []
# Called on a per message basis to set the context for the current message
def set_current_message_context(self, msgid, topic):
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 5895179..ffdda63 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -66,15 +66,15 @@ public class FunctionConfig {
private String name;
private String className;
- private Collection<String> inputs = new LinkedList<>();
- private Map<String, String> customSerdeInputs = new HashMap<>();
+ private Collection<String> inputs;
+ private Map<String, String> customSerdeInputs;
private String output;
private String outputSerdeClassName;
private String logTopic;
private ProcessingGuarantees processingGuarantees;
- private Map<String, Object> userConfig = new HashMap<>();
+ private Map<String, Object> userConfig;
private SubscriptionType subscriptionType;
private Runtime runtime;
private boolean autoAck;
--
To stop receiving notification emails like this one, please contact
mmerli@apache.org.