You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/21 07:53:00 UTC

[incubator-pulsar] branch master updated: adding update functionality to sources and sinks (#1813)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f97bde  adding update functionality to sources and sinks (#1813)
7f97bde is described below

commit 7f97bde2a384ae00bfbfd1c9f54deb3db124a1ad
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon May 21 00:52:58 2018 -0700

    adding update functionality to sources and sinks (#1813)
---
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java | 41 +++++++++++++++-------
 .../org/apache/pulsar/admin/cli/CmdSources.java    | 39 ++++++++++++++------
 2 files changed, 57 insertions(+), 23 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 04e73bf..ed1cfa7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.SinkConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.Source;
 
 import java.io.File;
 import java.io.IOException;
@@ -63,16 +62,19 @@ import java.util.function.Consumer;
 public class CmdSinks extends CmdBase {
 
     private final CreateSink createSink;
+    private final UpdateSink updateSink;
     private final DeleteSink deleteSink;
     private final LocalSinkRunner localSinkRunner;
 
     public CmdSinks(PulsarAdmin admin) {
         super("sink", admin);
         createSink = new CreateSink();
+        updateSink = new UpdateSink();
         deleteSink = new DeleteSink();
         localSinkRunner = new LocalSinkRunner();
 
         jcommander.addCommand("create", createSink);
+        jcommander.addCommand("update", updateSink);
         jcommander.addCommand("delete", deleteSink);
         jcommander.addCommand("localrun", localSinkRunner);
     }
@@ -108,7 +110,31 @@ public class CmdSinks extends CmdBase {
     }
 
     @Parameters(commandDescription = "Create Pulsar sink connectors")
-    class CreateSink extends BaseCommand {
+    class CreateSink extends SinkCommand {
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile);
+            print("Created successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Update Pulsar sink connectors")
+    class UpdateSink extends SinkCommand {
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().updateFunction(createSinkConfig(sinkConfig), jarFile);
+            print("Updated successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Create Pulsar sink connectors")
+    abstract class SinkCommand extends BaseCommand {
         @Parameter(names = "--tenant", description = "The sink's tenant")
         protected String tenant;
         @Parameter(names = "--namespace", description = "The sink's namespace")
@@ -123,7 +149,7 @@ public class CmdSinks extends CmdBase {
         protected String customSerdeInputString;
         @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Sink")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--parallelism", description = "")
+        @Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)")
         protected String parallelism;
         @Parameter(
                 names = "--jar",
@@ -221,15 +247,6 @@ public class CmdSinks extends CmdBase {
             }
         }
 
-        @Override
-        void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
-            admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile);
-            print("Created successfully");
-        }
-
         private Class<?> getSinkType(File file) {
             if (!Reflections.classExistsInJar(file, sinkConfig.getClassName())) {
                 throw new IllegalArgumentException(String.format("Pulsar sink class %s does not exist in jar %s",
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 1583589..dbcdef7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -64,15 +64,18 @@ public class CmdSources extends CmdBase {
 
     private final CreateSource createSource;
     private final DeleteSource deleteSource;
+    private final UpdateSource updateSource;
     private final LocalSourceRunner localSourceRunner;
 
     public CmdSources(PulsarAdmin admin) {
         super("source", admin);
         createSource = new CreateSource();
+        updateSource = new UpdateSource();
         deleteSource = new DeleteSource();
         localSourceRunner = new LocalSourceRunner();
 
         jcommander.addCommand("create", createSource);
+        jcommander.addCommand("update", updateSource);
         jcommander.addCommand("delete", deleteSource);
         jcommander.addCommand("localrun", localSourceRunner);
     }
@@ -108,7 +111,30 @@ public class CmdSources extends CmdBase {
     }
 
     @Parameters(commandDescription = "Create Pulsar source connectors")
-    class CreateSource extends BaseCommand {
+    public class CreateSource extends SourceCommand {
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
+            print("Created successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Update Pulsar source connectors")
+    public class UpdateSource extends SourceCommand {
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().updateFunction(createSourceConfig(sourceConfig), jarFile);
+            print("Updated successfully");
+        }
+    }
+
+    abstract class SourceCommand extends BaseCommand {
         @Parameter(names = "--tenant", description = "The source's tenant")
         protected String tenant;
         @Parameter(names = "--namespace", description = "The source's namespace")
@@ -123,7 +149,7 @@ public class CmdSources extends CmdBase {
         protected String destinationTopicName;
         @Parameter(names = "--deserializationClassName", description = "The classname for SerDe class for the source")
         protected String deserializationClassName;
-        @Parameter(names = "--parallelism", description = "Number of instances of the source")
+        @Parameter(names = "--parallelism", description = "The source's parallelism factor (i.e. the number of source instances to run)")
         protected String parallelism;
         @Parameter(
                 names = "--jar",
@@ -206,15 +232,6 @@ public class CmdSources extends CmdBase {
             }
         }
 
-        @Override
-        void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
-            admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
-            print("Created successfully");
-        }
-
         private Class<?> getSourceType(File file) {
             if (!Reflections.classExistsInJar(file, sourceConfig.getClassName())) {
                 throw new IllegalArgumentException(String.format("Pulsar Source class %s does not exist in jar %s",

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.