You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/21 07:53:00 UTC
[incubator-pulsar] branch master updated: adding update functionality to sources and sinks (#1813)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7f97bde adding update functionality to sources and sinks (#1813)
7f97bde is described below
commit 7f97bde2a384ae00bfbfd1c9f54deb3db124a1ad
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Mon May 21 00:52:58 2018 -0700
adding update functionality to sources and sinks (#1813)
---
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 41 +++++++++++++++-------
.../org/apache/pulsar/admin/cli/CmdSources.java | 39 ++++++++++++++------
2 files changed, 57 insertions(+), 23 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 04e73bf..ed1cfa7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -45,7 +45,6 @@ import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.SinkConfig;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.io.core.Sink;
-import org.apache.pulsar.io.core.Source;
import java.io.File;
import java.io.IOException;
@@ -63,16 +62,19 @@ import java.util.function.Consumer;
public class CmdSinks extends CmdBase {
private final CreateSink createSink;
+ private final UpdateSink updateSink;
private final DeleteSink deleteSink;
private final LocalSinkRunner localSinkRunner;
public CmdSinks(PulsarAdmin admin) {
super("sink", admin);
createSink = new CreateSink();
+ updateSink = new UpdateSink();
deleteSink = new DeleteSink();
localSinkRunner = new LocalSinkRunner();
jcommander.addCommand("create", createSink);
+ jcommander.addCommand("update", updateSink);
jcommander.addCommand("delete", deleteSink);
jcommander.addCommand("localrun", localSinkRunner);
}
@@ -108,7 +110,31 @@ public class CmdSinks extends CmdBase {
}
@Parameters(commandDescription = "Create Pulsar sink connectors")
- class CreateSink extends BaseCommand {
+ class CreateSink extends SinkCommand {
+ @Override
+ void runCmd() throws Exception {
+ if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
+ throw new RuntimeException("Missing arguments");
+ }
+ admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile);
+ print("Created successfully");
+ }
+ }
+
+ @Parameters(commandDescription = "Update Pulsar sink connectors")
+ class UpdateSink extends SinkCommand {
+ @Override
+ void runCmd() throws Exception {
+ if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
+ throw new RuntimeException("Missing arguments");
+ }
+ admin.functions().updateFunction(createSinkConfig(sinkConfig), jarFile);
+ print("Updated successfully");
+ }
+ }
+
+ @Parameters(commandDescription = "Create Pulsar sink connectors")
+ abstract class SinkCommand extends BaseCommand {
@Parameter(names = "--tenant", description = "The sink's tenant")
protected String tenant;
@Parameter(names = "--namespace", description = "The sink's namespace")
@@ -123,7 +149,7 @@ public class CmdSinks extends CmdBase {
protected String customSerdeInputString;
@Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Sink")
protected FunctionConfig.ProcessingGuarantees processingGuarantees;
- @Parameter(names = "--parallelism", description = "")
+ @Parameter(names = "--parallelism", description = "The sink's parallelism factor (i.e. the number of sink instances to run)")
protected String parallelism;
@Parameter(
names = "--jar",
@@ -221,15 +247,6 @@ public class CmdSinks extends CmdBase {
}
}
- @Override
- void runCmd() throws Exception {
- if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
- throw new RuntimeException("Missing arguments");
- }
- admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile);
- print("Created successfully");
- }
-
private Class<?> getSinkType(File file) {
if (!Reflections.classExistsInJar(file, sinkConfig.getClassName())) {
throw new IllegalArgumentException(String.format("Pulsar sink class %s does not exist in jar %s",
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 1583589..dbcdef7 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -64,15 +64,18 @@ public class CmdSources extends CmdBase {
private final CreateSource createSource;
private final DeleteSource deleteSource;
+ private final UpdateSource updateSource;
private final LocalSourceRunner localSourceRunner;
public CmdSources(PulsarAdmin admin) {
super("source", admin);
createSource = new CreateSource();
+ updateSource = new UpdateSource();
deleteSource = new DeleteSource();
localSourceRunner = new LocalSourceRunner();
jcommander.addCommand("create", createSource);
+ jcommander.addCommand("update", updateSource);
jcommander.addCommand("delete", deleteSource);
jcommander.addCommand("localrun", localSourceRunner);
}
@@ -108,7 +111,30 @@ public class CmdSources extends CmdBase {
}
@Parameters(commandDescription = "Create Pulsar source connectors")
- class CreateSource extends BaseCommand {
+ public class CreateSource extends SourceCommand {
+ @Override
+ void runCmd() throws Exception {
+ if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
+ throw new RuntimeException("Missing arguments");
+ }
+ admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
+ print("Created successfully");
+ }
+ }
+
+ @Parameters(commandDescription = "Update Pulsar source connectors")
+ public class UpdateSource extends SourceCommand {
+ @Override
+ void runCmd() throws Exception {
+ if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
+ throw new RuntimeException("Missing arguments");
+ }
+ admin.functions().updateFunction(createSourceConfig(sourceConfig), jarFile);
+ print("Updated successfully");
+ }
+ }
+
+ abstract class SourceCommand extends BaseCommand {
@Parameter(names = "--tenant", description = "The source's tenant")
protected String tenant;
@Parameter(names = "--namespace", description = "The source's namespace")
@@ -123,7 +149,7 @@ public class CmdSources extends CmdBase {
protected String destinationTopicName;
@Parameter(names = "--deserializationClassName", description = "The classname for SerDe class for the source")
protected String deserializationClassName;
- @Parameter(names = "--parallelism", description = "Number of instances of the source")
+ @Parameter(names = "--parallelism", description = "The source's parallelism factor (i.e. the number of source instances to run)")
protected String parallelism;
@Parameter(
names = "--jar",
@@ -206,15 +232,6 @@ public class CmdSources extends CmdBase {
}
}
- @Override
- void runCmd() throws Exception {
- if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
- throw new RuntimeException("Missing arguments");
- }
- admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
- print("Created successfully");
- }
-
private Class<?> getSourceType(File file) {
if (!Reflections.classExistsInJar(file, sourceConfig.getClassName())) {
throw new IllegalArgumentException(String.format("Pulsar Source class %s does not exist in jar %s",
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.