You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/05/08 09:05:35 UTC

[GitHub] sijie closed pull request #1745: Separate cmd line connector interface to explicit source and sink

sijie closed pull request #1745: Separate cmd line connector interface to explicit source and sink
URL: https://github.com/apache/incubator-pulsar/pull/1745
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise no longer display it once the pull request is merged):

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
similarity index 57%
rename from pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
rename to pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 0492d76f1f..01ec16ce67 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdConnectors.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -27,6 +27,7 @@
 import com.google.gson.reflect.TypeToken;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.internal.FunctionsImpl;
 import org.apache.pulsar.common.naming.TopicName;
@@ -35,16 +36,12 @@
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.shaded.proto.Function;
 import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
 import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
-import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.SinkConfig;
-import org.apache.pulsar.functions.utils.SourceConfig;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.source.PulsarSource;
-import org.apache.pulsar.functions.utils.Reflections;
-import org.apache.pulsar.functions.utils.Utils;
+import org.apache.pulsar.functions.utils.*;
 
 import java.io.File;
 import java.io.IOException;
@@ -56,32 +53,24 @@
 import java.util.Map;
 import java.util.function.Consumer;
 
-import net.jodah.typetools.TypeResolver;
-
 @Slf4j
 @Getter
-@Parameters(commandDescription = "Interface for managing Pulsar Connectors (Ingress and egress data to and from Pulsar)")
-public class CmdConnectors extends CmdBase {
+@Parameters(commandDescription = "Interface for managing Pulsar Sinks (Egress data from Pulsar)")
+public class CmdSinks extends CmdBase {
 
-    private final CreateSource createSource;
     private final CreateSink createSink;
-    private final DeleteConnector deleteConnector;
-    private final LocalSourceRunner localSourceRunner;
+    private final DeleteSink deleteSink;
     private final LocalSinkRunner localSinkRunner;
 
-    public CmdConnectors(PulsarAdmin admin) {
-        super("connectors", admin);
-        createSource = new CreateSource();
+    public CmdSinks(PulsarAdmin admin) {
+        super("sink", admin);
         createSink = new CreateSink();
-        deleteConnector = new DeleteConnector();
-        localSourceRunner = new LocalSourceRunner();
+        deleteSink = new DeleteSink();
         localSinkRunner = new LocalSinkRunner();
 
-        jcommander.addCommand("create-source", createSource);
-        jcommander.addCommand("create-sink", createSink);
-        jcommander.addCommand("delete", deleteConnector);
-        jcommander.addCommand("localrun-source", localSourceRunner);
-        jcommander.addCommand("localrun-sink", localSinkRunner);
+        jcommander.addCommand("create", createSink);
+        jcommander.addCommand("delete", deleteSink);
+        jcommander.addCommand("localrun", localSinkRunner);
     }
 
     /**
@@ -101,20 +90,7 @@ void processArguments() throws Exception {
         abstract void runCmd() throws Exception;
     }
 
-    @Parameters(commandDescription = "Run the Pulsar source or sink locally (rather than deploying it to the Pulsar cluster)")
-
-    class LocalSourceRunner extends CreateSource {
-
-        @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
-        protected String brokerServiceUrl;
-
-        @Override
-        void runCmd() throws Exception {
-            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
-                    sourceConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
-        }
-    }
-
+    @Parameters(commandDescription = "Run the Pulsar sink locally (rather than deploying it to the Pulsar cluster)")
     class LocalSinkRunner extends CreateSink {
 
         @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
@@ -127,173 +103,6 @@ void runCmd() throws Exception {
         }
     }
 
-    @Parameters(commandDescription = "Create Pulsar source connectors")
-    class CreateSource extends BaseCommand {
-        @Parameter(names = "--tenant", description = "The source's tenant")
-        protected String tenant;
-        @Parameter(names = "--namespace", description = "The source's namespace")
-        protected String namespace;
-        @Parameter(names = "--name", description = "The source's name")
-        protected String name;
-        @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Source")
-        protected FunctionConfig.ProcessingGuarantees processingGuarantees;
-        @Parameter(names = "--className", description = "The source's class name")
-        protected String className;
-        @Parameter(names = "--destinationTopicName", description = "Pulsar topic to ingress data to")
-        protected String destinationTopicName;
-        @Parameter(names = "--deserializationClassName", description = "The classname for SerDe class for the source")
-        protected String deserializationClassName;
-        @Parameter(names = "--parallelism", description = "Number of instances of the source")
-        protected String parallelism;
-        @Parameter(
-                names = "--jar",
-                description = "Path to the jar file for the Source",
-                listConverter = StringConverter.class)
-        protected String jarFile;
-
-        @Parameter(names = "--sourceConfigFile", description = "The path to a YAML config file specifying the "
-                + "source's configuration")
-        protected String sourceConfigFile;
-
-        protected SourceConfig sourceConfig;
-
-        @Override
-        void processArguments() throws Exception {
-            super.processArguments();
-
-            if (null != sourceConfigFile) {
-                this.sourceConfig = loadSourceConfig(sourceConfigFile);
-            } else {
-                this.sourceConfig = new SourceConfig();
-            }
-
-            if (null != tenant) {
-                sourceConfig.setTenant(tenant);
-            }
-            if (null != namespace) {
-                sourceConfig.setNamespace(namespace);
-            }
-            if (null != name) {
-                sourceConfig.setName(name);
-            }
-
-            if (null != className) {
-                this.sourceConfig.setClassName(className);
-            }
-            if (null != destinationTopicName) {
-                sourceConfig.setTopicName(destinationTopicName);
-            }
-            if (null != deserializationClassName) {
-                sourceConfig.setSerdeClassName(deserializationClassName);
-            }
-            if (null != processingGuarantees) {
-                sourceConfig.setProcessingGuarantees(processingGuarantees);
-            }
-            if (parallelism == null) {
-                if (sourceConfig.getParallelism() == 0) {
-                    sourceConfig.setParallelism(1);
-                }
-            } else {
-                int num = Integer.parseInt(parallelism);
-                if (num <= 0) {
-                    throw new IllegalArgumentException("The parallelism factor (the number of instances) for the "
-                            + "connector must be positive");
-                }
-                sourceConfig.setParallelism(num);
-            }
-
-            if (null == jarFile) {
-                throw new IllegalArgumentException("Connector JAR not specfied");
-            }
-        }
-
-        @Override
-        void runCmd() throws Exception {
-            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
-                throw new RuntimeException("Missing arguments");
-            }
-            admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
-            print("Created successfully");
-        }
-
-        private Class<?> getSourceType(File file) {
-            if (!Reflections.classExistsInJar(file, sourceConfig.getClassName())) {
-                throw new IllegalArgumentException(String.format("Pulsar Source class %s does not exist in jar %s",
-                        sourceConfig.getClassName(), jarFile));
-            } else if (!Reflections.classInJarImplementsIface(file, sourceConfig.getClassName(), Source.class)) {
-                throw new IllegalArgumentException(String.format("The Pulsar source class %s in jar %s implements does not implement " + Source.class.getName(),
-                        sourceConfig.getClassName(), jarFile));
-            }
-
-            Object userClass = Reflections.createInstance(sourceConfig.getClassName(), file);
-            Class<?> typeArg;
-            Source source = (Source) userClass;
-            if (source == null) {
-                throw new IllegalArgumentException(String.format("The Pulsar source class %s could not be instantiated from jar %s",
-                        sourceConfig.getClassName(), jarFile));
-            }
-            typeArg = TypeResolver.resolveRawArgument(Source.class, source.getClass());
-
-            return typeArg;
-        }
-
-        protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSourceConfigProto2(SourceConfig sourceConfig)
-                throws IOException {
-            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
-                    = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
-            Utils.mergeJson(FunctionsImpl.printJson(createSourceConfig(sourceConfig)), functionDetailsBuilder);
-            return functionDetailsBuilder.build();
-        }
-
-        protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) {
-
-            File file = new File(jarFile);
-            try {
-                Reflections.loadJar(file);
-            } catch (MalformedURLException e) {
-                throw new RuntimeException("Failed to load user jar " + file, e);
-            }
-            Class<?> typeArg = getSourceType(file);
-
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            if (sourceConfig.getTenant() != null) {
-                functionDetailsBuilder.setTenant(sourceConfig.getTenant());
-            }
-            if (sourceConfig.getNamespace() != null) {
-                functionDetailsBuilder.setNamespace(sourceConfig.getNamespace());
-            }
-            if (sourceConfig.getName() != null) {
-                functionDetailsBuilder.setName(sourceConfig.getName());
-            }
-            functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
-            functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
-            functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
-            if (sourceConfig.getProcessingGuarantees() != null) {
-                functionDetailsBuilder.setProcessingGuarantees(
-                        convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
-            }
-
-            // set source spec
-            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
-            sourceSpecBuilder.setClassName(sourceConfig.getClassName());
-            sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs()));
-            sourceSpecBuilder.setTypeClassName(typeArg.getName());
-            functionDetailsBuilder.setSource(sourceSpecBuilder);
-
-            // set up sink spec
-            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
-            sinkSpecBuilder.setClassName(PulsarSink.class.getName());
-            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
-                sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
-            }
-            sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
-            sinkSpecBuilder.setTypeClassName(typeArg.getName());
-
-            functionDetailsBuilder.setSink(sinkSpecBuilder);
-            return functionDetailsBuilder.build();
-        }
-    }
-
     @Parameters(commandDescription = "Create Pulsar sink connectors")
     class CreateSink extends BaseCommand {
         @Parameter(names = "--tenant", description = "The sink's tenant")
@@ -356,7 +165,7 @@ void processArguments() throws Exception {
                 inputTopics.forEach(new Consumer<String>() {
                     @Override
                     public void accept(String s) {
-                        CmdConnectors.validateTopicName(s);
+                        CmdSinks.validateTopicName(s);
                         topicsToSerDeClassName.put(s, "");
                     }
                 });
@@ -365,7 +174,7 @@ public void accept(String s) {
                 Type type = new TypeToken<Map<String, String>>(){}.getType();
                 Map<String, String> customSerdeInputMap = new Gson().fromJson(customSerdeInputString, type);
                 customSerdeInputMap.forEach((topic, serde) -> {
-                    CmdConnectors.validateTopicName(topic);
+                    CmdSinks.validateTopicName(topic);
                     topicsToSerDeClassName.put(topic, serde);
                 });
             }
@@ -475,15 +284,15 @@ protected FunctionDetails createSinkConfig(SinkConfig sinkConfig) {
     }
 
     @Parameters(commandDescription = "Stops a Pulsar sink or source")
-    class DeleteConnector extends BaseCommand {
+    class DeleteSink extends BaseCommand {
 
-        @Parameter(names = "--tenant", description = "The tenant of a sink or source")
+        @Parameter(names = "--tenant", description = "The tenant of the sink")
         protected String tenant;
 
-        @Parameter(names = "--namespace", description = "The namespace of a sink or source")
+        @Parameter(names = "--namespace", description = "The namespace of the sink")
         protected String namespace;
 
-        @Parameter(names = "--name", description = "The name of a sink or source")
+        @Parameter(names = "--name", description = "The name of the sink")
         protected String name;
 
         @Override
@@ -491,7 +300,7 @@ void processArguments() throws Exception {
             super.processArguments();
             if (null == tenant || null == namespace || null == name) {
                 throw new RuntimeException(
-                        "You must specify a tenant, namespace, and name for the sink or source");
+                        "You must specify a tenant, namespace, and name for the sink");
             }
         }
 
@@ -506,25 +315,11 @@ private static SinkConfig loadSinkConfig(String file) throws IOException {
         return (SinkConfig) loadConfig(file, SinkConfig.class);
     }
 
-    private static SourceConfig loadSourceConfig(String file) throws IOException {
-        return (SourceConfig) loadConfig(file, SourceConfig.class);
-    }
-
     private static Object loadConfig(String file, Class<?> clazz) throws IOException {
         ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
         return mapper.readValue(new File(file), clazz);
     }
 
-    public static boolean areAllRequiredFieldsPresentForSource(SourceConfig sourceConfig) {
-        return sourceConfig.getTenant() != null && !sourceConfig.getTenant().isEmpty()
-                && sourceConfig.getNamespace() != null && !sourceConfig.getNamespace().isEmpty()
-                && sourceConfig.getName() != null && !sourceConfig.getName().isEmpty()
-                && sourceConfig.getClassName() != null && !sourceConfig.getClassName().isEmpty()
-                && sourceConfig.getTopicName() != null && !sourceConfig.getTopicName().isEmpty()
-                || sourceConfig.getSerdeClassName() != null
-                && sourceConfig.getParallelism() > 0;
-    }
-
     public static boolean areAllRequiredFieldsPresentForSink(SinkConfig sinkConfig) {
         return sinkConfig.getTenant() != null && !sinkConfig.getTenant().isEmpty()
                 && sinkConfig.getNamespace() != null && !sinkConfig.getNamespace().isEmpty()
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
new file mode 100644
index 0000000000..c651e373b1
--- /dev/null
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -0,0 +1,329 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.admin.cli;
+
+import com.beust.jcommander.Parameter;
+import com.beust.jcommander.Parameters;
+import com.beust.jcommander.converters.StringConverter;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
+import net.jodah.typetools.TypeResolver;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.admin.internal.FunctionsImpl;
+import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.connect.core.Sink;
+import org.apache.pulsar.connect.core.Source;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.shaded.proto.Function;
+import org.apache.pulsar.functions.shaded.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.shaded.proto.Function.ProcessingGuarantees;
+import org.apache.pulsar.functions.shaded.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.shaded.proto.Function.SourceSpec;
+import org.apache.pulsar.functions.sink.PulsarSink;
+import org.apache.pulsar.functions.source.PulsarSource;
+import org.apache.pulsar.functions.utils.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Type;
+import java.net.MalformedURLException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+
+@Slf4j
+@Getter
+@Parameters(commandDescription = "Interface for managing Pulsar Source (Ingress data to Pulsar)")
+public class CmdSources extends CmdBase {
+
+    private final CreateSource createSource;
+    private final DeleteSource deleteSource;
+    private final LocalSourceRunner localSourceRunner;
+
+    public CmdSources(PulsarAdmin admin) {
+        super("source", admin);
+        createSource = new CreateSource();
+        deleteSource = new DeleteSource();
+        localSourceRunner = new LocalSourceRunner();
+
+        jcommander.addCommand("create", createSource);
+        jcommander.addCommand("delete", deleteSource);
+        jcommander.addCommand("localrun", localSourceRunner);
+    }
+
+    /**
+     * Base command
+     */
+    @Getter
+    abstract class BaseCommand extends CliCommand {
+        @Override
+        void run() throws Exception {
+            processArguments();
+            runCmd();
+        }
+
+        void processArguments() throws Exception {
+        }
+
+        abstract void runCmd() throws Exception;
+    }
+
+    @Parameters(commandDescription = "Run the Pulsar source locally (rather than deploying it to the Pulsar cluster)")
+    class LocalSourceRunner extends CreateSource {
+
+        @Parameter(names = "--brokerServiceUrl", description = "The URL for the Pulsar broker")
+        protected String brokerServiceUrl;
+
+        @Override
+        void runCmd() throws Exception {
+            CmdFunctions.startLocalRun(createSourceConfigProto2(sourceConfig),
+                    sourceConfig.getParallelism(), brokerServiceUrl, jarFile, admin);
+        }
+    }
+
+    @Parameters(commandDescription = "Create Pulsar source connectors")
+    class CreateSource extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The source's tenant")
+        protected String tenant;
+        @Parameter(names = "--namespace", description = "The source's namespace")
+        protected String namespace;
+        @Parameter(names = "--name", description = "The source's name")
+        protected String name;
+        @Parameter(names = "--processingGuarantees", description = "The processing guarantees (aka delivery semantics) applied to the Source")
+        protected FunctionConfig.ProcessingGuarantees processingGuarantees;
+        @Parameter(names = "--className", description = "The source's class name")
+        protected String className;
+        @Parameter(names = "--destinationTopicName", description = "Pulsar topic to ingress data to")
+        protected String destinationTopicName;
+        @Parameter(names = "--deserializationClassName", description = "The classname for SerDe class for the source")
+        protected String deserializationClassName;
+        @Parameter(names = "--parallelism", description = "Number of instances of the source")
+        protected String parallelism;
+        @Parameter(
+                names = "--jar",
+                description = "Path to the jar file for the Source",
+                listConverter = StringConverter.class)
+        protected String jarFile;
+
+        @Parameter(names = "--sourceConfigFile", description = "The path to a YAML config file specifying the "
+                + "source's configuration")
+        protected String sourceConfigFile;
+
+        protected SourceConfig sourceConfig;
+
+        @Override
+        void processArguments() throws Exception {
+            super.processArguments();
+
+            if (null != sourceConfigFile) {
+                this.sourceConfig = loadSourceConfig(sourceConfigFile);
+            } else {
+                this.sourceConfig = new SourceConfig();
+            }
+
+            if (null != tenant) {
+                sourceConfig.setTenant(tenant);
+            }
+            if (null != namespace) {
+                sourceConfig.setNamespace(namespace);
+            }
+            if (null != name) {
+                sourceConfig.setName(name);
+            }
+
+            if (null != className) {
+                this.sourceConfig.setClassName(className);
+            }
+            if (null != destinationTopicName) {
+                sourceConfig.setTopicName(destinationTopicName);
+            }
+            if (null != deserializationClassName) {
+                sourceConfig.setSerdeClassName(deserializationClassName);
+            }
+            if (null != processingGuarantees) {
+                sourceConfig.setProcessingGuarantees(processingGuarantees);
+            }
+            if (parallelism == null) {
+                if (sourceConfig.getParallelism() == 0) {
+                    sourceConfig.setParallelism(1);
+                }
+            } else {
+                int num = Integer.parseInt(parallelism);
+                if (num <= 0) {
+                    throw new IllegalArgumentException("The parallelism factor (the number of instances) for the "
+                            + "connector must be positive");
+                }
+                sourceConfig.setParallelism(num);
+            }
+
+            if (null == jarFile) {
+                throw new IllegalArgumentException("Connector JAR not specfied");
+            }
+        }
+
+        @Override
+        void runCmd() throws Exception {
+            if (!areAllRequiredFieldsPresentForSource(sourceConfig)) {
+                throw new RuntimeException("Missing arguments");
+            }
+            admin.functions().createFunction(createSourceConfig(sourceConfig), jarFile);
+            print("Created successfully");
+        }
+
+        private Class<?> getSourceType(File file) {
+            if (!Reflections.classExistsInJar(file, sourceConfig.getClassName())) {
+                throw new IllegalArgumentException(String.format("Pulsar Source class %s does not exist in jar %s",
+                        sourceConfig.getClassName(), jarFile));
+            } else if (!Reflections.classInJarImplementsIface(file, sourceConfig.getClassName(), Source.class)) {
+                throw new IllegalArgumentException(String.format("The Pulsar source class %s in jar %s implements does not implement " + Source.class.getName(),
+                        sourceConfig.getClassName(), jarFile));
+            }
+
+            Object userClass = Reflections.createInstance(sourceConfig.getClassName(), file);
+            Class<?> typeArg;
+            Source source = (Source) userClass;
+            if (source == null) {
+                throw new IllegalArgumentException(String.format("The Pulsar source class %s could not be instantiated from jar %s",
+                        sourceConfig.getClassName(), jarFile));
+            }
+            typeArg = TypeResolver.resolveRawArgument(Source.class, source.getClass());
+
+            return typeArg;
+        }
+
+        protected org.apache.pulsar.functions.proto.Function.FunctionDetails createSourceConfigProto2(SourceConfig sourceConfig)
+                throws IOException {
+            org.apache.pulsar.functions.proto.Function.FunctionDetails.Builder functionDetailsBuilder
+                    = org.apache.pulsar.functions.proto.Function.FunctionDetails.newBuilder();
+            Utils.mergeJson(FunctionsImpl.printJson(createSourceConfig(sourceConfig)), functionDetailsBuilder);
+            return functionDetailsBuilder.build();
+        }
+
+        protected FunctionDetails createSourceConfig(SourceConfig sourceConfig) {
+
+            File file = new File(jarFile);
+            try {
+                Reflections.loadJar(file);
+            } catch (MalformedURLException e) {
+                throw new RuntimeException("Failed to load user jar " + file, e);
+            }
+            Class<?> typeArg = getSourceType(file);
+
+            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+            if (sourceConfig.getTenant() != null) {
+                functionDetailsBuilder.setTenant(sourceConfig.getTenant());
+            }
+            if (sourceConfig.getNamespace() != null) {
+                functionDetailsBuilder.setNamespace(sourceConfig.getNamespace());
+            }
+            if (sourceConfig.getName() != null) {
+                functionDetailsBuilder.setName(sourceConfig.getName());
+            }
+            functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+            functionDetailsBuilder.setParallelism(sourceConfig.getParallelism());
+            functionDetailsBuilder.setClassName(IdentityFunction.class.getName());
+            if (sourceConfig.getProcessingGuarantees() != null) {
+                functionDetailsBuilder.setProcessingGuarantees(
+                        convertProcessingGuarantee(sourceConfig.getProcessingGuarantees()));
+            }
+
+            // set source spec
+            SourceSpec.Builder sourceSpecBuilder = SourceSpec.newBuilder();
+            sourceSpecBuilder.setClassName(sourceConfig.getClassName());
+            sourceSpecBuilder.setConfigs(new Gson().toJson(sourceConfig.getConfigs()));
+            sourceSpecBuilder.setTypeClassName(typeArg.getName());
+            functionDetailsBuilder.setSource(sourceSpecBuilder);
+
+            // set up sink spec
+            SinkSpec.Builder sinkSpecBuilder = SinkSpec.newBuilder();
+            sinkSpecBuilder.setClassName(PulsarSink.class.getName());
+            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
+                sinkSpecBuilder.setSerDeClassName(sourceConfig.getSerdeClassName());
+            }
+            sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
+            sinkSpecBuilder.setTypeClassName(typeArg.getName());
+
+            functionDetailsBuilder.setSink(sinkSpecBuilder);
+            return functionDetailsBuilder.build();
+        }
+    }
+
+    @Parameters(commandDescription = "Stops a Pulsar source")
+    class DeleteSource extends BaseCommand {
+
+        @Parameter(names = "--tenant", description = "The tenant of a sink or source")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The namespace of a sink or source")
+        protected String namespace;
+
+        @Parameter(names = "--name", description = "The name of a sink or source")
+        protected String name;
+
+        @Override
+        void processArguments() throws Exception {
+            super.processArguments();
+            if (null == tenant || null == namespace || null == name) {
+                throw new RuntimeException(
+                        "You must specify a tenant, namespace, and name for the source");
+            }
+        }
+
+        @Override
+        void runCmd() throws Exception {
+            admin.functions().deleteFunction(tenant, namespace, name);
+            print("Delete source successfully");
+        }
+    }
+
+    private static SourceConfig loadSourceConfig(String file) throws IOException {
+        return (SourceConfig) loadConfig(file, SourceConfig.class);
+    }
+
+    private static Object loadConfig(String file, Class<?> clazz) throws IOException {
+        ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
+        return mapper.readValue(new File(file), clazz);
+    }
+
+    public static boolean areAllRequiredFieldsPresentForSource(SourceConfig sourceConfig) {
+        return sourceConfig.getTenant() != null && !sourceConfig.getTenant().isEmpty()
+                && sourceConfig.getNamespace() != null && !sourceConfig.getNamespace().isEmpty()
+                && sourceConfig.getName() != null && !sourceConfig.getName().isEmpty()
+                && sourceConfig.getClassName() != null && !sourceConfig.getClassName().isEmpty()
+                && sourceConfig.getTopicName() != null && !sourceConfig.getTopicName().isEmpty()
+                || sourceConfig.getSerdeClassName() != null
+                && sourceConfig.getParallelism() > 0;
+    }
+
+    private static ProcessingGuarantees convertProcessingGuarantee(
+            FunctionConfig.ProcessingGuarantees processingGuarantees) {
+        for (ProcessingGuarantees type : ProcessingGuarantees.values()) {
+            if (type.name().equals(processingGuarantees.name())) {
+                return type;
+            }
+        }
+        throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
+    }
+}
\ No newline at end of file
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
index c0ee2cb1b8..6467072c28 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/PulsarAdminTool.java
@@ -96,7 +96,8 @@
 
         commandMap.put("resource-quotas", CmdResourceQuotas.class);
         commandMap.put("functions", CmdFunctions.class);
-        commandMap.put("connectors", CmdConnectors.class);
+        commandMap.put("source", CmdSources.class);
+        commandMap.put("sink", CmdSinks.class);
     }
 
     private void setupCommands(Function<PulsarAdminBuilder, ? extends PulsarAdmin> adminFactory) {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services