You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/19 00:49:29 UTC
[pulsar] 14/26: Fix update cli source sink (#4061)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit b4aff17c4e036ff06895dc6b2855c2af0266cac5
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Apr 17 17:42:38 2019 -0700
Fix update cli source sink (#4061)
* fix bug in source and sink cli update
* fix import
* fix logic
* fix tests
---
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 7 +++-
.../org/apache/pulsar/admin/cli/CmdSources.java | 8 ++++-
.../org/apache/pulsar/admin/cli/TestCmdSinks.java | 39 ++++++++++++++++++++--
.../apache/pulsar/admin/cli/TestCmdSources.java | 36 ++++++++++++++++++++
.../pulsar/common/functions/FunctionConfig.java | 2 +-
.../org/apache/pulsar/common/io/SinkConfig.java | 2 +-
.../functions/utils/FunctionConfigUtils.java | 9 +++++
.../pulsar/functions/utils/SinkConfigUtils.java | 9 +++++
8 files changed, 105 insertions(+), 7 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 52842c3..cf60e8a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -214,7 +214,12 @@ public class CmdSinks extends CmdBase {
}
protected void validateSinkConfigs(SinkConfig sinkConfig) {
- org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
+ if (sinkConfig.getTenant() == null) {
+ sinkConfig.setTenant(PUBLIC_TENANT);
+ }
+ if (sinkConfig.getNamespace() == null) {
+ sinkConfig.setNamespace(DEFAULT_NAMESPACE);
+ }
}
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 3b1927e..01a6b38 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -207,6 +207,7 @@ public class CmdSources extends CmdBase {
@Parameters(commandDescription = "Update a Pulsar IO source connector")
protected class UpdateSource extends SourceDetailsCommand {
+
@Override
void runCmd() throws Exception {
if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
@@ -218,7 +219,12 @@ public class CmdSources extends CmdBase {
}
protected void validateSourceConfigs(SourceConfig sourceConfig) {
- org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
+ if (sourceConfig.getTenant() == null) {
+ sourceConfig.setTenant(PUBLIC_TENANT);
+ }
+ if (sourceConfig.getNamespace() == null) {
+ sourceConfig.setNamespace(DEFAULT_NAMESPACE);
+ }
}
}
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 2bb3877..8340795 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -165,7 +165,6 @@ public class TestCmdSinks {
@Test
public void testMissingInput() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
- sinkConfig.setInputSpecs(new HashMap<>());
sinkConfig.setInputs(null);
testCmdSinkCliMissingArgs(
TENANT,
@@ -190,7 +189,6 @@ public class TestCmdSinks {
SinkConfig sinkConfig = getSinkConfig();
sinkConfig.setTopicToSerdeClassName(null);
sinkConfig.setTopicToSchemaType(null);
- sinkConfig.setInputSpecs(new HashMap<>());
testCmdSinkCliMissingArgs(
TENANT,
NAMESPACE,
@@ -212,7 +210,6 @@ public class TestCmdSinks {
@Test
public void testMissingTopicPattern() throws Exception {
SinkConfig sinkConfig = getSinkConfig();
- sinkConfig.getInputSpecs().clear();
sinkConfig.setTopicsPattern(null);
testCmdSinkCliMissingArgs(
TENANT,
@@ -677,4 +674,40 @@ public class TestCmdSinks {
verify(sink).deleteSink(eq(TENANT), eq(NAMESPACE), null);
}
+
+ @Test
+ public void testUpdateSink() throws Exception {
+
+ updateSink.name = "my-sink";
+
+ updateSink.archive = "new-archive";
+
+ updateSink.processArguments();
+
+ updateSink.runCmd();
+
+ verify(sink).updateSink(eq(SinkConfig.builder()
+ .tenant(PUBLIC_TENANT)
+ .namespace(DEFAULT_NAMESPACE)
+ .name(updateSink.name)
+ .archive(updateSink.archive)
+ .build()), eq(updateSink.archive));
+
+
+ updateSink.archive = null;
+
+ updateSink.parallelism = 2;
+
+ updateSink.processArguments();
+
+ updateSink.runCmd();
+
+ verify(sink).updateSink(eq(SinkConfig.builder()
+ .tenant(PUBLIC_TENANT)
+ .namespace(DEFAULT_NAMESPACE)
+ .name(updateSink.name)
+ .parallelism(2)
+ .build()), eq(null));
+
+ }
}
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index 77e26a5..57dc3ce 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -570,4 +570,40 @@ public class TestCmdSources {
verify(source).deleteSource(eq(TENANT), eq(NAMESPACE), null);
}
+
+ @Test
+ public void testUpdateSource() throws Exception {
+
+ updateSource.name = "my-source";
+
+ updateSource.archive = "new-archive";
+
+ updateSource.processArguments();
+
+ updateSource.runCmd();
+
+ verify(source).updateSource(eq(SourceConfig.builder()
+ .tenant(PUBLIC_TENANT)
+ .namespace(DEFAULT_NAMESPACE)
+ .name(updateSource.name)
+ .archive(updateSource.archive)
+ .build()), eq(updateSource.archive));
+
+
+ updateSource.archive = null;
+
+ updateSource.parallelism = 2;
+
+ updateSource.processArguments();
+
+ updateSource.runCmd();
+
+ verify(source).updateSource(eq(SourceConfig.builder()
+ .tenant(PUBLIC_TENANT)
+ .namespace(DEFAULT_NAMESPACE)
+ .name(updateSource.name)
+ .parallelism(2)
+ .build()), eq(null));
+
+ }
}
\ No newline at end of file
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
index dc9023a..2b8883d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/FunctionConfig.java
@@ -64,7 +64,7 @@ public class FunctionConfig {
/**
* A generalized way of specifying inputs
*/
- private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();
+ private Map<String, ConsumerConfig> inputSpecs;
private String output;
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
index d6dd92c..6e51cbb 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/io/SinkConfig.java
@@ -51,7 +51,7 @@ public class SinkConfig {
private Map<String, String> topicToSchemaType;
- private Map<String, ConsumerConfig> inputSpecs = new TreeMap<>();
+ private Map<String, ConsumerConfig> inputSpecs;
private Map<String, Object> configs;
// This is a map of secretName(aka how the secret is going to be
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 05d6c83..c67de53 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -611,6 +611,15 @@ public class FunctionConfigUtils {
if (!StringUtils.isEmpty(newConfig.getClassName())) {
mergedConfig.setClassName(newConfig.getClassName());
}
+
+ if (newConfig.getInputSpecs() == null) {
+ newConfig.setInputSpecs(new HashMap<>());
+ }
+
+ if (mergedConfig.getInputSpecs() == null) {
+ mergedConfig.setInputSpecs(new HashMap<>());
+ }
+
if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
newConfig.getInputSpecs().put(topicName,
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index f05a5f9..f1d7815 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -413,6 +413,15 @@ public class SinkConfigUtils {
if (!StringUtils.isEmpty(newConfig.getSourceSubscriptionName()) && !newConfig.getSourceSubscriptionName().equals(existingConfig.getSourceSubscriptionName())) {
throw new IllegalArgumentException("Subscription Name cannot be altered");
}
+
+ if (newConfig.getInputSpecs() == null) {
+ newConfig.setInputSpecs(new HashMap<>());
+ }
+
+ if (mergedConfig.getInputSpecs() == null) {
+ mergedConfig.setInputSpecs(new HashMap<>());
+ }
+
if (newConfig.getInputs() != null) {
newConfig.getInputs().forEach((topicName -> {
newConfig.getInputSpecs().put(topicName,