You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2018/05/16 05:42:05 UTC

[incubator-pulsar] branch master updated: fixing behavior when configs are empty (#1781)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 1478d07  fixing behavior when configs are empty (#1781)
1478d07 is described below

commit 1478d072712329db33d9744ab1856071f52ea907
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Tue May 15 22:42:01 2018 -0700

    fixing behavior when configs are empty (#1781)
    
    * fixing empty configs
    
    * refactoring code
---
 .../java/org/apache/pulsar/admin/cli/CmdFunctions.java  | 17 +++++++++++++++--
 .../apache/pulsar/functions/instance/ContextImpl.java   |  8 ++++++--
 .../pulsar/functions/instance/JavaInstanceRunnable.java | 16 +++++++++++++---
 .../instance/src/main/python/contextimpl.py             |  4 +++-
 .../apache/pulsar/functions/utils/FunctionConfig.java   |  6 +++---
 5 files changed, 40 insertions(+), 11 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 65a1db2..dc311ba 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -306,6 +306,15 @@ public class CmdFunctions extends CmdBase {
                 Map<String, Object> userConfigMap = new Gson().fromJson(userConfigString, type);
                 functionConfig.setUserConfig(userConfigMap);
             }
+            if (functionConfig.getInputs() == null) {
+                functionConfig.setInputs(new LinkedList<>());
+            }
+            if (functionConfig.getCustomSerdeInputs() == null) {
+                functionConfig.setCustomSerdeInputs(new HashMap<>());
+            }
+            if (functionConfig.getUserConfig() == null) {
+                functionConfig.setUserConfig(new HashMap<>());
+            }
 
             if (functionConfig.getInputs().isEmpty() && functionConfig.getCustomSerdeInputs().isEmpty()) {
                 throw new RuntimeException("No input topic(s) specified for the function");
@@ -652,11 +661,12 @@ public class CmdFunctions extends CmdBase {
             FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
 
             // Setup source
+            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
             Map<String, String> topicToSerDeClassNameMap = new HashMap<>();
             topicToSerDeClassNameMap.putAll(functionConfig.getCustomSerdeInputs());
-            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
             functionConfig.getInputs().forEach(v -> topicToSerDeClassNameMap.put(v, ""));
             sourceSpecBuilder.putAllTopicsToSerDeClassName(topicToSerDeClassNameMap);
+
             if (functionConfig.getSubscriptionType() != null) {
                 sourceSpecBuilder
                         .setSubscriptionType(convertSubscriptionType(functionConfig.getSubscriptionType()));
@@ -701,6 +711,7 @@ public class CmdFunctions extends CmdBase {
 
             Map<String, Object> configs = new HashMap<>();
             configs.putAll(functionConfig.getUserConfig());
+
             // windowing related
             WindowConfig windowConfig = functionConfig.getWindowConfig();
             if (windowConfig != null) {
@@ -714,7 +725,9 @@ public class CmdFunctions extends CmdBase {
                     functionDetailsBuilder.setClassName(functionConfig.getClassName());
                 }
             }
-            functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
+            if (!configs.isEmpty()) {
+                functionDetailsBuilder.setUserConfig(new Gson().toJson(configs));
+            }
 
             functionDetailsBuilder.setAutoAck(functionConfig.isAutoAck());
             functionDetailsBuilder.setParallelism(functionConfig.getParallelism());
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index d1971d0..9eddc69 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -113,8 +113,12 @@ class ContextImpl implements Context {
         producerConfiguration.setBatchingEnabled(true);
         producerConfiguration.setBatchingMaxPublishDelay(1, TimeUnit.MILLISECONDS);
         producerConfiguration.setMaxPendingMessages(1000000);
-        userConfigs = new Gson().fromJson(config.getFunctionDetails().getUserConfig(),
-                new TypeToken<Map<String, Object>>(){}.getType());
+        if (config.getFunctionDetails().getUserConfig().isEmpty()) {
+            userConfigs = new HashMap<>();
+        } else {
+            userConfigs = new Gson().fromJson(config.getFunctionDetails().getUserConfig(),
+                    new TypeToken<Map<String, Object>>(){}.getType());
+        }
     }
 
     public void setCurrentMessageContext(MessageId messageId, String topicName) {
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index e712dab..4aaed5b 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -28,6 +28,7 @@ import io.netty.buffer.ByteBuf;
 
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -490,8 +491,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
         this.source = (Source) object;
 
-        this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
-                new TypeToken<Map<String, Object>>(){}.getType()));
+        if (sourceSpec.getConfigs().isEmpty()) {
+            this.source.open(new HashMap<>());
+        } else {
+            this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
+                    new TypeToken<Map<String, Object>>(){}.getType()));
+        }
     }
 
     public void setupOutput() throws Exception {
@@ -526,6 +531,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         } else {
             throw new RuntimeException("Sink does not implement correct interface");
         }
-        this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(), new TypeToken<Map<String, Object>>(){}.getType()));
+        if (sinkSpec.getConfigs().isEmpty()) {
+            this.sink.open(new HashMap<>());
+        } else {
+            this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
+                    new TypeToken<Map<String, Object>>() {}.getType()));
+        }
     }
 }
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index e17b296..3463d7b 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -60,7 +60,9 @@ class ContextImpl(pulsar.Context):
     self.current_message_id = None
     self.current_input_topic_name = None
     self.current_start_time = None
-    self.user_config = json.loads(instance_config.function_details.userConfig);
+    self.user_config = json.loads(instance_config.function_details.userConfig) \
+      if instance_config.function_details.userConfig \
+      else []
 
   # Called on a per message basis to set the context for the current message
   def set_current_message_context(self, msgid, topic):
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
index 5895179..ffdda63 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfig.java
@@ -66,15 +66,15 @@ public class FunctionConfig {
     private String name;
     private String className;
 
-    private Collection<String> inputs = new LinkedList<>();
-    private Map<String, String> customSerdeInputs = new HashMap<>();
+    private Collection<String> inputs;
+    private Map<String, String> customSerdeInputs;
 
     private String output;
     private String outputSerdeClassName;
 
     private String logTopic;
     private ProcessingGuarantees processingGuarantees;
-    private Map<String, Object> userConfig = new HashMap<>();
+    private Map<String, Object> userConfig;
     private SubscriptionType subscriptionType;
     private Runtime runtime;
     private boolean autoAck;

-- 
To stop receiving notification emails like this one, please contact
mmerli@apache.org.