You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/21 07:53:00 UTC

[GitHub] sijie closed pull request #1813: adding update functionality to sources and sinks

sijie closed pull request #1813: adding update functionality to sources and sinks
URL: https://github.com/apache/incubator-pulsar/pull/1813
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 04e73bffb7..ed1cfa720b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -45,7 +45,6 @@
 import org.apache.pulsar.functions.utils.SinkConfig;
 import org.apache.pulsar.functions.utils.Utils;
 import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.Source;
 
 import java.io.File;
 import java.io.IOException;
@@ -63,16 +62,19 @@
 public class CmdSinks extends CmdBase {
 
     private final CreateSink createSink;
+    private final UpdateSink updateSink;
     private final DeleteSink deleteSink;
     private final LocalSinkRunner localSinkRunner;
 
     public CmdSinks(PulsarAdmin admin) {
         super("sink", admin);
         createSink = new CreateSink();
+        updateSink = new UpdateSink();
         deleteSink = new DeleteSink();
         localSinkRunner = new LocalSinkRunner();
 
         jcommander.addCommand("create", createSink);
+        jcommander.addCommand("update", updateSink);
         jcommander.addCommand("delete", deleteSink);
         jcommander.addCommand("localrun", localSinkRunner);
     }
@@ -108,7 +110,31 @@ void runCmd() throws Exception {
     }
 
     @Parameters(commandDescription = "Create Pulsar sink connectors")
-    class CreateSink extends BaseCommand {
+    class CreateSink extends SinkCommand {
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile);
+            print("Created successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Update Pulsar sink connectors")
+    class UpdateSink extends SinkCommand {
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().updateFunction(createSinkConfig(sinkConfig), jarFile);
+            print("Updated successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Create Pulsar sink connectors")
+    abstract class SinkCommand extends BaseCommand {
         @Parameter(names = "--tenant", description = "The sink's tenant")
         protected String tenant;
         @Parameter(names = "--namespace", description = "The sink's namespace")
@@ -123,7 +149,7 @@ void runCmd() throws Exception {
         protected String customSerdeInputString;
         @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Sink")
         protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--parallelism", description = "")
+        @Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)")
         protected String parallelism;
         @Parameter(
                 names = "--jar",
@@ -221,15 +247,6 @@ public void accept(String s) {
             }
         }
 
-        @Override
-        void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
-            admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile);
-            print("Created successfully");
-        }
-
         private Class<?> getSinkType(File file) {
             if (!Reflections.classExistsInJar(file, sinkConfig.getClassName())) {
                 throw new IllegalArgumentException(String.format("Pulsar sink class %s does not exist in jar %s",
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 1583589055..dbcdef7301 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -64,15 +64,18 @@
 
     private final CreateSource createSource;
     private final DeleteSource deleteSource;
+    private final UpdateSource updateSource;
     private final LocalSourceRunner localSourceRunner;
 
     public CmdSources(PulsarAdmin admin) {
         super("source", admin);
         createSource = new CreateSource();
+        updateSource = new UpdateSource();
         deleteSource = new DeleteSource();
         localSourceRunner = new LocalSourceRunner();
 
         jcommander.addCommand("create", createSource);
+        jcommander.addCommand("update", updateSource);
         jcommander.addCommand("delete", deleteSource);
         jcommander.addCommand("localrun", localSourceRunner);
     }
@@ -108,7 +111,30 @@ void runCmd() throws Exception {
     }
 
     @Parameters(commandDescription = "Create Pulsar source connectors")
-    class CreateSource extends BaseCommand {
+    public class CreateSource extends SourceCommand {
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
+            print("Created successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Update Pulsar source connectors")
+    public class UpdateSource extends SourceCommand {
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().updateFunction(createSourceConfig(sourceConfig), jarFile);
+            print("Updated successfully");
+        }
+    }
+
+    abstract class SourceCommand extends BaseCommand {
         @Parameter(names = "--tenant", description = "The source's tenant")
         protected String tenant;
         @Parameter(names = "--namespace", description = "The source's namespace")
@@ -123,7 +149,7 @@ void runCmd() throws Exception {
         protected String destinationTopicName;
         @Parameter(names = "--deserializationClassName", description = "The classname for SerDe class for the source")
         protected String deserializationClassName;
-        @Parameter(names = "--parallelism", description = "Number of instances of the source")
+        @Parameter(names = "--parallelism", description = "The source's parallelism factor (i.e. the number of source instances to run)")
         protected String parallelism;
         @Parameter(
                 names = "--jar",
@@ -206,15 +232,6 @@ void processArguments() throws Exception {
             }
         }
 
-        @Override
-        void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
-            admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
-            print("Created successfully");
-        }
-
         private Class<?> getSourceType(File file) {
             if (!Reflections.classExistsInJar(file, sourceConfig.getClassName())) {
                 throw new IllegalArgumentException(String.format("Pulsar Source class %s does not exist in jar %s",


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services