You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/08 09:05:35 UTC
[incubator-pulsar] branch master updated: Separate cmd line
connector interface to explicit source and sink (#1745)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 17a7d91 Separate cmd line connector interface to explicit source and sink (#1745)
17a7d91 is described below
commit 17a7d91ab5b73cfbce1ae775dfc6312e88896287
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue May 8 02:05:33 2018 -0700
Separate cmd line connector interface to explicit source and sink (#1745)
---
.../cli/{CmdConnectors.java => CmdSinks.java} | 245 ++------------------
.../cli/{CmdConnectors.java => CmdSources.java} | 257 ++-------------------
.../apache/pulsar/admin/cli/PulsarAdminTool.java | 3 +-
3 files changed, 39 insertions(+), 466 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
similarity index 57%
copy from pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
copy to pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 0492d76..01ec16c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -27,6 +27,7 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -35,16 +36,12 @@ import org.apache.pulsar.connect.core.Source;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.shaded.proto.Function;
import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.SinkConfig;
-import org.apache.pulsar.functions.utils.SourceConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.*;
import java.io.File;
import java.io.IOException;
@@ -56,32 +53,24 @@ import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
-import net.jodah.typetools.TypeResolver;
-
@Slf4j
@Getter
-@Parameters(commandDescription = "Interface for managing Pulsar Connectors (Ingress and egress data to and from Pulsar)")
-public class CmdConnectors extends CmdBase {
+@Parameters(commandDescription = "Interface for managing Pulsar Sinks (Egress data from Pulsar)")
+public class CmdSinks extends CmdBase {
- private final CreateSource createSource;
private final CreateSink createSink;
- private final DeleteConnector deleteConnector;
- private final LocalSourceRunner localSourceRunner;
+ private final DeleteSink deleteSink;
private final LocalSinkRunner localSinkRunner;
- public CmdConnectors(PulsarAdmin admin) {
- super("connectors", admin);
- createSource = new CreateSource();
+ public CmdSinks(PulsarAdmin admin) {
+ super("sink", admin);
createSink = new CreateSink();
- deleteConnector = new DeleteConnector();
- localSourceRunner = new LocalSourceRunner();
+ deleteSink = new DeleteSink();
localSinkRunner = new LocalSinkRunner();
- jcommander.addCommand("create-source", createSource);
- jcommander.addCommand("create-sink", createSink);
- jcommander.addCommand("delete", deleteConnector);
- jcommander.addCommand("localrun-source", localSourceRunner);
- jcommander.addCommand("localrun-sink", localSinkRunner);
+ jcommander.addCommand("create", createSink);
+ jcommander.addCommand("delete", deleteSink);
+ jcommander.addCommand("localrun", localSinkRunner);
}
/**
@@ -101,20 +90,7 @@ public class CmdConnectors extends CmdBase {
abstract void runCmd() throws Exception;
}
- @Parameters(commandDescription = "Run the Pulsar source or sink locally (rather than deploying it to the Pulsar cluster)")
-
- class LocalSourceRunner extends CreateSource {
-
- @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
- protected String brokerServiceUrl;
-
- @Override
- void runCmd() throws Exception {
- CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
- sourceConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
- }
- }
-
+ @Parameters(commandDescription = "Run the Pulsar sink locally (rather than deploying it to the Pulsar cluster)")
class LocalSinkRunner extends CreateSink {
@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
@@ -127,173 +103,6 @@ public class CmdConnectors extends CmdBase {
}
}
- @Parameters(commandDescription = "Create Pulsar source connectors")
- class CreateSource extends BaseCommand {
- @Parameter(names = "--tenant", description = "The source's tenant")
- protected String tenant;
- @Parameter(names = "--namespace", description = "The source's namespace")
- protected String namespace;
- @Parameter(names = "--name", description = "The source's name")
- protected String name;
- @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Source")
- protected FunctionConfig.ProcessingGuarantees processingGuarantees;
- @Parameter(names = "--className", description = "The source's class name")
- protected String className;
- @Parameter(names = "--destinationTopicName", description = "Pulsar topic to ingress data to")
- protected String destinationTopicName;
- @Parameter(names = "--deserializationClassName", description = "The classname for SerDe class for the source")
- protected String deserializationClassName;
- @Parameter(names = "--parallelism", description = "Number of instances of the source")
- protected String parallelism;
- @Parameter(
- names = "--jar",
- description = "Path to the jar file for the Source",
- listConverter = StringConverter.class)
- protected String jarFile;
-
- @Parameter(names = "--sourceConfigFile", description = "The path to a YAML config file specifying the "
- + "source's configuration")
- protected String sourceConfigFile;
-
- protected SourceConfig sourceConfig;
-
- @Override
- void processArguments() throws Exception {
- super.processArguments();
-
- if (null != sourceConfigFile) {
- this.sourceConfig = loadSourceConfig(sourceConfigFile);
- } else {
- this.sourceConfig = new SourceConfig();
- }
-
- if (null != tenant) {
- sourceConfig.setTenant(tenant);
- }
- if (null != namespace) {
- sourceConfig.setNamespace(namespace);
- }
- if (null != name) {
- sourceConfig.setName(name);
- }
-
- if (null != className) {
- this.sourceConfig.setClassName(className);
- }
- if (null != destinationTopicName) {
- sourceConfig.setTopicName(destinationTopicName);
- }
- if (null != deserializationClassName) {
- sourceConfig.setSerdeClassName(deserializationClassName);
- }
- if (null != processingGuarantees) {
- sourceConfig.setProcessingGuarantees(processingGuarantees);
- }
- if (parallelism == null) {
- if (sourceConfig.getParallelism() == 0) {
- sourceConfig.setParallelism(1);
- }
- } else {
- int num = Integer.parseInt(parallelism);
- if (num <= 0) {
- throw new IllegalArgumentException("The parallelism factor (the number of instances) for the "
- + "connector must be positive");
- }
- sourceConfig.setParallelism(num);
- }
-
- if (null == jarFile) {
- throw new IllegalArgumentException("Connector JAR not specfied");
- }
- }
-
- @Override
- void runCmd() throws Exception {
- if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
- throw new RuntimeException("Missing arguments");
- }
- admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
- print("Created successfully");
- }
-
- private Class<?> getSourceType(File file) {
- if (!Reflections.classExistsInJar(file, sourceConfig.getClassName())) {
- throw new IllegalArgumentException(String.format("Pulsar Source class %s does not exist in jar %s",
- sourceConfig.getClassName(), jarFile));
- } else if (!Reflections.classInJarImplementsIface(file, sourceConfig.getClassName(), Source.class)) {
- throw new IllegalArgumentException(String.format("The Pulsar source class %s in jar %s implements does not implement " + Source.class.getName(),
- sourceConfig.getClassName(), jarFile));
- }
-
- Object userClass = Reflections.createInstance(sourceConfig.getClassName(), file);
- Class<?> typeArg;
- Source source = (Source) userClass;
- if (source == null) {
- throw new IllegalArgumentException(String.format("The Pulsar source class %s could not be instantiated from jar %s",
- sourceConfig.getClassName(), jarFile));
- }
- typeArg = TypeResolver.resolveRawArgument(Source.class, source.getClass());
-
- return typeArg;
- }
-
- protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSourceConfigProto2(SourceConfig sourceConfig)
- throws IOException {
- org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
- = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
- Utils.mergeJson(FunctionsImpl.printJson(createSourceConfig(sourceConfig)), functionDetailsBuilder);
- return functionDetailsBuilder.build();
- }
-
- protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) {
-
- File file = new File(jarFile);
- try {
- Reflections.loadJar(file);
- } catch (MalformedURLException e) {
- throw new RuntimeException("Failed to load user jar " + file, e);
- }
- Class<?> typeArg = getSourceType(file);
-
- FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
- if (sourceConfig.getTenant() != null) {
- functionDetailsBuilder.setTenant(sourceConfig.getTenant());
- }
- if (sourceConfig.getNamespace() != null) {
- functionDetailsBuilder.setNamespace(sourceConfig.getNamespace());
- }
- if (sourceConfig.getName() != null) {
- functionDetailsBuilder.setName(sourceConfig.getName());
- }
- functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
- functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
- functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
- if (sourceConfig.getProcessingGuarantees() != null) {
- functionDetailsBuilder.setProcessingGuarantees(
- convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
- }
-
- // set source spec
- SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
- sourceSpecBuilder.setClassName(sourceConfig.getClassName());
- sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs()));
- sourceSpecBuilder.setTypeClassName(typeArg.getName());
- functionDetailsBuilder.setSource(sourceSpecBuilder);
-
- // set up sink spec
- SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
- sinkSpecBuilder.setClassName(PulsarSink.class.getName());
- if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
- sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
- }
- sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
- sinkSpecBuilder.setTypeClassName(typeArg.getName());
-
- functionDetailsBuilder.setSink(sinkSpecBuilder);
- return functionDetailsBuilder.build();
- }
- }
-
@Parameters(commandDescription = "Create Pulsar sink connectors")
class CreateSink extends BaseCommand {
@Parameter(names = "--tenant", description = "The sink's tenant")
@@ -356,7 +165,7 @@ public class CmdConnectors extends CmdBase {
inputTopics.forEach(new Consumer<String>() {
@Override
public void accept(String s) {
- CmdConnectors.validateTopicName(s);
+ CmdSinks.validateTopicName(s);
topicsToSerDeClassName.put(s, "");
}
});
@@ -365,7 +174,7 @@ public class CmdConnectors extends CmdBase {
Type type = new TypeToken<Map<String, String>>(){}.getType();
Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
customSerdeInputMap.forEach((topic, serde) -> {
- CmdConnectors.validateTopicName(topic);
+ CmdSinks.validateTopicName(topic);
topicsToSerDeClassName.put(topic, serde);
});
}
@@ -475,15 +284,15 @@ public class CmdConnectors extends CmdBase {
}
@Parameters(commandDescription = "Stops a Pulsar sink or source")
- class DeleteConnector extends BaseCommand {
+ class DeleteSink extends BaseCommand {
- @Parameter(names = "--tenant", description = "The tenant of a sink or source")
+ @Parameter(names = "--tenant", description = "The tenant of the sink")
protected String tenant;
- @Parameter(names = "--namespace", description = "The namespace of a sink or source")
+ @Parameter(names = "--namespace", description = "The namespace of the sink")
protected String namespace;
- @Parameter(names = "--name", description = "The name of a sink or source")
+ @Parameter(names = "--name", description = "The name of the sink")
protected String name;
@Override
@@ -491,7 +300,7 @@ public class CmdConnectors extends CmdBase {
super.processArguments();
if (null == tenant || null == namespace || null == name) {
throw new RuntimeException(
- "You must specify a tenant, namespace, and name for the sink or source");
+ "You must specify a tenant, namespace, and name for the sink");
}
}
@@ -506,25 +315,11 @@ public class CmdConnectors extends CmdBase {
return (SinkConfig) loadConfig(file, SinkConfig.class);
}
- private static SourceConfig loadSourceConfig(String file) throws IOException {
- return (SourceConfig) loadConfig(file, SourceConfig.class);
- }
-
private static Object loadConfig(String file, Class<?> clazz) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(file), clazz);
}
- public static boolean areAllRequiredFieldsPresentForSource(SourceConfig sourceConfig) {
- return sourceConfig.getTenant() != null && !sourceConfig.getTenant().isEmpty()
- && sourceConfig.getNamespace() != null && !sourceConfig.getNamespace().isEmpty()
- && sourceConfig.getName() != null && !sourceConfig.getName().isEmpty()
- && sourceConfig.getClassName() != null && !sourceConfig.getClassName().isEmpty()
- && sourceConfig.getTopicName() != null && !sourceConfig.getTopicName().isEmpty()
- || sourceConfig.getSerdeClassName() != null
- && sourceConfig.getParallelism() > 0;
- }
-
public static boolean areAllRequiredFieldsPresentForSink(SinkConfig sinkConfig) {
return sinkConfig.getTenant() != null && !sinkConfig.getTenant().isEmpty()
&& sinkConfig.getNamespace() != null && !sinkConfig.getNamespace().isEmpty()
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
similarity index 54%
rename from pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
rename to pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 0492d76..c651e37 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -27,6 +27,7 @@ import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.internal.FunctionsImpl;
import org.apache.pulsar.common.naming.TopicName;
@@ -35,16 +36,12 @@ import org.apache.pulsar.connect.core.Source;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.shaded.proto.Function;
import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.SinkConfig;
-import org.apache.pulsar.functions.utils.SourceConfig;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.*;
import java.io.File;
import java.io.IOException;
@@ -56,32 +53,24 @@ import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
-import net.jodah.typetools.TypeResolver;
-
@Slf4j
@Getter
-@Parameters(commandDescription = "Interface for managing Pulsar Connectors (Ingress and egress data to and from Pulsar)")
-public class CmdConnectors extends CmdBase {
+@Parameters(commandDescription = "Interface for managing Pulsar Source (Ingress data to Pulsar)")
+public class CmdSources extends CmdBase {
private final CreateSource createSource;
- private final CreateSink createSink;
- private final DeleteConnector deleteConnector;
+ private final DeleteSource deleteSource;
private final LocalSourceRunner localSourceRunner;
- private final LocalSinkRunner localSinkRunner;
- public CmdConnectors(PulsarAdmin admin) {
- super("connectors", admin);
+ public CmdSources(PulsarAdmin admin) {
+ super("source", admin);
createSource = new CreateSource();
- createSink = new CreateSink();
- deleteConnector = new DeleteConnector();
+ deleteSource = new DeleteSource();
localSourceRunner = new LocalSourceRunner();
- localSinkRunner = new LocalSinkRunner();
- jcommander.addCommand("create-source", createSource);
- jcommander.addCommand("create-sink", createSink);
- jcommander.addCommand("delete", deleteConnector);
- jcommander.addCommand("localrun-source", localSourceRunner);
- jcommander.addCommand("localrun-sink", localSinkRunner);
+ jcommander.addCommand("create", createSource);
+ jcommander.addCommand("delete", deleteSource);
+ jcommander.addCommand("localrun", localSourceRunner);
}
/**
@@ -101,8 +90,7 @@ public class CmdConnectors extends CmdBase {
abstract void runCmd() throws Exception;
}
- @Parameters(commandDescription = "Run the Pulsar source or sink locally (rather than deploying it to the Pulsar cluster)")
-
+ @Parameters(commandDescription = "Run the Pulsar source locally (rather than deploying it to the Pulsar cluster)")
class LocalSourceRunner extends CreateSource {
@Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
@@ -115,18 +103,6 @@ public class CmdConnectors extends CmdBase {
}
}
- class LocalSinkRunner extends CreateSink {
-
- @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
- protected String brokerServiceUrl;
-
- @Override
- void runCmd() throws Exception {
- CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig),
- sinkConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
- }
- }
-
@Parameters(commandDescription = "Create Pulsar source connectors")
class CreateSource extends BaseCommand {
@Parameter(names = "--tenant", description = "The source's tenant")
@@ -294,188 +270,8 @@ public class CmdConnectors extends CmdBase {
}
}
- @Parameters(commandDescription = "Create Pulsar sink connectors")
- class CreateSink extends BaseCommand {
- @Parameter(names = "--tenant", description = "The sink's tenant")
- protected String tenant;
- @Parameter(names = "--namespace", description = "The sink's namespace")
- protected String namespace;
- @Parameter(names = "--name", description = "The sink's name")
- protected String name;
- @Parameter(names = "--className", description = "The sink's class name")
- protected String className;
- @Parameter(names = "--inputs", description = "The sink's input topic or topics (multiple topics can be specified as a comma-separated list)")
- protected String inputs;
- @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)")
- protected String customSerdeInputString;
- @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Sink")
- protected FunctionConfig.ProcessingGuarantees processingGuarantees;
- @Parameter(names = "--parallelism", description = "")
- protected String parallelism;
- @Parameter(
- names = "--jar",
- description = "Path to the jar file for the sink",
- listConverter = StringConverter.class)
- protected String jarFile;
-
- @Parameter(names = "--sinkConfigFile", description = "The path to a YAML config file specifying the "
- + "sink's configuration")
- protected String sinkConfigFile;
-
- protected SinkConfig sinkConfig;
-
- @Override
- void processArguments() throws Exception {
- super.processArguments();
-
- if (null != sinkConfigFile) {
- this.sinkConfig = loadSinkConfig(sinkConfigFile);
- } else {
- this.sinkConfig = new SinkConfig();
- }
-
- if (null != tenant) {
- sinkConfig.setTenant(tenant);
- }
- if (null != namespace) {
- sinkConfig.setNamespace(namespace);
- }
- if (null != name) {
- sinkConfig.setName(name);
- }
-
- if (null != className) {
- sinkConfig.setClassName(className);
- }
- if (null != processingGuarantees) {
- sinkConfig.setProcessingGuarantees(processingGuarantees);
- }
- Map<String, String> topicsToSerDeClassName = new HashMap<>();
- if (null != inputs) {
- List<String> inputTopics = Arrays.asList(inputs.split(","));
- inputTopics.forEach(new Consumer<String>() {
- @Override
- public void accept(String s) {
- CmdConnectors.validateTopicName(s);
- topicsToSerDeClassName.put(s, "");
- }
- });
- }
- if (null != customSerdeInputString) {
- Type type = new TypeToken<Map<String, String>>(){}.getType();
- Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
- customSerdeInputMap.forEach((topic, serde) -> {
- CmdConnectors.validateTopicName(topic);
- topicsToSerDeClassName.put(topic, serde);
- });
- }
- sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
-
- if (parallelism == null) {
- if (sinkConfig.getParallelism() == 0) {
- sinkConfig.setParallelism(1);
- }
- } else {
- int num = Integer.parseInt(parallelism);
- if (num <= 0) {
- throw new IllegalArgumentException("The parallelism factor (the number of instances) for the "
- + "connector must be positive");
- }
- sinkConfig.setParallelism(num);
- }
-
- if (null == jarFile) {
- throw new IllegalArgumentException("Connector JAR not specfied");
- }
- }
-
- @Override
- void runCmd() throws Exception {
- log.info("sinkConfig: {}", sinkConfig);
- if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
- throw new RuntimeException("Missing arguments");
- }
- admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile);
- print("Created successfully");
- }
-
- private Class<?> getSinkType(File file) {
- if (!Reflections.classExistsInJar(file, sinkConfig.getClassName())) {
- throw new IllegalArgumentException(String.format("Pulsar sink class %s does not exist in jar %s",
- sinkConfig.getClassName(), jarFile));
- } else if (!Reflections.classInJarImplementsIface(file, sinkConfig.getClassName(), Sink.class)) {
- throw new IllegalArgumentException(String.format("The Pulsar sink class %s in jar %s implements " + Sink.class.getName(),
- sinkConfig.getClassName(), jarFile));
- }
-
- Object userClass = Reflections.createInstance(sinkConfig.getClassName(), file);
- Class<?> typeArg;
- Sink sink = (Sink) userClass;
- if (sink == null) {
- throw new IllegalArgumentException(String.format("The Pulsar sink class %s could not be instantiated from jar %s",
- sinkConfig.getClassName(), jarFile));
- }
- typeArg = TypeResolver.resolveRawArgument(Sink.class, sink.getClass());
-
- return typeArg;
- }
-
- protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSinkConfigProto2(SinkConfig sinkConfig)
- throws IOException {
- org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
- = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
- Utils.mergeJson(FunctionsImpl.printJson(createSinkConfig(sinkConfig)), functionDetailsBuilder);
- return functionDetailsBuilder.build();
- }
-
- protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) {
-
- File file = new File(jarFile);
- try {
- Reflections.loadJar(file);
- } catch (MalformedURLException e) {
- throw new RuntimeException("Failed to load user jar " + file, e);
- }
- Class<?> typeArg = getSinkType(file);
-
- FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
- if (sinkConfig.getTenant() != null) {
- functionDetailsBuilder.setTenant(sinkConfig.getTenant());
- }
- if (sinkConfig.getNamespace() != null) {
- functionDetailsBuilder.setNamespace(sinkConfig.getNamespace());
- }
- if (sinkConfig.getName() != null) {
- functionDetailsBuilder.setName(sinkConfig.getName());
- }
- functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
- functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
- functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
- if (sinkConfig.getProcessingGuarantees() != null) {
- functionDetailsBuilder.setProcessingGuarantees(
- convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
- }
-
- // set source spec
- SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
- sourceSpecBuilder.setClassName(PulsarSource.class.getName());
- sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
- sourceSpecBuilder.putAllTopicsToSerDeClassName(sinkConfig.getTopicToSerdeClassName());
- sourceSpecBuilder.setTypeClassName(typeArg.getName());
- functionDetailsBuilder.setSource(sourceSpecBuilder);
-
- // set up sink spec
- SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
- sinkSpecBuilder.setClassName(sinkConfig.getClassName());
- sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs()));
- sinkSpecBuilder.setTypeClassName(typeArg.getName());
- functionDetailsBuilder.setSink(sinkSpecBuilder);
- return functionDetailsBuilder.build();
- }
- }
-
- @Parameters(commandDescription = "Stops a Pulsar sink or source")
- class DeleteConnector extends BaseCommand {
+ @Parameters(commandDescription = "Stops a Pulsar source")
+ class DeleteSource extends BaseCommand {
@Parameter(names = "--tenant", description = "The tenant of a sink or source")
protected String tenant;
@@ -491,21 +287,17 @@ public class CmdConnectors extends CmdBase {
super.processArguments();
if (null == tenant || null == namespace || null == name) {
throw new RuntimeException(
- "You must specify a tenant, namespace, and name for the sink or source");
+ "You must specify a tenant, namespace, and name for the source");
}
}
@Override
void runCmd() throws Exception {
admin.functions().deleteFunction(tenant, namespace, name);
- print("Deleted successfully");
+ print("Delete source successfully");
}
}
- private static SinkConfig loadSinkConfig(String file) throws IOException {
- return (SinkConfig) loadConfig(file, SinkConfig.class);
- }
-
private static SourceConfig loadSourceConfig(String file) throws IOException {
return (SourceConfig) loadConfig(file, SourceConfig.class);
}
@@ -525,21 +317,6 @@ public class CmdConnectors extends CmdBase {
&& sourceConfig.getParallelism() > 0;
}
- public static boolean areAllRequiredFieldsPresentForSink(SinkConfig sinkConfig) {
- return sinkConfig.getTenant() != null && !sinkConfig.getTenant().isEmpty()
- && sinkConfig.getNamespace() != null && !sinkConfig.getNamespace().isEmpty()
- && sinkConfig.getName() != null && !sinkConfig.getName().isEmpty()
- && sinkConfig.getClassName() != null && !sinkConfig.getClassName().isEmpty()
- && sinkConfig.getTopicToSerdeClassName() != null && !sinkConfig.getTopicToSerdeClassName().isEmpty()
- && sinkConfig.getParallelism() > 0;
- }
-
- private static void validateTopicName(String topic) {
- if (!TopicName.isValid(topic)) {
- throw new IllegalArgumentException(String.format("The topic name %s is invalid", topic));
- }
- }
-
private static ProcessingGuarantees convertProcessingGuarantee(
FunctionConfig.ProcessingGuarantees processingGuarantees) {
for (ProcessingGuarantees type : ProcessingGuarantees.values()) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index c0ee2cb..6467072 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -96,7 +96,8 @@ public class PulsarAdminTool {
commandMap.put("resource-quotas", CmdResourceQuotas.class);
commandMap.put("functions", CmdFunctions.class);
- commandMap.put("connectors", CmdConnectors.class);
+ commandMap.put("source", CmdSources.class);
+ commandMap.put("sink", CmdSinks.class);
}
private void setupCommands(Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) {
--
To stop receiving notification emails like this one, please contact
sijie@apache.org.