You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/19 17:00:32 UTC

[pulsar] branch master updated: Added stop/restart functionality in sources/sinks (#2810)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new f3a027b  Added stop/restart functionality in sources/sinks (#2810)
f3a027b is described below

commit f3a027b45f8244d19ac24709edc7f762abb5ce6f
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri Oct 19 10:00:25 2018 -0700

    Added stop/restart functionality in sources/sinks (#2810)
    
    * Added Get and List source/sink functionality
    
    * Fixed compile
    
    * Removed test that doesn't make sense anymore
    
    * Fixed build
    
    * Fixed logic
    
    * Return error response
    
    * Return response on error
    
    * Fix unittest
    
    * Fixed unittest
    
    * Fixed unittest
    
    * Fixed unittest
    
    * Added get/list sinks tests
    
    * Added get/list tests
    
    * Add more unittests
    
    * Added more unittests
    
    * Added TODO
    
    * Took feedback
    
    * Fix unittest
    
    * Fix unittest
    
    * Fix unittest
    
    * Fixed integration tests
    
    * Fixed integration test
    
    * Added restart/stop functionality to the sources/sinks
    
    * Added getstatus method to sources/sink
    
    * Fix integration tests
---
 .../pulsar/broker/admin/impl/FunctionsBase.java    |  12 +-
 .../apache/pulsar/broker/admin/impl/SinkBase.java  |  12 +-
 .../pulsar/broker/admin/impl/SourceBase.java       |  12 +-
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  69 +++++++++++
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  91 +++++++++++++--
 .../functions/worker/rest/api/FunctionsImpl.java   | 129 +++++++++++++--------
 .../worker/rest/api/v2/FunctionApiV2Resource.java  |  12 +-
 .../worker/rest/api/v2/SinkApiV2Resource.java      |  12 +-
 .../worker/rest/api/v2/SourceApiV2Resource.java    |  12 +-
 .../integration/functions/PulsarFunctionsTest.java |  39 ++++++-
 10 files changed, 299 insertions(+), 101 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index b50da21..01dd354 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -162,7 +162,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, functionName, instanceId, uri.getRequestUri());
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -179,7 +179,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStatus(
-            tenant, namespace, functionName, uri.getRequestUri());
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
     }
 
     @GET
@@ -256,7 +256,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -268,7 +268,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.restartFunctionInstances(tenant, namespace, functionName);
+        return functions.restartFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @POST
@@ -281,7 +281,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -293,7 +293,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.stopFunctionInstances(tenant, namespace, functionName);
+        return functions.stopFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @POST
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 0f5a5c5..ba7dedc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -148,7 +148,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
                                           final @PathParam("sinkName") String sinkName,
                                           final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+            tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -164,7 +164,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public Response getSinkStatus(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace,
                                   final @PathParam("sinkName") String sinkName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, sinkName, FunctionsImpl.SINK, uri.getRequestUri());
     }
 
     @GET
@@ -194,7 +194,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public Response restartSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -206,7 +206,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.restartFunctionInstances(tenant, namespace, sinkName);
+        return functions.restartFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @POST
@@ -219,7 +219,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     public Response stopSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -231,7 +231,7 @@ public class SinkBase extends AdminResource implements Supplier<WorkerService> {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.stopFunctionInstances(tenant, namespace, sinkName);
+        return functions.stopFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 4bda489..c695a1a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -150,7 +150,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
                                             final @PathParam("sourceName") String sourceName,
                                             final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+            tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -166,7 +166,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public Response getSourceStatus(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("sourceName") String sourceName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, sourceName, FunctionsImpl.SOURCE, uri.getRequestUri());
     }
 
     @GET
@@ -197,7 +197,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public Response restartSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -209,7 +209,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.restartFunctionInstances(tenant, namespace, sourceName);
+        return functions.restartFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @POST
@@ -222,7 +222,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     public Response stopSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
     }
 
     @POST
@@ -234,7 +234,7 @@ public class SourceBase extends AdminResource implements Supplier<WorkerService>
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.stopFunctionInstances(tenant, namespace, sourceName);
+        return functions.stopFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @GET
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 3b9159e..37d2d9b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -31,6 +31,7 @@ import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
@@ -67,6 +68,9 @@ public class CmdSinks extends CmdBase {
     private final DeleteSink deleteSink;
     private final ListSinks listSinks;
     private final GetSink getSink;
+    private final GetSinkStatus getSinkStatus;
+    private final StopSink stopSink;
+    private final RestartSink restartSink;
     private final LocalSinkRunner localSinkRunner;
 
     public CmdSinks(PulsarAdmin admin) {
@@ -76,6 +80,9 @@ public class CmdSinks extends CmdBase {
         deleteSink = new DeleteSink();
         listSinks = new ListSinks();
         getSink = new GetSink();
+        getSinkStatus = new GetSinkStatus();
+        stopSink = new StopSink();
+        restartSink = new RestartSink();
         localSinkRunner = new LocalSinkRunner();
 
         jcommander.addCommand("create", createSink);
@@ -83,6 +90,9 @@ public class CmdSinks extends CmdBase {
         jcommander.addCommand("delete", deleteSink);
         jcommander.addCommand("list", listSinks);
         jcommander.addCommand("get", getSink);
+        jcommander.addCommand("getstatus", getSinkStatus);
+        jcommander.addCommand("stop", stopSink);
+        jcommander.addCommand("restart", restartSink);
         jcommander.addCommand("localrun", localSinkRunner);
         jcommander.addCommand("available-sinks", new ListBuiltInSinks());
     }
@@ -590,6 +600,65 @@ public class CmdSinks extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Check the current status of a Pulsar Sink")
+    class GetSinkStatus extends SinkCommand { // registered as the "getstatus" sub-command
+
+        @Parameter(names = "--instance-id", description = "The sink instanceId (Get-status of all instances if instance-id is not provided)")
+        protected String instanceId; // optional; when blank the status call without an instance id is used
+
+        @Override
+        void runCmd() throws Exception {
+            String json = Utils.printJson(
+                    isBlank(instanceId) ? admin.sink().getSinkStatus(tenant, namespace, sinkName)
+                            : admin.sink().getSinkStatus(tenant, namespace, sinkName,
+                            Integer.parseInt(instanceId)));
+            Gson gson = new GsonBuilder().setPrettyPrinting().create(); // re-serialize so the output is indented JSON
+            System.out.println(gson.toJson(new JsonParser().parse(json)));
+        }
+    }
+
+    @Parameters(commandDescription = "Restart sink instance")
+    class RestartSink extends SinkCommand { // registered as the "restart" sub-command
+
+        @Parameter(names = "--instance-id", description = "The sink instanceId (restart all instances if instance-id is not provided)")
+        protected String instanceId; // optional; when blank the restart call without an instance id is used
+
+        @Override
+        void runCmd() throws Exception {
+            try { // success is reported inside the try so a malformed id never prints "Restarted successfully"
+                if (isNotBlank(instanceId)) {
+                    admin.sink().restartSink(tenant, namespace, sinkName, Integer.parseInt(instanceId));
+                } else {
+                    admin.sink().restartSink(tenant, namespace, sinkName);
+                }
+                System.out.println("Restarted successfully");
+            } catch (NumberFormatException e) {
+                System.err.println("instance-id must be a number");
+            }
+        }
+    }
+
+    @Parameters(commandDescription = "Temporarily stops sink instance. (If worker restarts then it reassigns and starts sink again)")
+    class StopSink extends SinkCommand { // registered as the "stop" sub-command
+
+        @Parameter(names = "--instance-id", description = "The sink instanceId (stop all instances if instance-id is not provided)")
+        protected String instanceId; // optional; when blank the stop call without an instance id is used
+
+        @Override
+        void runCmd() throws Exception {
+            try { // success is reported inside the try so a malformed id never prints a success message
+                if (isNotBlank(instanceId)) {
+                    admin.sink().stopSink(tenant, namespace, sinkName, Integer.parseInt(instanceId));
+                } else {
+                    admin.sink().stopSink(tenant, namespace, sinkName);
+                }
+                System.out.println("Stopped successfully"); // was "Restarted successfully", copied from the restart command
+            } catch (NumberFormatException e) {
+                System.err.println("instance-id must be a number");
+            }
+        }
+    }
+
     @Parameters(commandDescription = "Get the list of Pulsar IO connector sinks supported by Pulsar cluster")
     public class ListBuiltInSinks extends BaseCommand {
         @Override
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index f27b0a5..5a1e9b3 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pulsar.admin.cli;
 
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
 import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.apache.pulsar.functions.utils.Utils.fileExists;
@@ -29,6 +31,7 @@ import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
+import com.google.gson.JsonParser;
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
@@ -69,8 +72,11 @@ public class CmdSources extends CmdBase {
     private final CreateSource createSource;
     private final DeleteSource deleteSource;
     private final GetSource getSource;
+    private final GetSourceStatus getSourceStatus;
     private final ListSources listSources;
     private final UpdateSource updateSource;
+    private final RestartSource restartSource;
+    private final StopSource stopSource;
     private final LocalSourceRunner localSourceRunner;
 
     public CmdSources(PulsarAdmin admin) {
@@ -80,13 +86,19 @@ public class CmdSources extends CmdBase {
         deleteSource = new DeleteSource();
         listSources = new ListSources();
         getSource = new GetSource();
+        getSourceStatus = new GetSourceStatus();
+        restartSource = new RestartSource();
+        stopSource = new StopSource();
         localSourceRunner = new LocalSourceRunner();
 
         jcommander.addCommand("create", createSource);
         jcommander.addCommand("update", updateSource);
         jcommander.addCommand("delete", deleteSource);
         jcommander.addCommand("get", getSource);
+        jcommander.addCommand("getstatus", getSourceStatus);
         jcommander.addCommand("list", listSources);
+        jcommander.addCommand("stop", stopSource);
+        jcommander.addCommand("restart", restartSource);
         jcommander.addCommand("localrun", localSourceRunner);
         jcommander.addCommand("available-sources", new ListBuiltInSources());
     }
@@ -147,13 +159,13 @@ public class CmdSources extends CmdBase {
         protected String tlsTrustCertFilePath;
 
         private void mergeArgs() {
-            if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
-            if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
-            if (!StringUtils.isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams;
+            if (!isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
+            if (!isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
+            if (!isBlank(DEPRECATED_clientAuthParams)) clientAuthParams = DEPRECATED_clientAuthParams;
             if (DEPRECATED_useTls != null) useTls = DEPRECATED_useTls;
             if (DEPRECATED_tlsAllowInsecureConnection != null) tlsAllowInsecureConnection = DEPRECATED_tlsAllowInsecureConnection;
             if (DEPRECATED_tlsHostNameVerificationEnabled != null) tlsHostNameVerificationEnabled = DEPRECATED_tlsHostNameVerificationEnabled;
-            if (!StringUtils.isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
+            if (!isBlank(DEPRECATED_tlsTrustCertFilePath)) tlsTrustCertFilePath = DEPRECATED_tlsTrustCertFilePath;
         }
 
         @Override
@@ -280,11 +292,11 @@ public class CmdSources extends CmdBase {
 
         private void mergeArgs() {
             if (DEPRECATED_processingGuarantees != null) processingGuarantees = DEPRECATED_processingGuarantees;
-            if (!StringUtils.isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName;
-            if (!StringUtils.isBlank(DEPRECATED_deserializationClassName)) deserializationClassName = DEPRECATED_deserializationClassName;
-            if (!StringUtils.isBlank(DEPRECATED_className)) className = DEPRECATED_className;
-            if (!StringUtils.isBlank(DEPRECATED_sourceConfigFile)) sourceConfigFile = DEPRECATED_sourceConfigFile;
-            if (!StringUtils.isBlank(DEPRECATED_sourceConfigString)) sourceConfigString = DEPRECATED_sourceConfigString;
+            if (!isBlank(DEPRECATED_destinationTopicName)) destinationTopicName = DEPRECATED_destinationTopicName;
+            if (!isBlank(DEPRECATED_deserializationClassName)) deserializationClassName = DEPRECATED_deserializationClassName;
+            if (!isBlank(DEPRECATED_className)) className = DEPRECATED_className;
+            if (!isBlank(DEPRECATED_sourceConfigFile)) sourceConfigFile = DEPRECATED_sourceConfigFile;
+            if (!isBlank(DEPRECATED_sourceConfigString)) sourceConfigString = DEPRECATED_sourceConfigString;
         }
 
         @Override
@@ -382,7 +394,7 @@ public class CmdSources extends CmdBase {
         }
 
         protected void validateSourceConfigs(SourceConfig sourceConfig) {
-            if (StringUtils.isBlank(sourceConfig.getArchive())) {
+            if (isBlank(sourceConfig.getArchive())) {
                 throw new ParameterException("Source archive not specfied");
             }
 
@@ -548,6 +560,65 @@ public class CmdSources extends CmdBase {
         }
     }
 
+    @Parameters(commandDescription = "Check the current status of a Pulsar Source")
+    class GetSourceStatus extends SourceCommand { // registered as the "getstatus" sub-command
+
+        @Parameter(names = "--instance-id", description = "The source instanceId (Get-status of all instances if instance-id is not provided)")
+        protected String instanceId; // optional; when blank the status call without an instance id is used
+
+        @Override
+        void runCmd() throws Exception {
+            String json = Utils.printJson(
+                    isBlank(instanceId) ? admin.source().getSourceStatus(tenant, namespace, sourceName)
+                            : admin.source().getSourceStatus(tenant, namespace, sourceName,
+                            Integer.parseInt(instanceId)));
+            Gson gson = new GsonBuilder().setPrettyPrinting().create(); // re-serialize so the output is indented JSON
+            System.out.println(gson.toJson(new JsonParser().parse(json)));
+        }
+    }
+
+    @Parameters(commandDescription = "Restart source instance")
+    class RestartSource extends SourceCommand { // registered as the "restart" sub-command
+
+        @Parameter(names = "--instance-id", description = "The source instanceId (restart all instances if instance-id is not provided)")
+        protected String instanceId; // optional; when blank the restart call without an instance id is used
+
+        @Override
+        void runCmd() throws Exception {
+            try { // success is reported inside the try so a malformed id never prints "Restarted successfully"
+                if (isNotBlank(instanceId)) {
+                    admin.source().restartSource(tenant, namespace, sourceName, Integer.parseInt(instanceId));
+                } else {
+                    admin.source().restartSource(tenant, namespace, sourceName);
+                }
+                System.out.println("Restarted successfully");
+            } catch (NumberFormatException e) {
+                System.err.println("instance-id must be a number");
+            }
+        }
+    }
+
+    @Parameters(commandDescription = "Temporarily stops source instance. (If worker restarts then it reassigns and starts source again)")
+    class StopSource extends SourceCommand { // registered as the "stop" sub-command
+
+        @Parameter(names = "--instance-id", description = "The source instanceId (stop all instances if instance-id is not provided)")
+        protected String instanceId; // optional; when blank the stop call without an instance id is used
+
+        @Override
+        void runCmd() throws Exception {
+            try { // success is reported inside the try so a malformed id never prints a success message
+                if (isNotBlank(instanceId)) {
+                    admin.source().stopSource(tenant, namespace, sourceName, Integer.parseInt(instanceId));
+                } else {
+                    admin.source().stopSource(tenant, namespace, sourceName);
+                }
+                System.out.println("Stopped successfully"); // was "Restarted successfully", copied from the restart command
+            } catch (NumberFormatException e) {
+                System.err.println("instance-id must be a number");
+            }
+        }
+    }
+
     @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster")
     public class ListBuiltInSources extends BaseCommand {
         @Override
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 44bb3bd..0eebe79 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -403,8 +403,8 @@ public class FunctionsImpl {
         return Response.status(Status.OK).entity(retval).build();
     }
 
-    public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
-            final String instanceId, URI uri) throws IOException {
+    public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, URI uri) throws IOException {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -412,23 +412,28 @@ public class FunctionsImpl {
 
         // validate parameters
         try {
-            validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
+            validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
-        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
         int instanceIdInt = Integer.parseInt(instanceId);
         if (instanceIdInt < 0 || instanceIdInt >= functionMetaData.getFunctionDetails().getParallelism()) {
-            log.error("instanceId in getFunctionStatus out of bounds @ /{}/{}/{}", tenant, namespace, functionName);
+            log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(String.format("Invalid InstanceId"))).build();
         }
@@ -436,12 +441,12 @@ public class FunctionsImpl {
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         FunctionStatus functionStatus = null;
         try {
-            functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, functionName,
+            functionStatus = functionRuntimeManager.getFunctionInstanceStatus(tenant, namespace, componentName,
                     Integer.parseInt(instanceId), uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, functionName, e);
+            log.error("{}/{}/{} Got Exception Getting Status", tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
 
@@ -449,18 +454,18 @@ public class FunctionsImpl {
         return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName,
-            final String instanceId, URI uri) {
-        return stopFunctionInstance(tenant, namespace, functionName, instanceId, false, uri);
+    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, URI uri) {
+        return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, false, uri);
     }
 
-    public Response restartFunctionInstance(final String tenant, final String namespace, final String functionName,
-            final String instanceId, URI uri) {
-        return stopFunctionInstance(tenant, namespace, functionName, instanceId, true, uri);
+    public Response restartFunctionInstance(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, URI uri) {
+        return stopFunctionInstance(tenant, namespace, componentName, componentType, instanceId, true, uri);
     }
 
-    public Response stopFunctionInstance(final String tenant, final String namespace, final String functionName,
-            final String instanceId, boolean restart, URI uri) {
+    public Response stopFunctionInstance(final String tenant, final String namespace, final String componentName,
+            final String componentType, final String instanceId, boolean restart, URI uri) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -468,42 +473,51 @@ public class FunctionsImpl {
 
         // validate parameters
         try {
-            validateGetFunctionInstanceRequestParams(tenant, namespace, functionName, instanceId);
+            validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid restart-function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid {} {} request @ /{}/{}/{}", restart ? "restart" : "stop", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
-            return functionRuntimeManager.stopFunctionInstance(tenant, namespace, functionName,
+            return functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName,
                     Integer.parseInt(instanceId), restart, uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to restart function: {}/{}/{}/{}", tenant, namespace, functionName, instanceId, e);
+            log.error("Failed to {} {}: {}/{}/{}/{}", restart ? "restart" : "stop", componentType, tenant, namespace, componentName, instanceId, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
     }
 
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName) {
-        return stopFunctionInstances(tenant, namespace, functionName, false);
+    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
+                                          final String componentType) {
+        return stopFunctionInstances(tenant, namespace, componentName, componentType, false);
     }
 
-    public Response restartFunctionInstances(final String tenant, final String namespace, final String functionName) {
-        return stopFunctionInstances(tenant, namespace, functionName, true);
+    public Response restartFunctionInstances(final String tenant, final String namespace, final String componentName,
+                                             final String componentType) {
+        return stopFunctionInstances(tenant, namespace, componentName, componentType, true);
     }
 
-    public Response stopFunctionInstances(final String tenant, final String namespace, final String functionName,
-            boolean restart) {
+    public Response stopFunctionInstances(final String tenant, final String namespace, final String componentName,
+            final String componentType, boolean restart) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -511,32 +525,40 @@ public class FunctionsImpl {
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid {} {} request @ /{}/{}/{}", restart ? "restart" : "stop", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
-            return functionRuntimeManager.stopFunctionInstances(tenant, namespace, functionName, restart);
+            return functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
-            log.error("Failed to restart function: {}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Failed to {} {}: {}/{}/{}", restart ? "restart" : "stop", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
         }
     }
 
-    public Response getFunctionStatus(final String tenant, final String namespace, final String functionName, URI uri)
+    public Response getFunctionStatus(final String tenant, final String namespace, final String componentName,
+                                      final String componentType, URI uri)
             throws IOException {
 
         if (!isWorkerServiceAvailable()) {
@@ -545,24 +567,31 @@ public class FunctionsImpl {
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid get {} Status request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunctionStatus does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} in get {} Status does not exist @ /{}/{}/{}", componentType, componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         InstanceCommunication.FunctionStatusList functionStatusList = null;
         try {
-            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, functionName, uri);
+            functionStatusList = functionRuntimeManager.getAllFunctionStatus(tenant, namespace, componentName, uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
@@ -870,11 +899,11 @@ public class FunctionsImpl {
         }
     }
 
-    private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName,
-            String instanceId) throws IllegalArgumentException {
-        validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
+    private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String componentName,
+            String componentType, String instanceId) throws IllegalArgumentException {
+        validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         if (instanceId == null) {
-            throw new IllegalArgumentException("Function Instance Id is not provided");
+            throw new IllegalArgumentException(String.format("%s Instance Id is not provided", componentType));
         }
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 405f88f..e37a88a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -107,7 +107,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                               final @PathParam("functionName") String functionName,
                                               final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, functionName, instanceId, uri.getRequestUri());
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -116,7 +116,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
                                       final @PathParam("namespace") String namespace,
                                       final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStatus(
-            tenant, namespace, functionName, uri.getRequestUri());
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION, uri.getRequestUri());
     }
 
     @GET
@@ -150,7 +150,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -162,7 +162,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.restartFunctionInstances(tenant, namespace, functionName);
+        return functions.restartFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @POST
@@ -175,7 +175,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, functionName, FunctionsImpl.FUNCTION, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -187,7 +187,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.stopFunctionInstances(tenant, namespace, functionName);
+        return functions.stopFunctionInstances(tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @POST
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 488f47d..934b0fc 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -97,7 +97,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
                                           final @PathParam("sinkName") String sinkName,
                                           final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+            tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -105,7 +105,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
     public Response getSinkStatus(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace,
                                   final @PathParam("sinkName") String sinkName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, sinkName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, sinkName, FunctionsImpl.SINK, uri.getRequestUri());
     }
 
     @GET
@@ -126,7 +126,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
     public Response restartSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -138,7 +138,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.restartFunctionInstances(tenant, namespace, sinkName);
+        return functions.restartFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @POST
@@ -151,7 +151,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
     public Response stopSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, sinkName, FunctionsImpl.SINK, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -163,7 +163,7 @@ public class SinkApiV2Resource extends FunctionApiResource {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.stopFunctionInstances(tenant, namespace, sinkName);
+        return functions.stopFunctionInstances(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 3b1222e..2c39344 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -97,7 +97,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
                                             final @PathParam("sourceName") String sourceName,
                                             final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionInstanceStatus(
-            tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+            tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -105,7 +105,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
     public Response getSourceStatus(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("sourceName") String sourceName) throws IOException {
-        return functions.getFunctionStatus(tenant, namespace, sourceName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, sourceName, FunctionsImpl.SOURCE, uri.getRequestUri());
     }
 
     @GET
@@ -126,7 +126,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
     public Response restartSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+        return functions.restartFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -138,7 +138,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response restartSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.restartFunctionInstances(tenant, namespace, sourceName);
+        return functions.restartFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @POST
@@ -151,7 +151,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
     public Response stopSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
             final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+        return functions.stopFunctionInstance(tenant, namespace, sourceName, FunctionsImpl.SOURCE, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -163,7 +163,7 @@ public class SourceApiV2Resource extends FunctionApiResource {
     @Consumes(MediaType.APPLICATION_JSON)
     public Response stopSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.stopFunctionInstances(tenant, namespace, sourceName);
+        return functions.stopFunctionInstances(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @GET
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 6659fcf..0222bd6 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -154,7 +154,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
 
         // wait for sink to process messages
-        waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
+        waitForProcessingSinkMessages(tenant, namespace, sinkName, numMessages);
 
         // validate the sink result
         tester.validateSinkResult(kvs);
@@ -238,7 +238,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     protected void getSinkStatus(String tenant, String namespace, String sinkName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "sink",
             "getstatus",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -254,7 +254,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
             } catch (ContainerExecException e) {
                 // expected in early iterations
             }
-            log.info("Backoff 1 second until the function is running");
+            log.info("Backoff 1 second until the sink is running");
             TimeUnit.SECONDS.sleep(1);
         }
     }
@@ -482,7 +482,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
     protected void getSourceStatus(String tenant, String namespace, String sourceName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "source",
             "getstatus",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -518,7 +518,7 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                                                    int numMessages) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "source",
             "getstatus",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -541,6 +541,35 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         }
     }
 
+    protected void waitForProcessingSinkMessages(String tenant,
+                                                 String namespace,
+                                                 String sinkName,
+                                                 int numMessages) throws Exception {
+        String[] commands = {
+                PulsarCluster.ADMIN_SCRIPT,
+                "sink",
+                "getstatus",
+                "--tenant", tenant,
+                "--namespace", namespace,
+                "--name", sinkName
+        };
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        while (true) {
+            try {
+                ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
+                log.info("Get sink status : {}", result.getStdout());
+                if (result.getStdout().contains("\"numProcessed\": \"" + numMessages + "\"")) {
+                    return;
+                }
+            } catch (ContainerExecException e) {
+                // expected for early iterations
+            }
+            log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second",
+                    stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+
     protected void deleteSource(String tenant, String namespace, String sourceName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,