You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/19 17:00:28 UTC

[GitHub] srkukarni closed pull request #2810: Added stop/restart functionality in sources/sinks

srkukarni closed pull request #2810: Added stop/restart functionality in sources/sinks
URL: https://github.com/apache/pulsar/pull/2810
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index b50da21bfd..01dd354d19 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -162,7 +162,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, functionName, instanceId, uri.getRequestUri());
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -179,7 +179,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStatus(
-            tenant, namespace, functionName, uri.getRequestUri());
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
     }
 
     @GET
@@ -256,7 +256,7 @@ public Response getFunctionState(final @PathParam("tenant") String tenant,
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -268,7 +268,7 @@ public Response restartFunction(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.restartFunctionInstances(tenant, namespace, functionName);
+        return functions.restartFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @POST
@@ -281,7 +281,7 @@ public Response restartFunction(final @PathParam("tenant") String tenant,
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -293,7 +293,7 @@ public Response stopFunction(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.stopFunctionInstances(tenant, namespace, functionName);
+        return functions.stopFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 0f5a5c5d3b..ba7dedc8ac 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -148,7 +148,7 @@ public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
                                           final @PathParam("sinkName") String sinkName,
                                           final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+            tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -164,7 +164,7 @@ public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
     public Response getSinkStatus(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace,
                                   final @PathParam("sinkName") String sinkName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, sinkName, FunctionsImpl.SINK, uri.getRequestUri());
     }
 
     @GET
@@ -194,7 +194,7 @@ public Response listSinks(final @PathParam("tenant") String tenant,
     public Response restartSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -206,7 +206,7 @@ public Response restartSink(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.restartFunctionInstances(tenant, namespace, sinkName);
+        return functions.restartFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @POST
@@ -219,7 +219,7 @@ public Response restartSink(final @PathParam("tenant") String tenant,
     public Response stopSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -231,7 +231,7 @@ public Response stopSink(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.stopFunctionInstances(tenant, namespace, sinkName);
+        return functions.stopFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 4bda48995c..c695a1ada4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -150,7 +150,7 @@ public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant
                                             final @PathParam("sourceName") String sourceName,
                                             final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+            tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -166,7 +166,7 @@ public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant
     public Response getSourceStatus(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("sourceName") String sourceName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, sourceName, FunctionsImpl.SOURCE, uri.getRequestUri());
     }
 
     @GET
@@ -197,7 +197,7 @@ public Response listSources(final @PathParam("tenant") String tenant,
     public Response restartSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -209,7 +209,7 @@ public Response restartSource(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.restartFunctionInstances(tenant, namespace, sourceName);
+        return functions.restartFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @POST
@@ -222,7 +222,7 @@ public Response restartSource(final @PathParam("tenant") String tenant,
     public Response stopSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -234,7 +234,7 @@ public Response stopSource(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.stopFunctionInstances(tenant, namespace, sourceName);
+        return functions.stopFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @GET
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 3b9159e98c..37d2d9bcdb 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -31,6 +31,7 @@
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
@@ -67,6 +68,9 @@
     private final DeleteSink deleteSink;
     private final ListSinks listSinks;
     private final GetSink getSink;
+    private final GetSinkStatus getSinkStatus;
+    private final StopSink stopSink;
+    private final RestartSink restartSink;
     private final LocalSinkRunner localSinkRunner;
 
     public CmdSinks(PulsarAdmin admin) {
@@ -76,6 +80,9 @@ public CmdSinks(PulsarAdmin admin) {
         deleteSink = new DeleteSink();
         listSinks = new ListSinks();
         getSink = new GetSink();
+        getSinkStatus = new GetSinkStatus();
+        stopSink = new StopSink();
+        restartSink = new RestartSink();
         localSinkRunner = new LocalSinkRunner();
 
         jcommander.addCommand("create", createSink);
@@ -83,6 +90,9 @@ public CmdSinks(PulsarAdmin admin) {
         jcommander.addCommand("delete", deleteSink);
         jcommander.addCommand("list", listSinks);
         jcommander.addCommand("get", getSink);
+        jcommander.addCommand("getstatus", getSinkStatus);
+        jcommander.addCommand("stop", stopSink);
+        jcommander.addCommand("restart", restartSink);
         jcommander.addCommand("localrun", localSinkRunner);
         jcommander.addCommand("available-sinks", new ListBuiltInSinks());
     }
@@ -590,6 +600,65 @@ void runCmd() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Check the current status of a Pulsar Sink")
+    class GetSinkStatus extends SinkCommand {
+        // A blank instance id requests the status of every instance; otherwise it must parse as an integer.
+        @Parameter(names = "--instance-id", description = "The sink instanceId (Get-status of all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+            String json = Utils.printJson(
+                    isBlank(instanceId) ? admin.sink().getSinkStatus(tenant, namespace, sinkName)
+                            : admin.sink().getSinkStatus(tenant, namespace, sinkName,
+                            Integer.parseInt(instanceId)));
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(new JsonParser().parse(json)));
+        }
+    }
+
+    @Parameters(commandDescription = "Restart sink instance")
+    class RestartSink extends SinkCommand {
+        @Parameter(names = "--instance-id", description = "The sink instanceId (restart all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.sink().restartSink(tenant, namespace, sinkName, Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                    return; // nothing was restarted, so do not print the success message
+                }
+            } else {
+                admin.sink().restartSink(tenant, namespace, sinkName);
+            }
+            System.out.println("Restarted successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Temporarily stops sink instance. (If worker restarts then it reassigns and starts sink again)")
+    class StopSink extends SinkCommand {
+        @Parameter(names = "--instance-id", description = "The sink instanceId (stop all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.sink().stopSink(tenant, namespace, sinkName, Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                    return; // nothing was stopped, so do not print the success message
+                }
+            } else {
+                admin.sink().stopSink(tenant, namespace, sinkName);
+            }
+            System.out.println("Stopped successfully");
+        }
+    }
+
     @Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster")
     public class ListBuiltInSinks extends BaseCommand {
         @Override
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index f27b0a5437..5a1e9b3817 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.apache.pulsar.functions.utils.Utils.fileExists;
@@ -29,6 +31,7 @@
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
@@ -69,8 +72,11 @@
     private final CreateSource createSource;
     private final DeleteSource deleteSource;
     private final GetSource getSource;
+    private final GetSourceStatus getSourceStatus;
     private final ListSources listSources;
     private final UpdateSource updateSource;
+    private final RestartSource restartSource;
+    private final StopSource stopSource;
     private final LocalSourceRunner localSourceRunner;
 
     public CmdSources(PulsarAdmin admin) {
@@ -80,13 +86,19 @@ public CmdSources(PulsarAdmin admin) {
         deleteSource = new DeleteSource();
         listSources = new ListSources();
         getSource = new GetSource();
+        getSourceStatus = new GetSourceStatus();
+        restartSource = new RestartSource();
+        stopSource = new StopSource();
         localSourceRunner = new LocalSourceRunner();
 
         jcommander.addCommand("create", createSource);
         jcommander.addCommand("update", updateSource);
         jcommander.addCommand("delete", deleteSource);
         jcommander.addCommand("get", getSource);
+        jcommander.addCommand("getstatus", getSourceStatus);
         jcommander.addCommand("list", listSources);
+        jcommander.addCommand("stop", stopSource);
+        jcommander.addCommand("restart", restartSource);
         jcommander.addCommand("localrun", localSourceRunner);
         jcommander.addCommand("available-sources", new ListBuiltInSources());
     }
@@ -147,13 +159,13 @@ void processArguments() throws Exception {
         protected String tlsTrustCertFilePath;
 
         private void mergeArgs() {
-            if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
-            if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
-            if (!StringUtils.isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams;
+            if (!isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
+            if (!isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
+            if (!isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams;
             if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls;
             if (DEPRECATED_tlsAllowInsecureConnection != null) tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection;
             if (DEPRECATED_tlsHostNameVerificationEnabled != null) tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled;
-            if (!StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
+            if (!isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
         }
 
         @Override
@@ -280,11 +292,11 @@ void runCmd() throws Exception {
 
         private void mergeArgs() {
             if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees;
-            if (!StringUtils.isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName;
-            if (!StringUtils.isBlank(DEPRECATED_deserializationClassName)) deserializationClassName = DEPRECATED_deserializationClassName;
-            if (!StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className;
-            if (!StringUtils.isBlank(DEPRECATED_sourceConfigFile)) sourceConfigFile = DEPRECATED_sourceConfigFile;
-            if (!StringUtils.isBlank(DEPRECATED_sourceConfigString)) sourceConfigString = DEPRECATED_sourceConfigString;
+            if (!isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName;
+            if (!isBlank(DEPRECATED_deserializationClassName)) deserializationClassName = DEPRECATED_deserializationClassName;
+            if (!isBlank(DEPRECATED_className)) className = DEPRECATED_className;
+            if (!isBlank(DEPRECATED_sourceConfigFile)) sourceConfigFile = DEPRECATED_sourceConfigFile;
+            if (!isBlank(DEPRECATED_sourceConfigString)) sourceConfigString = DEPRECATED_sourceConfigString;
         }
 
         @Override
@@ -382,7 +394,7 @@ private void inferMissingArguments(SourceConfig sourceConfig) {
         }
 
         protected void validateSourceConfigs(SourceConfig sourceConfig) {
-            if (StringUtils.isBlank(sourceConfig.getArchive())) {
+            if (isBlank(sourceConfig.getArchive())) {
                 throw new ParameterException("Source archive not specfied");
             }
 
@@ -548,6 +560,65 @@ void runCmd() throws Exception {
         }
     }
 
+    @Parameters(commandDescription = "Check the current status of a Pulsar Source")
+    class GetSourceStatus extends SourceCommand {
+        // A blank instance id requests the status of every instance; otherwise it must parse as an integer.
+        @Parameter(names = "--instance-id", description = "The source instanceId (Get-status of all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+            String json = Utils.printJson(
+                    isBlank(instanceId) ? admin.source().getSourceStatus(tenant, namespace, sourceName)
+                            : admin.source().getSourceStatus(tenant, namespace, sourceName,
+                            Integer.parseInt(instanceId)));
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(new JsonParser().parse(json)));
+        }
+    }
+
+    @Parameters(commandDescription = "Restart source instance")
+    class RestartSource extends SourceCommand {
+        @Parameter(names = "--instance-id", description = "The source instanceId (restart all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.source().restartSource(tenant, namespace, sourceName, Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                    return; // nothing was restarted, so do not print the success message
+                }
+            } else {
+                admin.source().restartSource(tenant, namespace, sourceName);
+            }
+            System.out.println("Restarted successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Temporarily stops source instance. (If worker restarts then it reassigns and starts source again)")
+    class StopSource extends SourceCommand {
+        @Parameter(names = "--instance-id", description = "The source instanceId (stop all instances if instance-id is not provided)")
+        protected String instanceId;
+
+        @Override
+        void runCmd() throws Exception {
+            if (isNotBlank(instanceId)) {
+                try {
+                    admin.source().stopSource(tenant, namespace, sourceName, Integer.parseInt(instanceId));
+                } catch (NumberFormatException e) {
+                    System.err.println("instance-id must be a number");
+                    return; // nothing was stopped, so do not print the success message
+                }
+            } else {
+                admin.source().stopSource(tenant, namespace, sourceName);
+            }
+            System.out.println("Stopped successfully");
+        }
+    }
+
     @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster")
     public class ListBuiltInSources extends BaseCommand {
         @Override
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 44bb3bd0f4..0eebe794cf 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -403,8 +403,8 @@ public Response getFunctionInfo(final String tenant, final String namespace, fin
         return Response.status(Status.OK).entity(retval).build();
     }
 
-    public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
-            final String instanceId, URI uri) throws IOException {
+    public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, URI uri) throws IOException {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -412,23 +412,28 @@ public Response getFunctionInstanceStatus(final String tenant, final String name
 
         // validate parameters
         try {
-            validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
+            validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
-        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
         int instanceIdInt = Integer.parseInt(instanceId);
         if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
-            log.error("instanceId in getFunctionStatus out of bounds @ /{}/{}/{}", tenant, namespace, functionName);
+            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("Invalid InstanceId"))).build();
         }
@@ -436,12 +441,12 @@ public Response getFunctionInstanceStatus(final String tenant, final String name
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         FunctionStatus functionStatus = null;
         try {
-            functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName,
+            functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, componentName,
                     Integer.parseInt(instanceId), uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName, e);
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
 
@@ -449,18 +454,18 @@ public Response getFunctionInstanceStatus(final String tenant, final String name
         return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName,
-            final String instanceId, URI uri) {
-        return stopFunctionInstance(tenant, namespace, functionName, instanceId, false, uri);
+    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, URI uri) {
+        return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, false, uri);
     }
 
-    public Response restartFunctionInstance(final String tenant, final String namespace, final String functionName,
-            final String instanceId, URI uri) {
-        return stopFunctionInstance(tenant, namespace, functionName, instanceId, true, uri);
+    public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, URI uri) {
+        return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, true, uri);
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName,
-            final String instanceId, boolean restart, URI uri) {
+    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, boolean restart, URI uri) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -468,42 +473,51 @@ public Response stopFunctionInstance(final String tenant, final String namespace
 
         // validate parameters
         try {
-            validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
+            validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid restart-function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
-            return functionRuntimeManager.stopFunctionInstance(tenant, namespace, functionName,
+            return functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName,
                     Integer.parseInt(instanceId), restart, uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to restart function: {}/{}/{}/{}", tenant, namespace, functionName, instanceId, e);
+            log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
     }
 
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName) {
-        return stopFunctionInstances(tenant, namespace, functionName, false);
+    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
+                                          final String componentType) {
+        return stopFunctionInstances(tenant, namespace, componentName, componentType, false);
     }
 
-    public Response restartFunctionInstances(final String tenant, final String namespace, final String functionName) {
-        return stopFunctionInstances(tenant, namespace, functionName, true);
+    public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName,
+                                             final String componentType) {
+        return stopFunctionInstances(tenant, namespace, componentName, componentType, true);
     }
 
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName,
-            boolean restart) {
+    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
+            final String componentType, boolean restart) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -511,32 +525,40 @@ public Response stopFunctionInstances(final String tenant, final String namespac
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} in getFunctionStatus does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
-            return functionRuntimeManager.stopFunctionInstances(tenant, namespace, functionName, restart);
+            return functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Failed to restart {}: {}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
     }
 
-    public Response getFunctionStatus(final String tenant, final String namespace, final String functionName, URI uri)
+    public Response getFunctionStatus(final String tenant, final String namespace, final String componentName,
+                                      final String componentType, URI uri)
             throws IOException {
 
         if (!isWorkerServiceAvailable()) {
@@ -545,24 +567,31 @@ public Response getFunctionStatus(final String tenant, final String namespace, f
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         InstanceCommunication.FunctionStatusList functionStatusList = null;
         try {
-            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName, uri);
+            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, componentName, uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
@@ -870,11 +899,11 @@ private void validateListFunctionRequestParams(String tenant, String namespace)
         }
     }
 
-    private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName,
-            String instanceId) throws IllegalArgumentException {
-        validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
+    private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName,
+            String componentType, String instanceId) throws IllegalArgumentException {
+        validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         if (instanceId == null) {
-            throw new IllegalArgumentException("Function Instance Id is not provided");
+            throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType));
         }
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 405f88f84c..e37a88a38a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -107,7 +107,7 @@ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tena
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, functionName, instanceId, uri.getRequestUri());
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -116,7 +116,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStatus(
-            tenant, namespace, functionName, uri.getRequestUri());
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
     }
 
     @GET
@@ -150,7 +150,7 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -162,7 +162,7 @@ public Response restartFunction(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.restartFunctionInstances(tenant, namespace, functionName);
+        return functions.restartFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @POST
@@ -175,7 +175,7 @@ public Response restartFunction(final @PathParam("tenant") String tenant,
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -187,7 +187,7 @@ public Response stopFunction(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.stopFunctionInstances(tenant, namespace, functionName);
+        return functions.stopFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @POST
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 488f47d335..934b0fc599 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -97,7 +97,7 @@ public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
                                           final @PathParam("sinkName") String sinkName,
                                           final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+            tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -105,7 +105,7 @@ public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
     public Response getSinkStatus(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace,
                                   final @PathParam("sinkName") String sinkName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, sinkName, FunctionsImpl.SINK, uri.getRequestUri());
     }
 
     @GET
@@ -126,7 +126,7 @@ public Response listSink(final @PathParam("tenant") String tenant,
     public Response restartSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -138,7 +138,7 @@ public Response restartSink(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.restartFunctionInstances(tenant, namespace, sinkName);
+        return functions.restartFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @POST
@@ -151,7 +151,7 @@ public Response restartSink(final @PathParam("tenant") String tenant,
     public Response stopSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -163,7 +163,7 @@ public Response stopSink(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.stopFunctionInstances(tenant, namespace, sinkName);
+        return functions.stopFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 3b1222ec66..2c39344207 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -97,7 +97,7 @@ public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant
                                             final @PathParam("sourceName") String sourceName,
                                             final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+            tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -105,7 +105,7 @@ public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant
     public Response getSourceStatus(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("sourceName") String sourceName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, sourceName, FunctionsImpl.SOURCE, uri.getRequestUri());
     }
 
     @GET
@@ -126,7 +126,7 @@ public Response listSources(final @PathParam("tenant") String tenant,
     public Response restartSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -138,7 +138,7 @@ public Response restartSource(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.restartFunctionInstances(tenant, namespace, sourceName);
+        return functions.restartFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @POST
@@ -151,7 +151,7 @@ public Response restartSource(final @PathParam("tenant") String tenant,
     public Response stopSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -163,7 +163,7 @@ public Response stopSource(final @PathParam("tenant") String tenant,
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.stopFunctionInstances(tenant, namespace, sourceName);
+        return functions.stopFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @GET
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 6659fcf1fd..0222bd6a24 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -154,7 +154,7 @@ private void runSinkTester(SinkTester tester, boolean builtin) throws Exception
         }
 
         // wait for sink to process messages
-        waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
+        waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
 
         // validate the sink result
         tester.validateSinkResult(kvs);
@@ -238,7 +238,7 @@ protected void getSinkInfoSuccess(SinkTester tester,
     protected void getSinkStatus(String tenant, String namespace, String sinkName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "sink",
             "getstatus",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -254,7 +254,7 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t
             } catch (ContainerExecException e) {
                 // expected in early iterations
             }
-            log.info("Backoff 1 second until the function is running");
+            log.info("Backoff 1 second until the sink is running");
             TimeUnit.SECONDS.sleep(1);
         }
     }
@@ -482,7 +482,7 @@ protected void getSourceInfoSuccess(SourceTester tester,
     protected void getSourceStatus(String tenant, String namespace, String sourceName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "source",
             "getstatus",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -518,7 +518,7 @@ protected void waitForProcessingSourceMessages(String tenant,
                                                    int numMessages) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "source",
             "getstatus",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -541,6 +541,35 @@ protected void waitForProcessingSourceMessages(String tenant,
         }
     }
 
+    protected void waitForProcessingSinkMessages(String tenant,
+                                                 String namespace,
+                                                 String sinkName,
+                                                 int numMessages) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "sink",
+                "getstatus",
+                "--tenant", tenant,
+                "--namespace", namespace,
+                "--name", sinkName
+        };
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        while (true) {
+            try {
+                ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+                log.info("Get sink status : {}", result.getStdout());
+                if (result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\"")) {
+                    return;
+                }
+            } catch (ContainerExecException e) {
+                // expected for early iterations
+            }
+            log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second",
+                    stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+
     protected void deleteSource(String tenant, String namespace, String sourceName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services