You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2018/05/08 09:05:35 UTC

[incubator-pulsar] branch master updated: Seperate cmd line connector interface to explicit source and sink (#1745)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 17a7d91  Seperate cmd line connector interface to explicit source and sink (#1745)
17a7d91 is described below

commit 17a7d91ab5b73cfbce1ae775dfc6312e88896287
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue May 8 02:05:33 2018 -0700

    Seperate cmd line connector interface to explicit source and sink (#1745)
---
 .../cli/{CmdConnectors.java => CmdSinks.java}      | 245 ++------------------
 .../cli/{CmdConnectors.java => CmdSources.java}    | 257 ++-------------------
 .../apache/pulsar/admin/cli/PulsarAdminTool.java   |   3 +-
 3 files changed, 39 insertions(+), 466 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
similarity index 57%
copy from pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
copy to pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 0492d76..01ec16c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -27,6 +27,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -35,16 +36,12 @@ import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.shaded.proto.Function;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.SinkConfig;
-import org.apache.pulsar.functions.utils.SourceConfig;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -56,32 +53,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
-import net.jodah.typetools.TypeResolver;
-
 @Slf4j
 @Getter
-@Parameters(commandDescription = "Interface for managing Pulsar Connectors (Ingress and egress data to and from Pulsar)")
-public class CmdConnectors extends CmdBase {
+@Parameters(commandDescription = "Interface for managing Pulsar Sinks (Egress data from Pulsar)")
+public class CmdSinks extends CmdBase {
 
-    private final CreateSource createSource;
     private final CreateSink createSink;
-    private final DeleteConnector deleteConnector;
-    private final LocalSourceRunner localSourceRunner;
+    private final DeleteSink deleteSink;
     private final LocalSinkRunner localSinkRunner;
 
-    public CmdConnectors(PulsarAdmin admin) {
-        super("connectors", admin);
-        createSource = new CreateSource();
+    public CmdSinks(PulsarAdmin admin) {
+        super("sink", admin);
         createSink = new CreateSink();
-        deleteConnector = new DeleteConnector();
-        localSourceRunner = new LocalSourceRunner();
+        deleteSink = new DeleteSink();
         localSinkRunner = new LocalSinkRunner();
 
-        jcommander.addCommand("create-source", createSource);
-        jcommander.addCommand("create-sink", createSink);
-        jcommander.addCommand("delete", deleteConnector);
-        jcommander.addCommand("localrun-source", localSourceRunner);
-        jcommander.addCommand("localrun-sink", localSinkRunner);
+        jcommander.addCommand("create", createSink);
+        jcommander.addCommand("delete", deleteSink);
+        jcommander.addCommand("localrun", localSinkRunner);
     }
 
     /**
@@ -101,20 +90,7 @@ public class CmdConnectors extends CmdBase {
         abstract void runCmd() throws Exception;
     }
 
-    @Parameters(commandDescription = "Run the Pulsar source or sink locally (rather than deploying it to the Pulsar cluster)")
-
-    class LocalSourceRunner extends CreateSource {
-
-        @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
-        protected String brokerServiceUrl;
-
-        @Override
-        void runCmd() throws Exception {
-            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
-                    sourceConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
-        }
-    }
-
+    @Parameters(commandDescription = "Run the Pulsar sink locally (rather than deploying it to the Pulsar cluster)")
     class LocalSinkRunner extends CreateSink {
 
         @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
@@ -127,173 +103,6 @@ public class CmdConnectors extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Create Pulsar source connectors")
-    class CreateSource extends BaseCommand {
-        @Parameter(names = "--tenant", description = "The source's tenant")
-        protected String tenant;
-        @Parameter(names = "--namespace", description = "The source's namespace")
-        protected String namespace;
-        @Parameter(names = "--name", description = "The source's name")
-        protected String name;
-        @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Source")
-        protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--className", description = "The source's class name")
-        protected String className;
-        @Parameter(names = "--destinationTopicName", description = "Pulsar topic to ingress data to")
-        protected String destinationTopicName;
-        @Parameter(names = "--deserializationClassName", description = "The classname for SerDe class for the source")
-        protected String deserializationClassName;
-        @Parameter(names = "--parallelism", description = "Number of instances of the source")
-        protected String parallelism;
-        @Parameter(
-                names = "--jar",
-                description = "Path to the jar file for the Source",
-                listConverter = StringConverter.class)
-        protected String jarFile;
-
-        @Parameter(names = "--sourceConfigFile", description = "The path to a YAML config file specifying the "
-                + "source's configuration")
-        protected String sourceConfigFile;
-
-        protected SourceConfig sourceConfig;
-
-        @Override
-        void processArguments() throws Exception {
-            super.processArguments();
-
-            if (null != sourceConfigFile) {
-                this.sourceConfig = loadSourceConfig(sourceConfigFile);
-            } else {
-                this.sourceConfig = new SourceConfig();
-            }
-
-            if (null != tenant) {
-                sourceConfig.setTenant(tenant);
-            }
-            if (null != namespace) {
-                sourceConfig.setNamespace(namespace);
-            }
-            if (null != name) {
-                sourceConfig.setName(name);
-            }
-
-            if (null != className) {
-                this.sourceConfig.setClassName(className);
-            }
-            if (null != destinationTopicName) {
-                sourceConfig.setTopicName(destinationTopicName);
-            }
-            if (null != deserializationClassName) {
-                sourceConfig.setSerdeClassName(deserializationClassName);
-            }
-            if (null != processingGuarantees) {
-                sourceConfig.setProcessingGuarantees(processingGuarantees);
-            }
-            if (parallelism == null) {
-                if (sourceConfig.getParallelism() == 0) {
-                    sourceConfig.setParallelism(1);
-                }
-            } else {
-                int num = Integer.parseInt(parallelism);
-                if (num <= 0) {
-                    throw new IllegalArgumentException("The parallelism factor (the number of instances) for the "
-                            + "connector must be positive");
-                }
-                sourceConfig.setParallelism(num);
-            }
-
-            if (null == jarFile) {
-                throw new IllegalArgumentException("Connector JAR not specfied");
-            }
-        }
-
-        @Override
-        void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
-            admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
-            print("Created successfully");
-        }
-
-        private Class<?> getSourceType(File file) {
-            if (!Reflections.classExistsInJar(file, sourceConfig.getClassName())) {
-                throw new IllegalArgumentException(String.format("Pulsar Source class %s does not exist in jar %s",
-                        sourceConfig.getClassName(), jarFile));
-            } else if (!Reflections.classInJarImplementsIface(file, sourceConfig.getClassName(), Source.class)) {
-                throw new IllegalArgumentException(String.format("The Pulsar source class %s in jar %s implements does not implement " + Source.class.getName(),
-                        sourceConfig.getClassName(), jarFile));
-            }
-
-            Object userClass = Reflections.createInstance(sourceConfig.getClassName(), file);
-            Class<?> typeArg;
-            Source source = (Source) userClass;
-            if (source == null) {
-                throw new IllegalArgumentException(String.format("The Pulsar source class %s could not be instantiated from jar %s",
-                        sourceConfig.getClassName(), jarFile));
-            }
-            typeArg = TypeResolver.resolveRawArgument(Source.class, source.getClass());
-
-            return typeArg;
-        }
-
-        protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSourceConfigProto2(SourceConfig sourceConfig)
-                throws IOException {
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
-                    = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(createSourceConfig(sourceConfig)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
-        }
-
-        protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) {
-
-            File file = new File(jarFile);
-            try {
-                Reflections.loadJar(file);
-            } catch (MalformedURLException e) {
-                throw new RuntimeException("Failed to load user jar " + file, e);
-            }
-            Class<?> typeArg = getSourceType(file);
-
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            if (sourceConfig.getTenant() != null) {
-                functionDetailsBuilder.setTenant(sourceConfig.getTenant());
-            }
-            if (sourceConfig.getNamespace() != null) {
-                functionDetailsBuilder.setNamespace(sourceConfig.getNamespace());
-            }
-            if (sourceConfig.getName() != null) {
-                functionDetailsBuilder.setName(sourceConfig.getName());
-            }
-            functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
-            functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
-            functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
-            if (sourceConfig.getProcessingGuarantees() != null) {
-                functionDetailsBuilder.setProcessingGuarantees(
-                        convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
-            }
-
-            // set source spec
-            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            sourceSpecBuilder.setClassName(sourceConfig.getClassName());
-            sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs()));
-            sourceSpecBuilder.setTypeClassName(typeArg.getName());
-            functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-            // set up sink spec
-            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            sinkSpecBuilder.setClassName(PulsarSink.class.getName());
-            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
-                sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
-            }
-            sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
-            sinkSpecBuilder.setTypeClassName(typeArg.getName());
-
-            functionDetailsBuilder.setSink(sinkSpecBuilder);
-            return functionDetailsBuilder.build();
-        }
-    }
-
     @Parameters(commandDescription = "Create Pulsar sink connectors")
     class CreateSink extends BaseCommand {
         @Parameter(names = "--tenant", description = "The sink's tenant")
@@ -356,7 +165,7 @@ public class CmdConnectors extends CmdBase {
                 inputTopics.forEach(new Consumer<String>() {
                     @Override
                     public void accept(String s) {
-                        CmdConnectors.validateTopicName(s);
+                        CmdSinks.validateTopicName(s);
                         topicsToSerDeClassName.put(s, "");
                     }
                 });
@@ -365,7 +174,7 @@ public class CmdConnectors extends CmdBase {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
                 customSerdeInputMap.forEach((topic, serde) -> {
-                    CmdConnectors.validateTopicName(topic);
+                    CmdSinks.validateTopicName(topic);
                     topicsToSerDeClassName.put(topic, serde);
                 });
             }
@@ -475,15 +284,15 @@ public class CmdConnectors extends CmdBase {
     }
 
     @Parameters(commandDescription = "Stops a Pulsar sink or source")
-    class DeleteConnector extends BaseCommand {
+    class DeleteSink extends BaseCommand {
 
-        @Parameter(names = "--tenant", description = "The tenant of a sink or source")
+        @Parameter(names = "--tenant", description = "The tenant of the sink")
         protected String tenant;
 
-        @Parameter(names = "--namespace", description = "The namespace of a sink or source")
+        @Parameter(names = "--namespace", description = "The namespace of the sink")
         protected String namespace;
 
-        @Parameter(names = "--name", description = "The name of a sink or source")
+        @Parameter(names = "--name", description = "The name of the sink")
         protected String name;
 
         @Override
@@ -491,7 +300,7 @@ public class CmdConnectors extends CmdBase {
             super.processArguments();
             if (null == tenant || null == namespace || null == name) {
                 throw new RuntimeException(
-                        "You must specify a tenant, namespace, and name for the sink or source");
+                        "You must specify a tenant, namespace, and name for the sink");
             }
         }
 
@@ -506,25 +315,11 @@ public class CmdConnectors extends CmdBase {
         return (SinkConfig) loadConfig(file, SinkConfig.class);
     }
 
-    private static SourceConfig loadSourceConfig(String file) throws IOException {
-        return (SourceConfig) loadConfig(file, SourceConfig.class);
-    }
-
     private static Object loadConfig(String file, Class<?> clazz) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(file), clazz);
     }
 
-    public static boolean areAllRequiredFieldsPresentForSource(SourceConfig sourceConfig) {
-        return sourceConfig.getTenant() != null && !sourceConfig.getTenant().isEmpty()
-                && sourceConfig.getNamespace() != null && !sourceConfig.getNamespace().isEmpty()
-                && sourceConfig.getName() != null && !sourceConfig.getName().isEmpty()
-                && sourceConfig.getClassName() != null && !sourceConfig.getClassName().isEmpty()
-                && sourceConfig.getTopicName() != null && !sourceConfig.getTopicName().isEmpty()
-                || sourceConfig.getSerdeClassName() != null
-                && sourceConfig.getParallelism() > 0;
-    }
-
     public static boolean areAllRequiredFieldsPresentForSink(SinkConfig sinkConfig) {
         return sinkConfig.getTenant() != null && !sinkConfig.getTenant().isEmpty()
                 && sinkConfig.getNamespace() != null && !sinkConfig.getNamespace().isEmpty()
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
similarity index 54%
rename from pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
rename to pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index 0492d76..c651e37 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -27,6 +27,7 @@ import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -35,16 +36,12 @@ import org.apache.pulsar.connect.core.Source;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.shaded.proto.Function;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.SinkConfig;
-import org.apache.pulsar.functions.utils.SourceConfig;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -56,32 +53,24 @@ import java.util.List;
 import java.util.Map;
 import java.util.function.Consumer;
 
-import net.jodah.typetools.TypeResolver;
-
 @Slf4j
 @Getter
-@Parameters(commandDescription = "Interface for managing Pulsar Connectors (Ingress and egress data to and from Pulsar)")
-public class CmdConnectors extends CmdBase {
+@Parameters(commandDescription = "Interface for managing Pulsar Source (Ingress data to Pulsar)")
+public class CmdSources extends CmdBase {
 
     private final CreateSource createSource;
-    private final CreateSink createSink;
-    private final DeleteConnector deleteConnector;
+    private final DeleteSource deleteSource;
     private final LocalSourceRunner localSourceRunner;
-    private final LocalSinkRunner localSinkRunner;
 
-    public CmdConnectors(PulsarAdmin admin) {
-        super("connectors", admin);
+    public CmdSources(PulsarAdmin admin) {
+        super("source", admin);
         createSource = new CreateSource();
-        createSink = new CreateSink();
-        deleteConnector = new DeleteConnector();
+        deleteSource = new DeleteSource();
         localSourceRunner = new LocalSourceRunner();
-        localSinkRunner = new LocalSinkRunner();
 
-        jcommander.addCommand("create-source", createSource);
-        jcommander.addCommand("create-sink", createSink);
-        jcommander.addCommand("delete", deleteConnector);
-        jcommander.addCommand("localrun-source", localSourceRunner);
-        jcommander.addCommand("localrun-sink", localSinkRunner);
+        jcommander.addCommand("create", createSource);
+        jcommander.addCommand("delete", deleteSource);
+        jcommander.addCommand("localrun", localSourceRunner);
     }
 
     /**
@@ -101,8 +90,7 @@ public class CmdConnectors extends CmdBase {
         abstract void runCmd() throws Exception;
     }
 
-    @Parameters(commandDescription = "Run the Pulsar source or sink locally (rather than deploying it to the Pulsar cluster)")
-
+    @Parameters(commandDescription = "Run the Pulsar source locally (rather than deploying it to the Pulsar cluster)")
     class LocalSourceRunner extends CreateSource {
 
         @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
@@ -115,18 +103,6 @@ public class CmdConnectors extends CmdBase {
         }
     }
 
-    class LocalSinkRunner extends CreateSink {
-
-        @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
-        protected String brokerServiceUrl;
-
-        @Override
-        void runCmd() throws Exception {
-            CmdFunctions.startLocalRun(createSinkConfigProto2(sinkConfig),
-                    sinkConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
-        }
-    }
-
     @Parameters(commandDescription = "Create Pulsar source connectors")
     class CreateSource extends BaseCommand {
         @Parameter(names = "--tenant", description = "The source's tenant")
@@ -294,188 +270,8 @@ public class CmdConnectors extends CmdBase {
         }
     }
 
-    @Parameters(commandDescription = "Create Pulsar sink connectors")
-    class CreateSink extends BaseCommand {
-        @Parameter(names = "--tenant", description = "The sink's tenant")
-        protected String tenant;
-        @Parameter(names = "--namespace", description = "The sink's namespace")
-        protected String namespace;
-        @Parameter(names = "--name", description = "The sink's name")
-        protected String name;
-        @Parameter(names = "--className", description = "The sink's class name")
-        protected String className;
-        @Parameter(names = "--inputs", description = "The sink's input topic or topics (multiple topics can be specified as a comma-separated list)")
-        protected String inputs;
-        @Parameter(names = "--customSerdeInputs", description = "The map of input topics to SerDe class names (as a JSON string)")
-        protected String customSerdeInputString;
-        @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Sink")
-        protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--parallelism", description = "")
-        protected String parallelism;
-        @Parameter(
-                names = "--jar",
-                description = "Path to the jar file for the sink",
-                listConverter = StringConverter.class)
-        protected String jarFile;
-
-        @Parameter(names = "--sinkConfigFile", description = "The path to a YAML config file specifying the "
-                + "sink's configuration")
-        protected String sinkConfigFile;
-
-        protected SinkConfig sinkConfig;
-
-        @Override
-        void processArguments() throws Exception {
-            super.processArguments();
-
-            if (null != sinkConfigFile) {
-                this.sinkConfig = loadSinkConfig(sinkConfigFile);
-            } else {
-                this.sinkConfig = new SinkConfig();
-            }
-
-            if (null != tenant) {
-                sinkConfig.setTenant(tenant);
-            }
-            if (null != namespace) {
-                sinkConfig.setNamespace(namespace);
-            }
-            if (null != name) {
-                sinkConfig.setName(name);
-            }
-
-            if (null != className) {
-                sinkConfig.setClassName(className);
-            }
-            if (null != processingGuarantees) {
-                sinkConfig.setProcessingGuarantees(processingGuarantees);
-            }
-            Map<String, String> topicsToSerDeClassName = new HashMap<>();
-            if (null != inputs) {
-                List<String> inputTopics = Arrays.asList(inputs.split(","));
-                inputTopics.forEach(new Consumer<String>() {
-                    @Override
-                    public void accept(String s) {
-                        CmdConnectors.validateTopicName(s);
-                        topicsToSerDeClassName.put(s, "");
-                    }
-                });
-            }
-            if (null != customSerdeInputString) {
-                Type type = new TypeToken<Map<String, String>>(){}.getType();
-                Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
-                customSerdeInputMap.forEach((topic, serde) -> {
-                    CmdConnectors.validateTopicName(topic);
-                    topicsToSerDeClassName.put(topic, serde);
-                });
-            }
-            sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
-
-            if (parallelism == null) {
-                if (sinkConfig.getParallelism() == 0) {
-                    sinkConfig.setParallelism(1);
-                }
-            } else {
-                int num = Integer.parseInt(parallelism);
-                if (num <= 0) {
-                    throw new IllegalArgumentException("The parallelism factor (the number of instances) for the "
-                            + "connector must be positive");
-                }
-                sinkConfig.setParallelism(num);
-            }
-
-            if (null == jarFile) {
-                throw new IllegalArgumentException("Connector JAR not specfied");
-            }
-        }
-
-        @Override
-        void runCmd() throws Exception {
-            log.info("sinkConfig: {}", sinkConfig);
-            if (!areAllRequiredFieldsPresentForSink(sinkConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
-            admin.functions().createFunction(createSinkConfig(sinkConfig), jarFile);
-            print("Created successfully");
-        }
-
-        private Class<?> getSinkType(File file) {
-            if (!Reflections.classExistsInJar(file, sinkConfig.getClassName())) {
-                throw new IllegalArgumentException(String.format("Pulsar sink class %s does not exist in jar %s",
-                        sinkConfig.getClassName(), jarFile));
-            } else if (!Reflections.classInJarImplementsIface(file, sinkConfig.getClassName(), Sink.class)) {
-                throw new IllegalArgumentException(String.format("The Pulsar sink class %s in jar %s implements " + Sink.class.getName(),
-                        sinkConfig.getClassName(), jarFile));
-            }
-
-            Object userClass = Reflections.createInstance(sinkConfig.getClassName(), file);
-            Class<?> typeArg;
-            Sink sink = (Sink) userClass;
-            if (sink == null) {
-                throw new IllegalArgumentException(String.format("The Pulsar sink class %s could not be instantiated from jar %s",
-                        sinkConfig.getClassName(), jarFile));
-            }
-            typeArg = TypeResolver.resolveRawArgument(Sink.class, sink.getClass());
-
-            return typeArg;
-        }
-
-        protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSinkConfigProto2(SinkConfig sinkConfig)
-                throws IOException {
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
-                    = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(createSinkConfig(sinkConfig)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
-        }
-
-        protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) {
-
-            File file = new File(jarFile);
-            try {
-                Reflections.loadJar(file);
-            } catch (MalformedURLException e) {
-                throw new RuntimeException("Failed to load user jar " + file, e);
-            }
-            Class<?> typeArg = getSinkType(file);
-
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            if (sinkConfig.getTenant() != null) {
-                functionDetailsBuilder.setTenant(sinkConfig.getTenant());
-            }
-            if (sinkConfig.getNamespace() != null) {
-                functionDetailsBuilder.setNamespace(sinkConfig.getNamespace());
-            }
-            if (sinkConfig.getName() != null) {
-                functionDetailsBuilder.setName(sinkConfig.getName());
-            }
-            functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
-            functionDetailsBuilder.setParallelism(sinkConfig.getParallelism());
-            functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
-            if (sinkConfig.getProcessingGuarantees() != null) {
-                functionDetailsBuilder.setProcessingGuarantees(
-                        convertProcessingGuarantee(sinkConfig.getProcessingGuarantees()));
-            }
-
-            // set source spec
-            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            sourceSpecBuilder.setClassName(PulsarSource.class.getName());
-            sourceSpecBuilder.setSubscriptionType(Function.SubscriptionType.SHARED);
-            sourceSpecBuilder.putAllTopicsToSerDeClassName(sinkConfig.getTopicToSerdeClassName());
-            sourceSpecBuilder.setTypeClassName(typeArg.getName());
-            functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-            // set up sink spec
-            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            sinkSpecBuilder.setClassName(sinkConfig.getClassName());
-            sinkSpecBuilder.setConfigs(new Gson().toJson(sinkConfig.getConfigs()));
-            sinkSpecBuilder.setTypeClassName(typeArg.getName());
-            functionDetailsBuilder.setSink(sinkSpecBuilder);
-            return functionDetailsBuilder.build();
-        }
-    }
-
-    @Parameters(commandDescription = "Stops a Pulsar sink or source")
-    class DeleteConnector extends BaseCommand {
+    @Parameters(commandDescription = "Stops a Pulsar source")
+    class DeleteSource extends BaseCommand {
 
         @Parameter(names = "--tenant", description = "The tenant of a sink or source")
         protected String tenant;
@@ -491,21 +287,17 @@ public class CmdConnectors extends CmdBase {
             super.processArguments();
             if (null == tenant || null == namespace || null == name) {
                 throw new RuntimeException(
-                        "You must specify a tenant, namespace, and name for the sink or source");
+                        "You must specify a tenant, namespace, and name for the source");
             }
         }
 
         @Override
         void runCmd() throws Exception {
             admin.functions().deleteFunction(tenant, namespace, name);
-            print("Deleted successfully");
+            print("Delete source successfully");
         }
     }
 
-    private static SinkConfig loadSinkConfig(String file) throws IOException {
-        return (SinkConfig) loadConfig(file, SinkConfig.class);
-    }
-
     private static SourceConfig loadSourceConfig(String file) throws IOException {
         return (SourceConfig) loadConfig(file, SourceConfig.class);
     }
@@ -525,21 +317,6 @@ public class CmdConnectors extends CmdBase {
                 && sourceConfig.getParallelism() > 0;
     }
 
-    public static boolean areAllRequiredFieldsPresentForSink(SinkConfig sinkConfig) {
-        return sinkConfig.getTenant() != null && !sinkConfig.getTenant().isEmpty()
-                && sinkConfig.getNamespace() != null && !sinkConfig.getNamespace().isEmpty()
-                && sinkConfig.getName() != null && !sinkConfig.getName().isEmpty()
-                && sinkConfig.getClassName() != null && !sinkConfig.getClassName().isEmpty()
-                && sinkConfig.getTopicToSerdeClassName() != null && !sinkConfig.getTopicToSerdeClassName().isEmpty()
-                && sinkConfig.getParallelism() > 0;
-    }
-
-    private static void validateTopicName(String topic) {
-        if (!TopicName.isValid(topic)) {
-            throw new IllegalArgumentException(String.format("The topic name %s is invalid", topic));
-        }
-    }
-
     private static ProcessingGuarantees convertProcessingGuarantee(
             FunctionConfig.ProcessingGuarantees processingGuarantees) {
         for (ProcessingGuarantees type : ProcessingGuarantees.values()) {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index c0ee2cb..6467072 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -96,7 +96,8 @@ public class PulsarAdminTool {
 
         commandMap.put("resource-quotas", CmdResourceQuotas.class);
         commandMap.put("functions", CmdFunctions.class);
-        commandMap.put("connectors", CmdConnectors.class);
+        commandMap.put("source", CmdSources.class);
+        commandMap.put("sink", CmdSinks.class);
     }
 
     private void setupCommands(Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) {

-- 
To stop receiving notification emails like this one, please contact
sijie@apache.org.