You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:29 UTC

[pulsar] 14/26: Fix update cli source sink (#4061)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit b4aff17c4e036ff06895dc6b2855c2af0266cac5
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Apr 17 17:42:38 2019 -0700

    Fix update cli source sink (#4061)
    
    * fix bug in source and sink cli update
    
    * fix import
    
    * fix logic
    
    * fix tests
---
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  7 +++-
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  8 ++++-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  | 39 ++++++++++++++++++++--
 .../apache/pulsar/admin/cli/TestCmdSources.java    | 36 ++++++++++++++++++++
 .../pulsar/common/functions/FunctionConfig.java    |  2 +-
 .../org/apache/pulsar/common/io/SinkConfig.java    |  2 +-
 .../functions/utils/FunctionConfigUtils.java       |  9 +++++
 .../pulsar/functions/utils/SinkConfigUtils.java    |  9 +++++
 8 files changed, 105 insertions(+), 7 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 52842c3..cf60e8a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -214,7 +214,12 @@ public class CmdSinks extends CmdBase {
         }
 
         protected void validateSinkConfigs(SinkConfig sinkConfig) {
-            org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
+            if (sinkConfig.getTenant() == null) {
+                sinkConfig.setTenant(PUBLIC_TENANT);
+            }
+            if (sinkConfig.getNamespace() == null) {
+                sinkConfig.setNamespace(DEFAULT_NAMESPACE);
+            }
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 3b1927e..01a6b38 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -207,6 +207,7 @@ public class CmdSources extends CmdBase {
 
     @Parameters(commandDescription = "Update a Pulsar IO source connector")
     protected class UpdateSource extends SourceDetailsCommand {
+
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
@@ -218,7 +219,12 @@ public class CmdSources extends CmdBase {
         }
 
         protected void validateSourceConfigs(SourceConfig sourceConfig) {
-            org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
+            if (sourceConfig.getTenant() == null) {
+                sourceConfig.setTenant(PUBLIC_TENANT);
+            }
+            if (sourceConfig.getNamespace() == null) {
+                sourceConfig.setNamespace(DEFAULT_NAMESPACE);
+            }
         }
     }
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 2bb3877..8340795 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -165,7 +165,6 @@ public class TestCmdSinks {
     @Test
     public void testMissingInput() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.setInputSpecs(new HashMap<>());
         sinkConfig.setInputs(null);
         testCmdSinkCliMissingArgs(
                 TENANT,
@@ -190,7 +189,6 @@ public class TestCmdSinks {
         SinkConfig sinkConfig = getSinkConfig();
         sinkConfig.setTopicToSerdeClassName(null);
         sinkConfig.setTopicToSchemaType(null);
-        sinkConfig.setInputSpecs(new HashMap<>());
         testCmdSinkCliMissingArgs(
                 TENANT,
                 NAMESPACE,
@@ -212,7 +210,6 @@ public class TestCmdSinks {
     @Test
     public void testMissingTopicPattern() throws Exception {
         SinkConfig sinkConfig = getSinkConfig();
-        sinkConfig.getInputSpecs().clear();
         sinkConfig.setTopicsPattern(null);
         testCmdSinkCliMissingArgs(
                 TENANT,
@@ -677,4 +674,40 @@ public class TestCmdSinks {
 
         verify(sink).deleteSink(eq(TENANT), eq(NAMESPACE), null);
     }
+
+    @Test
+    public void testUpdateSink() throws Exception {
+
+        updateSink.name = "my-sink";
+
+        updateSink.archive = "new-archive";
+
+        updateSink.processArguments();
+
+        updateSink.runCmd();
+
+        verify(sink).updateSink(eq(SinkConfig.builder()
+                .tenant(PUBLIC_TENANT)
+                .namespace(DEFAULT_NAMESPACE)
+                .name(updateSink.name)
+                .archive(updateSink.archive)
+                .build()), eq(updateSink.archive));
+
+
+        updateSink.archive = null;
+
+        updateSink.parallelism = 2;
+
+        updateSink.processArguments();
+
+        updateSink.runCmd();
+
+        verify(sink).updateSink(eq(SinkConfig.builder()
+                .tenant(PUBLIC_TENANT)
+                .namespace(DEFAULT_NAMESPACE)
+                .name(updateSink.name)
+                .parallelism(2)
+                .build()), eq(null));
+
+    }
 }
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 77e26a5..57dc3ce 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -570,4 +570,40 @@ public class TestCmdSources {
 
         verify(source).deleteSource(eq(TENANT), eq(NAMESPACE), null);
     }
+
+    @Test
+    public void testUpdateSource() throws Exception {
+
+        updateSource.name = "my-source";
+
+        updateSource.archive = "new-archive";
+
+        updateSource.processArguments();
+
+        updateSource.runCmd();
+
+        verify(source).updateSource(eq(SourceConfig.builder()
+                .tenant(PUBLIC_TENANT)
+                .namespace(DEFAULT_NAMESPACE)
+                .name(updateSource.name)
+                .archive(updateSource.archive)
+                .build()), eq(updateSource.archive));
+
+
+        updateSource.archive = null;
+
+        updateSource.parallelism = 2;
+
+        updateSource.processArguments();
+
+        updateSource.runCmd();
+
+        verify(source).updateSource(eq(SourceConfig.builder()
+                .tenant(PUBLIC_TENANT)
+                .namespace(DEFAULT_NAMESPACE)
+                .name(updateSource.name)
+                .parallelism(2)
+                .build()), eq(null));
+
+    }
 }
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index dc9023a..2b8883d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -64,7 +64,7 @@ public class FunctionConfig {
     /**
      * A generalized way of specifying inputs
      */
-    private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();
+    private Map<String, ConsumerConfig> inputSpecs;
 
     private String output;
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index d6dd92c..6e51cbb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -51,7 +51,7 @@ public class SinkConfig {
 
     private Map<String, String> topicToSchemaType;
 
-    private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();
+    private Map<String, ConsumerConfig> inputSpecs;
 
     private Map<String, Object> configs;
     // This is a map of secretName(aka how the secret is going to be
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 05d6c83..c67de53 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -611,6 +611,15 @@ public class FunctionConfigUtils {
         if (!StringUtils.isEmpty(newConfig.getClassName())) {
             mergedConfig.setClassName(newConfig.getClassName());
         }
+
+        if (newConfig.getInputSpecs() == null) {
+            newConfig.setInputSpecs(new HashMap<>());
+        }
+
+        if (mergedConfig.getInputSpecs() == null) {
+            mergedConfig.setInputSpecs(new HashMap<>());
+        }
+        
         if (newConfig.getInputs() != null) {
             newConfig.getInputs().forEach((topicName -> {
                 newConfig.getInputSpecs().put(topicName,
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index f05a5f9..f1d7815 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -413,6 +413,15 @@ public class SinkConfigUtils {
         if (!StringUtils.isEmpty(newConfig.getSourceSubscriptionName()) && !newConfig.getSourceSubscriptionName().equals(existingConfig.getSourceSubscriptionName())) {
             throw new IllegalArgumentException("Subscription Name cannot be altered");
         }
+
+        if (newConfig.getInputSpecs() == null) {
+            newConfig.setInputSpecs(new HashMap<>());
+        }
+
+        if (mergedConfig.getInputSpecs() == null) {
+            mergedConfig.setInputSpecs(new HashMap<>());
+        }
+
         if (newConfig.getInputs() != null) {
             newConfig.getInputs().forEach((topicName -> {
                 newConfig.getInputSpecs().put(topicName,