You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/19 04:59:29 UTC

[GitHub] srkukarni closed pull request #2807: Source/Sink Endpoint validations

srkukarni closed pull request #2807: Source/Sink Endpoint validations
URL: https://github.com/apache/pulsar/pull/2807
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this is a foreign pull request (from a fork), GitHub will not
show the diff once it has been merged, so it is supplied below:

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index def4b2be47..b50da21bfd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -84,7 +84,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
                                      final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
     }
 
     @PUT
@@ -106,7 +106,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
 
     }
 
@@ -124,7 +124,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
     public Response deregisterFunction(final @PathParam("tenant") String tenant,
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("functionName") String functionName) {
-        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, functionName, FunctionsImpl.FUNCTION, clientAppId());
     }
 
     @GET
@@ -143,7 +143,7 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
                                     final @PathParam("namespace") String namespace,
                                     final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionInfo(
-            tenant, namespace, functionName);
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @GET
@@ -196,7 +196,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
     public Response listFunctions(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
         return functions.listFunctions(
-            tenant, namespace);
+            tenant, namespace, FunctionsImpl.FUNCTION);
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 2fe398973d..0f5a5c5d3b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -72,7 +72,7 @@ public Response registerSink(final @PathParam("tenant") String tenant,
                                  final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
     }
 
     @PUT
@@ -93,7 +93,7 @@ public Response updateSink(final @PathParam("tenant") String tenant,
                                final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
 
     }
 
@@ -111,7 +111,7 @@ public Response updateSink(final @PathParam("tenant") String tenant,
     public Response deregisterSink(final @PathParam("tenant") String tenant,
                                    final @PathParam("namespace") String namespace,
                                    final @PathParam("sinkName") String sinkName) {
-        return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, sinkName, FunctionsImpl.SINK, clientAppId());
     }
 
     @GET
@@ -129,7 +129,7 @@ public Response deregisterSink(final @PathParam("tenant") String tenant,
     public Response getSinkInfo(final @PathParam("tenant") String tenant,
                                 final @PathParam("namespace") String namespace,
                                 final @PathParam("sinkName") String sinkName) throws IOException {
-        return functions.getFunctionInfo(tenant, namespace, sinkName);
+        return functions.getFunctionInfo(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @GET
@@ -180,7 +180,7 @@ public Response getSinkStatus(final @PathParam("tenant") String tenant,
     @Path("/{tenant}/{namespace}")
     public Response listSinks(final @PathParam("tenant") String tenant,
                               final @PathParam("namespace") String namespace) {
-        return functions.listFunctions(tenant, namespace);
+        return functions.listFunctions(tenant, namespace, FunctionsImpl.SINK);
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 1a82ac2880..4bda48995c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -73,7 +73,7 @@ public Response registerSource(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
     }
 
     @PUT
@@ -94,7 +94,7 @@ public Response updateSource(final @PathParam("tenant") String tenant,
                                  final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
 
     }
 
@@ -112,7 +112,7 @@ public Response updateSource(final @PathParam("tenant") String tenant,
     public Response deregisterSource(final @PathParam("tenant") String tenant,
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("sourceName") String sourceName) {
-        return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, sourceName, FunctionsImpl.SOURCE, clientAppId());
     }
 
     @GET
@@ -131,7 +131,7 @@ public Response getSourceInfo(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace,
                                   final @PathParam("sourceName") String sourceName) throws IOException {
         return functions.getFunctionInfo(
-            tenant, namespace, sourceName);
+            tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @GET
@@ -183,7 +183,7 @@ public Response getSourceStatus(final @PathParam("tenant") String tenant,
     public Response listSources(final @PathParam("tenant") String tenant,
                                 final @PathParam("namespace") String namespace) {
         return functions.listFunctions(
-            tenant, namespace);
+            tenant, namespace, FunctionsImpl.SOURCE);
 
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index afa7ef1dd7..acd6ed3946 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -308,7 +308,7 @@ public void testFunctionAssignmentsWithRestart() throws Exception {
         // validate updated function prop = auto-ack=false and instnaceid
         for (int i = 0; i < (totalFunctions - totalDeletedFunction); i++) {
             String functionName = baseFunctionName + i;
-            assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).getAutoAck());
+            assertFalse(admin.functions().getFunction(tenant, namespacePortion, functionName).isAutoAck());
         }
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index babcdd6df4..21133d678d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -407,36 +407,6 @@ public void testAuthorization(boolean validRoleName) throws Exception {
         }
     }
 
-    /**
-     * Test to verify: function-server loads jar using file-url and derives type-args classes if not provided
-     * @throws Exception
-     */
-    @Test(timeOut = 20000)
-    public void testFileUrlFunctionWithoutPassingTypeArgs() throws Exception {
-
-        final String namespacePortion = "io";
-        final String replNamespace = tenant + "/" + namespacePortion;
-        final String sinkTopic = "persistent://" + replNamespace + "/output";
-        final String functionName = "PulsarSink-test";
-        final String subscriptionName = "test-sub";
-        admin.namespaces().createNamespace(replNamespace);
-        Set<String> clusters = Sets.newHashSet(Lists.newArrayList("use"));
-        admin.namespaces().setNamespaceReplicationClusters(replNamespace, clusters);
-
-        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-examples.jar").getFile();
-
-        FunctionConfig functionConfig = createFunctionConfig(tenant, namespacePortion, functionName,
-                "my.*", sinkTopic, subscriptionName);
-
-        admin.functions().createFunctionWithUrl(functionConfig, jarFilePathUrl);
-
-        FunctionDetails functionMetadata = admin.source().getSource(tenant, namespacePortion, functionName);
-
-        assertEquals(functionMetadata.getSource().getTypeClassName(), String.class.getName());
-        assertEquals(functionMetadata.getSink().getTypeClassName(), String.class.getName());
-
-    }
-
     @Test(timeOut = 20000)
     public void testFunctionStopAndRestartApi() throws Exception {
 
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
index 10c890c810..a72260584e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Functions.java
@@ -25,7 +25,6 @@
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -77,7 +76,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException;
+    FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException;
 
     /**
      * Create a new function.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
index 3f8fe2f478..afad6f371f 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Sink.java
@@ -22,13 +22,11 @@
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.SinkConfig;
 
 import java.util.List;
-import java.util.Set;
 
 /**
  * Admin interface for Sink management.
@@ -50,7 +48,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    List<String> getSinks(String tenant, String namespace) throws PulsarAdminException;
+    List<String> listSinks(String tenant, String namespace) throws PulsarAdminException;
 
     /**
      * Get the configuration for the specified sink.
@@ -77,7 +75,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    Function.FunctionDetails getSink(String tenant, String namespace, String sink) throws PulsarAdminException;
+    SinkConfig getSink(String tenant, String namespace, String sink) throws PulsarAdminException;
 
     /**
      * Create a new sink.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
index 3c43cf203f..9d1a318f11 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/Source.java
@@ -22,13 +22,11 @@
 import org.apache.pulsar.client.admin.PulsarAdminException.NotFoundException;
 import org.apache.pulsar.client.admin.PulsarAdminException.PreconditionFailedException;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.SourceConfig;
 
 import java.util.List;
-import java.util.Set;
 
 /**
  * Admin interface for Source management.
@@ -50,7 +48,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    List<String> getSources(String tenant, String namespace) throws PulsarAdminException;
+    List<String> listSources(String tenant, String namespace) throws PulsarAdminException;
 
     /**
      * Get the configuration for the specified source.
@@ -77,7 +75,7 @@
      * @throws PulsarAdminException
      *             Unexpected error
      */
-    Function.FunctionDetails getSource(String tenant, String namespace, String source) throws PulsarAdminException;
+    SourceConfig getSource(String tenant, String namespace, String source) throws PulsarAdminException;
 
     /**
      * Create a new source.
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index b9ea1a5244..77cc3d6816 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -46,7 +46,6 @@
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.FunctionConfig;
@@ -80,16 +79,13 @@ public FunctionsImpl(WebTarget web, Authentication auth) {
     }
 
     @Override
-    public FunctionDetails getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
+    public FunctionConfig getFunction(String tenant, String namespace, String function) throws PulsarAdminException {
         try {
              Response response = request(functions.path(tenant).path(namespace).path(function)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
                 throw new ClientErrorException(response);
             }
-            String jsonResponse = response.readEntity(String.class);
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            mergeJson(jsonResponse, functionDetailsBuilder);
-            return functionDetailsBuilder.build();
+            return response.readEntity(FunctionConfig.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 4e13693cc0..d117374131 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -27,7 +27,6 @@
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.SinkConfig;
@@ -44,8 +43,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 @Slf4j
 public class SinkImpl extends BaseResource implements Sink {
@@ -58,7 +55,7 @@ public SinkImpl(WebTarget web, Authentication auth) {
     }
 
     @Override
-    public List<String> getSinks(String tenant, String namespace) throws PulsarAdminException {
+    public List<String> listSinks(String tenant, String namespace) throws PulsarAdminException {
         try {
             Response response = request(sink.path(tenant).path(namespace)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
@@ -72,16 +69,13 @@ public SinkImpl(WebTarget web, Authentication auth) {
     }
 
     @Override
-    public FunctionDetails getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
+    public SinkConfig getSink(String tenant, String namespace, String sinkName) throws PulsarAdminException {
         try {
              Response response = request(sink.path(tenant).path(namespace).path(sinkName)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
                 throw new ClientErrorException(response);
             }
-            String jsonResponse = response.readEntity(String.class);
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            mergeJson(jsonResponse, functionDetailsBuilder);
-            return functionDetailsBuilder.build();
+            return response.readEntity(SinkConfig.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index 65a2bfc55a..0c7a1df389 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -27,7 +27,6 @@
 import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.ErrorData;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatusList;
 import org.apache.pulsar.functions.utils.SourceConfig;
@@ -44,8 +43,6 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 @Slf4j
 public class SourceImpl extends BaseResource implements Source {
@@ -58,7 +55,7 @@ public SourceImpl(WebTarget web, Authentication auth) {
     }
 
     @Override
-    public List<String> getSources(String tenant, String namespace) throws PulsarAdminException {
+    public List<String> listSources(String tenant, String namespace) throws PulsarAdminException {
         try {
             Response response = request(source.path(tenant).path(namespace)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
@@ -72,16 +69,13 @@ public SourceImpl(WebTarget web, Authentication auth) {
     }
 
     @Override
-    public FunctionDetails getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
+    public SourceConfig getSource(String tenant, String namespace, String sourceName) throws PulsarAdminException {
         try {
              Response response = request(source.path(tenant).path(namespace).path(sourceName)).get();
             if (!response.getStatusInfo().equals(Response.Status.OK)) {
                 throw new ClientErrorException(response);
             }
-            String jsonResponse = response.readEntity(String.class);
-            FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
-            mergeJson(jsonResponse, functionDetailsBuilder);
-            return functionDetailsBuilder.build();
+            return response.readEntity(SourceConfig.class);
         } catch (Exception e) {
             throw getApiException(e);
         }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index 2f23a63039..7e44b440fd 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -697,9 +697,9 @@ void runCmd() throws Exception {
     class GetFunction extends FunctionCommand {
         @Override
         void runCmd() throws Exception {
-            String json = Utils.printJson(admin.functions().getFunction(tenant, namespace, functionName));
+            FunctionConfig functionConfig = admin.functions().getFunction(tenant, namespace, functionName);
             Gson gson = new GsonBuilder().setPrettyPrinting().create();
-            System.out.println(gson.toJson(new JsonParser().parse(json)));
+            System.out.println(gson.toJson(functionConfig));
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index e56a34384e..3b9159e98c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -30,16 +30,14 @@
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.nio.file.Paths;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.stream.Collectors;
 
 import lombok.Getter;
@@ -67,6 +65,8 @@
     private final CreateSink createSink;
     private final UpdateSink updateSink;
     private final DeleteSink deleteSink;
+    private final ListSinks listSinks;
+    private final GetSink getSink;
     private final LocalSinkRunner localSinkRunner;
 
     public CmdSinks(PulsarAdmin admin) {
@@ -74,11 +74,15 @@ public CmdSinks(PulsarAdmin admin) {
         createSink = new CreateSink();
         updateSink = new UpdateSink();
         deleteSink = new DeleteSink();
+        listSinks = new ListSinks();
+        getSink = new GetSink();
         localSinkRunner = new LocalSinkRunner();
 
         jcommander.addCommand("create", createSink);
         jcommander.addCommand("update", updateSink);
         jcommander.addCommand("delete", deleteSink);
+        jcommander.addCommand("list", listSinks);
+        jcommander.addCommand("get", getSink);
         jcommander.addCommand("localrun", localSinkRunner);
         jcommander.addCommand("available-sinks", new ListBuiltInSinks());
     }
@@ -184,7 +188,7 @@ protected String validateSinkType(String sinkType) throws IOException {
     }
 
     @Parameters(commandDescription = "Submit a Pulsar IO sink connector to run in a Pulsar cluster")
-    protected class CreateSink extends SinkCommand {
+    protected class CreateSink extends SinkDetailsCommand {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
@@ -197,7 +201,7 @@ void runCmd() throws Exception {
     }
 
     @Parameters(commandDescription = "Update a Pulsar IO sink connector")
-    protected class UpdateSink extends SinkCommand {
+    protected class UpdateSink extends SinkDetailsCommand {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(archive)) {
@@ -209,7 +213,7 @@ void runCmd() throws Exception {
         }
     }
 
-    abstract class SinkCommand extends BaseCommand {
+    abstract class SinkDetailsCommand extends BaseCommand {
         @Parameter(names = "--tenant", description = "The sink's tenant")
         protected String tenant;
         @Parameter(names = "--namespace", description = "The sink's namespace")
@@ -506,25 +510,70 @@ protected String validateSinkType(String sinkType) throws IOException {
         }
     }
 
-    @Parameters(commandDescription = "Stops a Pulsar IO sink connector")
-    protected class DeleteSink extends BaseCommand {
-
-        @Parameter(names = "--tenant", description = "The tenant of the sink")
+    /**
+     * Sink level command
+     */
+    @Getter
+    abstract class SinkCommand extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The sink's tenant")
         protected String tenant;
 
-        @Parameter(names = "--namespace", description = "The namespace of the sink")
+        @Parameter(names = "--namespace", description = "The sink's namespace")
         protected String namespace;
 
-        @Parameter(names = "--name", description = "The name of the sink")
-        protected String name;
+        @Parameter(names = "--name", description = "The sink's name")
+        protected String sinkName;
 
         @Override
         void processArguments() throws Exception {
             super.processArguments();
-            if (null == name) {
-                throw new ParameterException(
+            if (tenant == null) {
+                tenant = PUBLIC_TENANT;
+            }
+            if (namespace == null) {
+                namespace = DEFAULT_NAMESPACE;
+            }
+            if (null == sinkName) {
+                throw new RuntimeException(
                         "You must specify a name for the sink");
             }
+        }
+    }
+
+    @Parameters(commandDescription = "Stops a Pulsar IO sink connector")
+    protected class DeleteSink extends SinkCommand {
+
+        @Override
+        void runCmd() throws Exception {
+            admin.sink().deleteSink(tenant, namespace, sinkName);
+            print("Deleted successfully");
+        }
+    }
+
+    @Parameters(commandDescription = "Gets the information about a Pulsar IO sink connector")
+    protected class GetSink extends SinkCommand {
+
+        @Override
+        void runCmd() throws Exception {
+            SinkConfig sinkConfig = admin.sink().getSink(tenant, namespace, sinkName);
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(sinkConfig));
+        }
+    }
+
+    /**
+     * List Sinks command
+     */
+    @Parameters(commandDescription = "List all running Pulsar IO sink connectors")
+    protected class ListSinks extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The sink's tenant")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The sink's namespace")
+        protected String namespace;
+
+        @Override
+        public void processArguments() {
             if (tenant == null) {
                 tenant = PUBLIC_TENANT;
             }
@@ -535,8 +584,9 @@ void processArguments() throws Exception {
 
         @Override
         void runCmd() throws Exception {
-            admin.sink().deleteSink(tenant, namespace, name);
-            print("Deleted successfully");
+            List<String> sinks = admin.sink().listSinks(tenant, namespace);
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(sinks));
         }
     }
 
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index f2d768199e..f27b0a5437 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -28,6 +28,7 @@
 import com.beust.jcommander.Parameters;
 import com.beust.jcommander.converters.StringConverter;
 import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import com.google.gson.reflect.TypeToken;
 
 import java.io.File;
@@ -35,6 +36,7 @@
 import java.lang.reflect.Type;
 import java.nio.file.Paths;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
@@ -66,6 +68,8 @@
 
     private final CreateSource createSource;
     private final DeleteSource deleteSource;
+    private final GetSource getSource;
+    private final ListSources listSources;
     private final UpdateSource updateSource;
     private final LocalSourceRunner localSourceRunner;
 
@@ -74,11 +78,15 @@ public CmdSources(PulsarAdmin admin) {
         createSource = new CreateSource();
         updateSource = new UpdateSource();
         deleteSource = new DeleteSource();
+        listSources = new ListSources();
+        getSource = new GetSource();
         localSourceRunner = new LocalSourceRunner();
 
         jcommander.addCommand("create", createSource);
         jcommander.addCommand("update", updateSource);
         jcommander.addCommand("delete", deleteSource);
+        jcommander.addCommand("get", getSource);
+        jcommander.addCommand("list", listSources);
         jcommander.addCommand("localrun", localSourceRunner);
         jcommander.addCommand("available-sources", new ListBuiltInSources());
     }
@@ -184,7 +192,7 @@ protected String validateSourceType(String sourceType) throws IOException {
     }
 
     @Parameters(commandDescription = "Submit a Pulsar IO source connector to run in a Pulsar cluster")
-    protected class CreateSource extends SourceCommand {
+    protected class CreateSource extends SourceDetailsCommand {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(this.sourceConfig.getArchive())) {
@@ -197,7 +205,7 @@ void runCmd() throws Exception {
     }
 
     @Parameters(commandDescription = "Update a Pulsar IO source connector")
-    protected class UpdateSource extends SourceCommand {
+    protected class UpdateSource extends SourceDetailsCommand {
         @Override
         void runCmd() throws Exception {
             if (Utils.isFunctionPackageUrlSupported(sourceConfig.getArchive())) {
@@ -209,7 +217,7 @@ void runCmd() throws Exception {
         }
     }
 
-    abstract class SourceCommand extends BaseCommand {
+    abstract class SourceDetailsCommand extends BaseCommand {
         @Parameter(names = "--tenant", description = "The source's tenant")
         protected String tenant;
         @Parameter(names = "--namespace", description = "The source's namespace")
@@ -460,40 +468,86 @@ protected String validateSourceType(String sourceType) throws IOException {
         }
     }
 
-    @Parameters(commandDescription = "Stops a Pulsar IO source connector")
-    protected class DeleteSource extends BaseCommand {
-
-        @Parameter(names = "--tenant", description = "The tenant of a sink or source")
+    /**
+     * Source level command
+     */
+    @Getter
+    abstract class SourceCommand extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The source's tenant")
         protected String tenant;
 
-        @Parameter(names = "--namespace", description = "The namespace of a sink or source")
+        @Parameter(names = "--namespace", description = "The source's namespace")
         protected String namespace;
 
-        @Parameter(names = "--name", description = "The name of a sink or source")
-        protected String name;
+        @Parameter(names = "--name", description = "The source's name")
+        protected String sourceName;
 
         @Override
         void processArguments() throws Exception {
             super.processArguments();
-            if (null == name) {
-                throw new ParameterException(
-                        "You must specify a name for the source");
-            }
             if (tenant == null) {
                 tenant = PUBLIC_TENANT;
             }
             if (namespace == null) {
                 namespace = DEFAULT_NAMESPACE;
             }
+            if (null == sourceName) {
+                throw new RuntimeException(
+                            "You must specify a name for the source");
+            }
         }
+    }
+
+    @Parameters(commandDescription = "Stops a Pulsar IO source connector")
+    protected class DeleteSource extends SourceCommand {
 
         @Override
         void runCmd() throws Exception {
-            admin.source().deleteSource(tenant, namespace, name);
+            admin.source().deleteSource(tenant, namespace, sourceName);
             print("Delete source successfully");
         }
     }
 
+    @Parameters(commandDescription = "Gets the information about a Pulsar IO source connector")
+    protected class GetSource extends SourceCommand {
+        /** Fetches the source's config through the admin client and prints it as pretty-printed JSON. */
+        @Override
+        void runCmd() throws Exception {
+            final Gson prettyJson = new GsonBuilder().setPrettyPrinting().create();
+            final SourceConfig fetched = admin.source().getSource(tenant, namespace, sourceName);
+            System.out.println(prettyJson.toJson(fetched));
+        }
+    }
+
+    /**
+     * Lists the source connectors of a tenant and namespace, defaulting both when not given.
+     */
+    @Parameters(commandDescription = "List all running Pulsar IO source connectors")
+    protected class ListSources extends BaseCommand {
+        @Parameter(names = "--tenant", description = "The source's tenant")
+        protected String tenant;
+
+        @Parameter(names = "--namespace", description = "The source's namespace")
+        protected String namespace;
+
+        @Override
+        public void processArguments() {
+            if (tenant == null) {
+                tenant = PUBLIC_TENANT;
+            }
+            if (namespace == null) {
+                namespace = DEFAULT_NAMESPACE;
+            }
+        }
+
+        @Override
+        void runCmd() throws Exception {
+            List<String> sources = admin.source().listSources(tenant, namespace);
+            Gson gson = new GsonBuilder().setPrettyPrinting().create();
+            System.out.println(gson.toJson(sources));
+        }
+    }
+
     @Parameters(commandDescription = "Get the list of Pulsar IO connector sources supported by Pulsar cluster")
     public class ListBuiltInSources extends BaseCommand {
         @Override
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index 0c561453fd..b52bc17498 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -962,7 +962,7 @@ public void testMixCliAndConfigFile(
     public void testDeleteMissingTenant() throws Exception {
         deleteSink.tenant = null;
         deleteSink.namespace = NAMESPACE;
-        deleteSink.name = NAME;
+        deleteSink.sinkName = NAME;
 
         deleteSink.processArguments();
 
@@ -975,7 +975,7 @@ public void testDeleteMissingTenant() throws Exception {
     public void testDeleteMissingNamespace() throws Exception {
         deleteSink.tenant = TENANT;
         deleteSink.namespace = null;
-        deleteSink.name = NAME;
+        deleteSink.sinkName = NAME;
 
         deleteSink.processArguments();
 
@@ -984,11 +984,11 @@ public void testDeleteMissingNamespace() throws Exception {
         verify(sink).deleteSink(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink")
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "You must specify a name for the sink")
     public void testDeleteMissingName() throws Exception {
         deleteSink.tenant = TENANT;
         deleteSink.namespace = NAMESPACE;
-        deleteSink.name = null;
+        deleteSink.sinkName = null;
 
         deleteSink.processArguments();
 
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index fe799bc61f..4a3b3cc2fa 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -820,7 +820,7 @@ public void testMixCliAndConfigFile(
     public void testDeleteMissingTenant() throws Exception {
         deleteSource.tenant = null;
         deleteSource.namespace = NAMESPACE;
-        deleteSource.name = NAME;
+        deleteSource.sourceName = NAME;
 
         deleteSource.processArguments();
 
@@ -833,7 +833,7 @@ public void testDeleteMissingTenant() throws Exception {
     public void testDeleteMissingNamespace() throws Exception {
         deleteSource.tenant = TENANT;
         deleteSource.namespace = null;
-        deleteSource.name = NAME;
+        deleteSource.sourceName = NAME;
 
         deleteSource.processArguments();
 
@@ -842,11 +842,11 @@ public void testDeleteMissingNamespace() throws Exception {
         verify(source).deleteSource(eq(TENANT), eq(DEFAULT_NAMESPACE), eq(NAME));
     }
 
-    @Test(expectedExceptions = ParameterException.class, expectedExceptionsMessageRegExp = "You must specify a name for the source")
+    @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "You must specify a name for the source")
     public void testDeleteMissingName() throws Exception {
         deleteSource.tenant = TENANT;
         deleteSource.namespace = NAMESPACE;
-        deleteSource.name = null;
+        deleteSource.sourceName = null;
 
         deleteSource.processArguments();
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index cf182a8e4a..11b623d358 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -20,15 +20,18 @@
 package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 
+import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
 
 public class FunctionConfigUtils {
 
@@ -195,4 +198,100 @@ public static FunctionDetails convert(FunctionConfig functionConfig, ClassLoader
         }
         return functionDetailsBuilder.build();
     }
+
+    /**
+     * Rebuilds a {@link FunctionConfig} from the {@link FunctionDetails} of a function.
+     *
+     * <p>Optional string fields (serde/schema names, topics, subscription name) are copied only when non-empty.
+     *
+     * @param functionDetails the details to read from
+     * @return a newly created config populated from {@code functionDetails}
+     */
+    public static FunctionConfig convertFromDetails(FunctionDetails functionDetails) {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant(functionDetails.getTenant());
+        functionConfig.setNamespace(functionDetails.getNamespace());
+        functionConfig.setName(functionDetails.getName());
+        functionConfig.setParallelism(functionDetails.getParallelism());
+        functionConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        functionConfig.setRuntime(Utils.convertRuntime(functionDetails.getRuntime()));
+        functionConfig.setAutoAck(functionDetails.getAutoAck());
+
+        // Input side: one ConsumerConfig per entry of the source's input specs.
+        Function.SourceSpec sourceSpec = functionDetails.getSource();
+        Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
+        for (Map.Entry<String, Function.ConsumerSpec> input : sourceSpec.getInputSpecsMap().entrySet()) {
+            Function.ConsumerSpec consumerSpec = input.getValue();
+            ConsumerConfig consumerConfig = new ConsumerConfig();
+            if (!isEmpty(consumerSpec.getSerdeClassName())) {
+                consumerConfig.setSerdeClassName(consumerSpec.getSerdeClassName());
+            }
+            if (!isEmpty(consumerSpec.getSchemaType())) {
+                consumerConfig.setSchemaType(consumerSpec.getSchemaType());
+            }
+            consumerConfig.setRegexPattern(consumerSpec.getIsRegexPattern());
+            consumerConfigMap.put(input.getKey(), consumerConfig);
+        }
+        functionConfig.setInputSpecs(consumerConfigMap);
+        if (!isEmpty(sourceSpec.getSubscriptionName())) {
+            functionConfig.setSubName(sourceSpec.getSubscriptionName());
+        }
+        // A FAILOVER subscription is reported as retainOrdering = true, anything else as false.
+        functionConfig.setRetainOrdering(sourceSpec.getSubscriptionType() == Function.SubscriptionType.FAILOVER);
+        functionConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
+
+        // Output side: output topic, its serde/schema, and the log topic.
+        Function.SinkSpec sinkSpec = functionDetails.getSink();
+        if (!isEmpty(sinkSpec.getTopic())) {
+            functionConfig.setOutput(sinkSpec.getTopic());
+        }
+        if (!isEmpty(sinkSpec.getSerDeClassName())) {
+            functionConfig.setOutputSerdeClassName(sinkSpec.getSerDeClassName());
+        }
+        if (!isEmpty(sinkSpec.getSchemaType())) {
+            functionConfig.setOutputSchemaType(sinkSpec.getSchemaType());
+        }
+        if (!isEmpty(functionDetails.getLogTopic())) {
+            functionConfig.setLogTopic(functionDetails.getLogTopic());
+        }
+
+        if (functionDetails.hasRetryDetails()) {
+            functionConfig.setMaxMessageRetries(functionDetails.getRetryDetails().getMaxMessageRetries());
+            if (!isEmpty(functionDetails.getRetryDetails().getDeadLetterTopic())) {
+                functionConfig.setDeadLetterTopic(functionDetails.getRetryDetails().getDeadLetterTopic());
+            }
+        }
+
+        Gson gson = new Gson();
+        Map<String, Object> userConfig;
+        if (!isEmpty(functionDetails.getUserConfig())) {
+            Type type = new TypeToken<Map<String, Object>>() {}.getType();
+            userConfig = gson.fromJson(functionDetails.getUserConfig(), type);
+        } else {
+            userConfig = new HashMap<>();
+        }
+        if (userConfig.containsKey(WindowConfig.WINDOW_CONFIG_KEY)) {
+            // Gson parses the nested window config into a generic map, not a WindowConfig, so a
+            // direct cast would throw ClassCastException; re-serialize it and bind it to the class.
+            WindowConfig windowConfig = gson.fromJson(
+                    gson.toJson(userConfig.get(WindowConfig.WINDOW_CONFIG_KEY)), WindowConfig.class);
+            userConfig.remove(WindowConfig.WINDOW_CONFIG_KEY);
+            functionConfig.setClassName(windowConfig.getActualWindowFunctionClassName());
+            functionConfig.setWindowConfig(windowConfig);
+        } else {
+            functionConfig.setClassName(functionDetails.getClassName());
+        }
+        functionConfig.setUserConfig(userConfig);
+
+        if (functionDetails.hasResources()) {
+            Resources resources = new Resources();
+            resources.setCpu(functionDetails.getResources().getCpu());
+            resources.setRam(functionDetails.getResources().getRam());
+            resources.setDisk(functionDetails.getResources().getDisk());
+            // Attach the resources; without this call the populated object is discarded.
+            functionConfig.setResources(resources);
+        }
+
+        return functionConfig;
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 95803abb08..545d34419d 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -29,9 +30,13 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.lang.reflect.Type;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 
@@ -175,4 +180,74 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas
         }
         return functionDetailsBuilder.build();
     }
+
+    /**
+     * Rebuilds a {@link SinkConfig} from the {@link FunctionDetails} of a sink.
+     *
+     * <p>Optional string fields (serde/schema names, subscription name, class name, builtin) are copied only when non-empty.
+     *
+     * @param functionDetails the details to read from
+     * @return a newly created config populated from {@code functionDetails}
+     */
+    public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant(functionDetails.getTenant());
+        sinkConfig.setNamespace(functionDetails.getNamespace());
+        sinkConfig.setName(functionDetails.getName());
+        sinkConfig.setParallelism(functionDetails.getParallelism());
+        sinkConfig.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+
+        Function.SourceSpec sourceSpec = functionDetails.getSource();
+        Map<String, ConsumerConfig> consumerConfigMap = new HashMap<>();
+        for (Map.Entry<String, Function.ConsumerSpec> input : sourceSpec.getInputSpecsMap().entrySet()) {
+            Function.ConsumerSpec consumerSpec = input.getValue();
+            ConsumerConfig consumerConfig = new ConsumerConfig();
+            if (!isEmpty(consumerSpec.getSerdeClassName())) {
+                consumerConfig.setSerdeClassName(consumerSpec.getSerdeClassName());
+            }
+            if (!isEmpty(consumerSpec.getSchemaType())) {
+                consumerConfig.setSchemaType(consumerSpec.getSchemaType());
+            }
+            consumerConfig.setRegexPattern(consumerSpec.getIsRegexPattern());
+            consumerConfigMap.put(input.getKey(), consumerConfig);
+        }
+        sinkConfig.setInputSpecs(consumerConfigMap);
+        if (!isEmpty(sourceSpec.getSubscriptionName())) {
+            sinkConfig.setSourceSubscriptionName(sourceSpec.getSubscriptionName());
+        }
+        // NOTE(review): this branch overwrites the processing guarantee set above with a value
+        // derived from the subscription type - confirm that is intended.
+        if (sourceSpec.getSubscriptionType() == Function.SubscriptionType.FAILOVER) {
+            sinkConfig.setRetainOrdering(true);
+            sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
+        } else {
+            sinkConfig.setRetainOrdering(false);
+            sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        }
+        sinkConfig.setAutoAck(functionDetails.getAutoAck());
+        sinkConfig.setTimeoutMs(sourceSpec.getTimeoutMs());
+
+        Function.SinkSpec sinkSpec = functionDetails.getSink();
+        if (!isEmpty(sinkSpec.getClassName())) {
+            sinkConfig.setClassName(sinkSpec.getClassName());
+        }
+        if (!isEmpty(sinkSpec.getBuiltin())) {
+            sinkConfig.setArchive("builtin://" + sinkSpec.getBuiltin());
+        }
+        if (!isEmpty(sinkSpec.getConfigs())) {
+            Type type = new TypeToken<Map<String, String>>() {}.getType();
+            sinkConfig.setConfigs(new Gson().fromJson(sinkSpec.getConfigs(), type));
+        }
+
+        if (functionDetails.hasResources()) {
+            Resources resources = new Resources();
+            resources.setCpu(functionDetails.getResources().getCpu());
+            resources.setRam(functionDetails.getResources().getRam());
+            resources.setDisk(functionDetails.getResources().getDisk());
+            // Attach the resources; without this call the populated object is discarded.
+            sinkConfig.setResources(resources);
+        }
+
+        return sinkConfig;
+    }
 }
\ No newline at end of file
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index a132c8a9ef..3424062839 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -20,6 +20,8 @@
 package org.apache.pulsar.functions.utils;
 
 import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
@@ -27,6 +29,8 @@
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
 import java.io.IOException;
+import java.lang.reflect.Type;
+import java.util.Map;
 
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
@@ -41,10 +45,10 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader
 
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
 
-        boolean isBuiltin = sourceConfig.getArchive().startsWith(Utils.BUILTIN);
+        boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.BUILTIN);
 
         if (!isBuiltin) {
-            if (sourceConfig.getArchive().startsWith(Utils.FILE)) {
+            if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(Utils.FILE)) {
                 if (org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) {
                     throw new IllegalArgumentException("Class-name must be present for archive with file-url");
                 }
@@ -127,4 +131,40 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader
 
         return functionDetailsBuilder.build();
     }
+
+    /** Rebuilds a {@link SourceConfig} from a source's {@link FunctionDetails}; empty optional strings stay unset. */
+    public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
+        final Function.SourceSpec spec = functionDetails.getSource();
+        final Function.SinkSpec output = functionDetails.getSink();
+        final SourceConfig result = new SourceConfig();
+        result.setTenant(functionDetails.getTenant());
+        result.setNamespace(functionDetails.getNamespace());
+        result.setName(functionDetails.getName());
+        result.setParallelism(functionDetails.getParallelism());
+        result.setProcessingGuarantees(Utils.convertProcessingGuarantee(functionDetails.getProcessingGuarantees()));
+        if (!StringUtils.isEmpty(spec.getClassName())) {
+            result.setClassName(spec.getClassName());
+        }
+        if (!StringUtils.isEmpty(spec.getBuiltin())) {
+            result.setArchive("builtin://" + spec.getBuiltin());
+        }
+        if (!StringUtils.isEmpty(spec.getConfigs())) {
+            result.setConfigs(new Gson().fromJson(spec.getConfigs(), new TypeToken<Map<String, String>>() {}.getType()));
+        }
+        result.setTopicName(output.getTopic());
+        if (!StringUtils.isEmpty(output.getSchemaType())) {
+            result.setSchemaType(output.getSchemaType());
+        }
+        if (!StringUtils.isEmpty(output.getSerDeClassName())) {
+            result.setSerdeClassName(output.getSerDeClassName());
+        }
+        if (functionDetails.hasResources()) {
+            final Resources copied = new Resources();
+            copied.setCpu(functionDetails.getResources().getCpu());
+            copied.setRam(functionDetails.getResources().getRam());
+            copied.setDisk(functionDetails.getResources().getDisk());
+            result.setResources(copied);
+        }
+        return result;
+    }
 }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index d35be614b1..adeaee162e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -151,6 +151,15 @@ public static Runtime convertRuntime(FunctionConfig.Runtime runtime) {
         throw new RuntimeException("Unrecognized runtime: " + runtime.name());
     }
 
+    /** Maps a proto {@code Runtime} onto the {@link FunctionConfig.Runtime} constant with the same name. */
+    public static FunctionConfig.Runtime convertRuntime(Runtime runtime) {
+        final String wanted = runtime.name();
+        return java.util.Arrays.stream(FunctionConfig.Runtime.values())
+                .filter(candidate -> candidate.name().equals(wanted))
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("Unrecognized runtime: " + wanted));
+    }
+
     public static org.apache.pulsar.functions.proto.Function.ProcessingGuarantees convertProcessingGuarantee(
             FunctionConfig.ProcessingGuarantees processingGuarantees) {
         for (org.apache.pulsar.functions.proto.Function.ProcessingGuarantees type : org.apache.pulsar.functions.proto.Function.ProcessingGuarantees.values()) {
@@ -161,6 +170,17 @@ public static Runtime convertRuntime(FunctionConfig.Runtime runtime) {
         throw new RuntimeException("Unrecognized processing guarantee: " + processingGuarantees.name());
     }
 
+    /** Maps a proto processing guarantee onto the {@link FunctionConfig.ProcessingGuarantees} constant with the same name. */
+    public static FunctionConfig.ProcessingGuarantees convertProcessingGuarantee(
+            org.apache.pulsar.functions.proto.Function.ProcessingGuarantees processingGuarantees) {
+        final String wanted = processingGuarantees.name();
+        return java.util.Arrays.stream(FunctionConfig.ProcessingGuarantees.values())
+                .filter(candidate -> candidate.name().equals(wanted))
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("Unrecognized processing guarantee: " + wanted));
+    }
+
+
     public static Class<?> getSourceType(String className, ClassLoader classloader) {
 
         Object userClass = Reflections.createInstance(className, classloader);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
new file mode 100644
index 0000000000..1f67798b7c
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link FunctionConfigUtils}.
+ */
+public class FunctionConfigUtilsTest {
+
+    @Test
+    public void testConvertBackFidelity() {
+        FunctionConfig functionConfig = new FunctionConfig();
+        functionConfig.setTenant("test-tenant");
+        functionConfig.setNamespace("test-namespace");
+        functionConfig.setName("test-function");
+        functionConfig.setClassName(IdentityFunction.class.getName());
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+        functionConfig.setInputSpecs(inputSpecs);
+        functionConfig.setOutput("test-output");
+        functionConfig.setOutputSerdeClassName("test-serde");
+        functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
+        functionConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        functionConfig.setRetainOrdering(false);
+        functionConfig.setUserConfig(new HashMap<>());
+        functionConfig.setAutoAck(true);
+        functionConfig.setTimeoutMs(2000L);
+        Function.FunctionDetails functionDetails = FunctionConfigUtils.convert(functionConfig, null);
+        FunctionConfig convertedConfig = FunctionConfigUtils.convertFromDetails(functionDetails);
+        assertEquals(
+                new Gson().toJson(convertedConfig),
+                new Gson().toJson(functionConfig)
+        );
+    }
+}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
new file mode 100644
index 0000000000..c5d1ea0cd3
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link SinkConfigUtils}.
+ */
+public class SinkConfigUtilsTest {
+
+    @Test
+    public void testConvertBackFidelity() throws IOException  {
+        SinkConfig sinkConfig = new SinkConfig();
+        sinkConfig.setTenant("test-tenant");
+        sinkConfig.setNamespace("test-namespace");
+        sinkConfig.setName("test-sink");
+        sinkConfig.setArchive("builtin://jdbc");
+        sinkConfig.setSourceSubscriptionName("test-subscription");
+        Map<String, ConsumerConfig> inputSpecs = new HashMap<>();
+        inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
+        sinkConfig.setInputSpecs(inputSpecs);
+        sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sinkConfig.setConfigs(new HashMap<>());
+        sinkConfig.setRetainOrdering(false);
+        sinkConfig.setAutoAck(true);
+        sinkConfig.setTimeoutMs(2000L);
+        Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, null);
+        SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
+        assertEquals(
+                new Gson().toJson(convertedConfig),
+                new Gson().toJson(sinkConfig)
+        );
+    }
+}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
new file mode 100644
index 0000000000..ef4ce61e89
--- /dev/null
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.utils;
+
+import com.google.gson.Gson;
+import org.apache.pulsar.functions.proto.Function;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.util.HashMap;
+
+import static org.testng.Assert.assertEquals;
+
+/**
+ * Unit test of {@link SourceConfigUtils}.
+ */
+public class SourceConfigUtilsTest {
+
+    @Test
+    public void testConvertBackFidelity() throws IOException  {
+        SourceConfig sourceConfig = new SourceConfig();
+        sourceConfig.setTenant("test-tenant");
+        sourceConfig.setNamespace("test-namespace");
+        sourceConfig.setName("test-source");
+        sourceConfig.setArchive("builtin://jdbc");
+        sourceConfig.setTopicName("test-output");
+        sourceConfig.setSerdeClassName("test-serde");
+        sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sourceConfig.setConfigs(new HashMap<>());
+        Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, null);
+        SourceConfig convertedConfig = SourceConfigUtils.convertFromDetails(functionDetails);
+        assertEquals(
+                new Gson().toJson(sourceConfig),
+                new Gson().toJson(convertedConfig)
+        );
+    }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
index 4faed11a38..920063e106 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionMetaDataManager.java
@@ -141,8 +141,8 @@ public synchronized FunctionMetaData getFunctionMetaData(String tenant, String n
      * @param namespace the namespace
      * @return a list of function names
      */
-    public synchronized Collection<String> listFunctions(String tenant, String namespace) {
-        List<String> ret = new LinkedList<>();
+    public synchronized Collection<FunctionMetaData> listFunctions(String tenant, String namespace) {
+        List<FunctionMetaData> ret = new LinkedList<>();
 
         if (!this.functionMetaDataMap.containsKey(tenant)) {
             return ret;
@@ -152,7 +152,7 @@ public synchronized FunctionMetaData getFunctionMetaData(String tenant, String n
             return ret;
         }
         for (FunctionMetaData functionMetaData : this.functionMetaDataMap.get(tenant).get(namespace).values()) {
-            ret.add(functionMetaData.getFunctionDetails().getName());
+            ret.add(functionMetaData);
         }
         return ret;
     }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 0b245cf391..44bb3bd0f4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -84,6 +84,7 @@
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.utils.validation.ConfigValidation;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
@@ -97,9 +98,15 @@
 
 import net.jodah.typetools.TypeResolver;
 
+// TODO: Currently the sources, sinks and functions all share this backend. In the future it might make sense
+// to separate them out into their own implementations as well.
 @Slf4j
 public class FunctionsImpl {
 
+    public static final String FUNCTION = "Function";
+    public static final String SOURCE = "Source";
+    public static final String SINK = "Sink";
+
     private final Supplier<WorkerService> workerServiceSupplier;
 
     public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
@@ -126,11 +133,10 @@ private boolean isWorkerServiceAvailable() {
         return true;
     }
 
-    public Response registerFunction(final String tenant, final String namespace, final String functionName,
+    public Response registerFunction(final String tenant, final String namespace, final String componentName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
-            final String sourceConfigJson, final String sinkConfigJson,
-            final String clientRole) {
+            final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
+            final String componentType, final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -138,23 +144,23 @@ public Response registerFunction(final String tenant, final String namespace, fi
 
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to register function", tenant, namespace,
-                        functionName, clientRole);
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+                        componentName, clientRole, componentType);
                 return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
                         .entity(new ErrorData("client is not authorize to perform operation")).build();
             }
         } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
-        if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function {}/{}/{} already exists", tenant, namespace, functionName);
+        if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} {}/{}/{} already exists", componentType, tenant, namespace, componentName);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s already exists", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s already exists", componentType, componentName))).build();
         }
 
         FunctionDetails functionDetails;
@@ -166,14 +172,14 @@ public Response registerFunction(final String tenant, final String namespace, fi
         // validate parameters
         try {
             if (isPkgUrlProvided) {
-                functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
-                        functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+                functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl,
+                        functionDetailsJson, componentConfigJson, componentType);
             } else {
-                functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+                functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
+                        fileDetail, functionDetailsJson, componentConfigJson, componentType);
             }
         } catch (Exception e) {
-            log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -181,9 +187,9 @@ public Response registerFunction(final String tenant, final String namespace, fi
         try {
             worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
         } catch (Exception e) {
-            log.error("Function {}/{}/{} cannot be admitted by the runtime factory", tenant, namespace, functionName);
+            log.error("{} {}/{}/{} cannot be admitted by the runtime factory", componentType, tenant, namespace, componentName);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build();
+                    .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build();
         }
 
         // function state
@@ -196,7 +202,7 @@ public Response registerFunction(final String tenant, final String namespace, fi
             packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
         } else {
             packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl
-                    : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
+                    : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
             if (!isPkgUrlProvided) {
                 packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
             }
@@ -207,10 +213,10 @@ public Response registerFunction(final String tenant, final String namespace, fi
                 : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile);
     }
 
-    public Response updateFunction(final String tenant, final String namespace, final String functionName,
+    public Response updateFunction(final String tenant, final String namespace, final String componentName,
             final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
-            final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
-            final String sourceConfigJson, final String sinkConfigJson, final String clientRole) {
+            final String functionPkgUrl, final String functionDetailsJson, final String componentConfigJson,
+            final String componentType, final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -218,22 +224,22 @@ public Response updateFunction(final String tenant, final String namespace, fina
 
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to update function", tenant, namespace,
-                        functionName, clientRole);
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+                        componentName, clientRole, componentType);
                 return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
                         .entity(new ErrorData("client is not authorize to perform operation")).build();
             }
         } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
         }
 
         FunctionDetails functionDetails;
@@ -245,14 +251,14 @@ public Response updateFunction(final String tenant, final String namespace, fina
         // validate parameters
         try {
             if (isPkgUrlProvided) {
-                functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
-                        functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+                functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, componentName, functionPkgUrl,
+                        functionDetailsJson, componentConfigJson, componentType);
             } else {
-                functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
-                        fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
+                functionDetails = validateUpdateRequestParams(tenant, namespace, componentName, uploadedInputStreamAsFile,
+                        fileDetail, functionDetailsJson, componentConfigJson, componentType);
             }
         } catch (Exception e) {
-            log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
@@ -260,9 +266,9 @@ public Response updateFunction(final String tenant, final String namespace, fina
         try {
             worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
         } catch (Exception e) {
-            log.error("Updated Function {}/{}/{} cannot be submitted to runtime factory", tenant, namespace, functionName);
+            log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", componentType, tenant, namespace, componentName);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s cannot be admitted:- %s", functionName, e.getMessage()))).build();
+                    .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build();
         }
 
         // function state
@@ -276,7 +282,7 @@ public Response updateFunction(final String tenant, final String namespace, fina
             packageLocationMetaDataBuilder.setPackagePath("builtin://" + getFunctionCodeBuiltin(functionDetails));
         } else {
             packageLocationMetaDataBuilder.setPackagePath(isPkgUrlProvided ? functionPkgUrl
-                    : createPackagePath(tenant, namespace, functionName, fileDetail.getFileName()));
+                    : createPackagePath(tenant, namespace, componentName, fileDetail.getFileName()));
             if (!isPkgUrlProvided) {
                 packageLocationMetaDataBuilder.setOriginalFileName(fileDetail.getFileName());
             }
@@ -287,8 +293,8 @@ public Response updateFunction(final String tenant, final String namespace, fina
                 : updateRequest(functionMetaDataBuilder.build(), uploadedInputStreamAsFile);
     }
 
-    public Response deregisterFunction(final String tenant, final String namespace, final String functionName,
-            String clientRole) {
+    public Response deregisterFunction(final String tenant, final String namespace, final String componentName,
+            String componentType, String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -296,35 +302,41 @@ public Response deregisterFunction(final String tenant, final String namespace,
 
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
-                log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister function", tenant, namespace,
-                        functionName, clientRole);
+                log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace,
+                        componentName, clientRole, componentType);
                 return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
                         .entity(new ErrorData("client is not authorize to perform operation")).build();
             }
         } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+            log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
             return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         // validate parameters
         try {
-            validateDeregisterRequestParams(tenant, namespace, functionName);
+            validateDeregisterRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid deregister function request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid deregister {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function to deregister does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} to deregister does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+        }
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
         }
 
         CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
-                namespace, functionName);
+                namespace, componentName);
 
         RequestResult requestResult = null;
         try {
@@ -334,12 +346,12 @@ public Response deregisterFunction(final String tenant, final String namespace,
                         .entity(new ErrorData(requestResult.getMessage())).build();
             }
         } catch (ExecutionException e) {
-            log.error("Execution Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName,
+            log.error("Execution Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName,
                     e);
             return Response.serverError().type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getCause().getMessage())).build();
         } catch (InterruptedException e) {
-            log.error("Interrupted Exception while deregistering function @ /{}/{}/{}", tenant, namespace, functionName,
+            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName,
                     e);
             return Response.status(Status.REQUEST_TIMEOUT).type(MediaType.APPLICATION_JSON).build();
         }
@@ -347,7 +359,8 @@ public Response deregisterFunction(final String tenant, final String namespace,
         return Response.status(Status.OK).entity(requestResult.toJson()).build();
     }
 
-    public Response getFunctionInfo(final String tenant, final String namespace, final String functionName)
+    public Response getFunctionInfo(final String tenant, final String namespace, final String componentName,
+                                    final String componentType)
             throws IOException {
 
         if (!isWorkerServiceAvailable()) {
@@ -356,25 +369,38 @@ public Response getFunctionInfo(final String tenant, final String namespace, fin
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName);
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid getFunction request @ /{}/{}/{}", tenant, namespace, functionName, e);
+            log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-        if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
-            log.error("Function in getFunction does not exist @ /{}/{}/{}", tenant, namespace, functionName);
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
             return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
+        }
+        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
+                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
         }
 
-        FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace,
-                functionName);
-        String functionDetailsJson = org.apache.pulsar.functions.utils.Utils
-                .printJson(functionMetaData.getFunctionDetails());
-        return Response.status(Status.OK).entity(functionDetailsJson).build();
+        String retval;
+        if (componentType.equals(FUNCTION)) {
+            FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+            retval = new Gson().toJson(config);
+        } else if (componentType.equals(SOURCE)) {
+            SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+            retval = new Gson().toJson(config);
+        } else {
+            SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+            retval = new Gson().toJson(config);
+        }
+        return Response.status(Status.OK).entity(retval).build();
     }
 
     public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
@@ -485,7 +511,7 @@ public Response stopFunctionInstances(final String tenant, final String namespac
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName);
+            validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
         } catch (IllegalArgumentException e) {
             log.error("Invalid restart-Function request @ /{}/{}/{}", tenant, namespace, functionName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -519,7 +545,7 @@ public Response getFunctionStatus(final String tenant, final String namespace, f
 
         // validate parameters
         try {
-            validateGetFunctionRequestParams(tenant, namespace, functionName);
+            validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
         } catch (IllegalArgumentException e) {
             log.error("Invalid getFunctionStatus request @ /{}/{}/{}", tenant, namespace, functionName, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
@@ -552,7 +578,7 @@ public Response getFunctionStatus(final String tenant, final String namespace, f
         return Response.status(Status.OK).entity(jsonResponse).build();
     }
 
-    public Response listFunctions(final String tenant, final String namespace) {
+    public Response listFunctions(final String tenant, final String namespace, String componentType) {
 
         if (!isWorkerServiceAvailable()) {
             return getUnavailableResponse();
@@ -562,16 +588,22 @@ public Response listFunctions(final String tenant, final String namespace) {
         try {
             validateListFunctionRequestParams(tenant, namespace);
         } catch (IllegalArgumentException e) {
-            log.error("Invalid listFunctions request @ /{}/{}", tenant, namespace, e);
+            log.error("Invalid list {} request @ /{}/{}", componentType, tenant, namespace, e);
             return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
                     .entity(new ErrorData(e.getMessage())).build();
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
-        Collection<String> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
+        Collection<FunctionMetaData> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
+        List<String> retval = new LinkedList<>();
+        for (FunctionMetaData functionMetaData : functionStateList) {
+            if (calculateSubjectType(functionMetaData).equals(componentType)) {
+                retval.add(functionMetaData.getFunctionDetails().getName());
+            }
+        }
 
-        return Response.status(Status.OK).entity(new Gson().toJson(functionStateList.toArray())).build();
+        return Response.status(Status.OK).entity(new Gson().toJson(retval.toArray())).build();
     }
 
     private Response updateRequest(FunctionMetaData functionMetaData, File uploadedInputStreamAsFile) {
@@ -840,13 +872,13 @@ private void validateListFunctionRequestParams(String tenant, String namespace)
 
     private void validateGetFunctionInstanceRequestParams(String tenant, String namespace, String functionName,
             String instanceId) throws IllegalArgumentException {
-        validateGetFunctionRequestParams(tenant, namespace, functionName);
+        validateGetFunctionRequestParams(tenant, namespace, functionName, FUNCTION);
         if (instanceId == null) {
             throw new IllegalArgumentException("Function Instance Id is not provided");
         }
     }
 
-    private void validateGetFunctionRequestParams(String tenant, String namespace, String functionName)
+    private void validateGetFunctionRequestParams(String tenant, String namespace, String subject, String subjectType)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -855,12 +887,12 @@ private void validateGetFunctionRequestParams(String tenant, String namespace, S
         if (namespace == null) {
             throw new IllegalArgumentException("Namespace is not provided");
         }
-        if (functionName == null) {
-            throw new IllegalArgumentException("Function Name is not provided");
+        if (subject == null) {
+            throw new IllegalArgumentException(subjectType + " Name is not provided");
         }
     }
 
-    private void validateDeregisterRequestParams(String tenant, String namespace, String functionName)
+    private void validateDeregisterRequestParams(String tenant, String namespace, String subject, String subjectType)
             throws IllegalArgumentException {
 
         if (tenant == null) {
@@ -869,30 +901,30 @@ private void validateDeregisterRequestParams(String tenant, String namespace, St
         if (namespace == null) {
             throw new IllegalArgumentException("Namespace is not provided");
         }
-        if (functionName == null) {
-            throw new IllegalArgumentException("Function Name is not provided");
+        if (subject == null) {
+            throw new IllegalArgumentException(subjectType + " Name is not provided");
         }
     }
 
-    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName,
-            String functionPkgUrl, String functionDetailsJson, String functionConfigJson,
-            String sourceConfigJson, String sinkConfigJson)
+    private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String componentName,
+            String functionPkgUrl, String functionDetailsJson, String componentConfigJson,
+            String componentType)
             throws IllegalArgumentException, IOException, URISyntaxException {
         if (!isFunctionPackageUrlSupported(functionPkgUrl)) {
             throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
         }
-        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, functionPkgUrl, null);
+        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
+                functionDetailsJson, componentConfigJson, componentType, functionPkgUrl, null);
         return functionDetails;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
+    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
             File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
-            String functionConfigJson, String sourceConfigJson, String sinkConfigJson)
+            String componentConfigJson, String componentType)
             throws IllegalArgumentException, IOException, URISyntaxException {
 
-        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
-                functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, null, uploadedInputStreamAsFile);
+        FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
+                functionDetailsJson, componentConfigJson, componentType,null, uploadedInputStreamAsFile);
         if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) {
             throw new IllegalArgumentException("Function Package is not provided");
         }
@@ -964,40 +996,21 @@ private String getFunctionCodeBuiltin(FunctionDetails functionDetails) {
         return null;
     }
 
-    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
-            String functionDetailsJson, String functionConfigJson, String sourceConfigJson,
-            String sinkConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
+    private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
+            String functionDetailsJson, String componentConfigJson, String componentType,
+            String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
         if (tenant == null) {
             throw new IllegalArgumentException("Tenant is not provided");
         }
         if (namespace == null) {
             throw new IllegalArgumentException("Namespace is not provided");
         }
-        if (functionName == null) {
-            throw new IllegalArgumentException("Function Name is not provided");
+        if (componentName == null) {
+            throw new IllegalArgumentException(String.format("%s Name is not provided", componentType));
         }
 
-        int numDefinitions = 0;
-        if (!StringUtils.isEmpty(functionDetailsJson)) {
-            numDefinitions++;
-        }
-        if (!StringUtils.isEmpty(functionConfigJson)) {
-            numDefinitions++;
-        }
-        if (!StringUtils.isEmpty(sourceConfigJson)) {
-            numDefinitions++;
-        }
-        if (!StringUtils.isEmpty(sinkConfigJson)) {
-            numDefinitions++;
-        }
-        if (numDefinitions == 0) {
-            throw new IllegalArgumentException("Function Info is not provided");
-        }
-        if (numDefinitions > 1) {
-            throw new IllegalArgumentException("Conflicting Info provided");
-        }
-        if (!StringUtils.isEmpty(functionConfigJson)) {
-            FunctionConfig functionConfig = new Gson().fromJson(functionConfigJson, FunctionConfig.class);
+        if (componentType.equals(FUNCTION) && !isEmpty(componentConfigJson)) {
+            FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
             ClassLoader clsLoader = null;
             if (functionConfig.getRuntime() == FunctionConfig.Runtime.JAVA) {
                 clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
@@ -1008,14 +1021,14 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
             ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader);
             return FunctionConfigUtils.convert(functionConfig, clsLoader);
         }
-        if (!StringUtils.isEmpty(sourceConfigJson)) {
-            SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class);
+        if (componentType.equals(SOURCE)) {
+            SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
             NarClassLoader clsLoader = extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, true);
             ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
             return SourceConfigUtils.convert(sourceConfig, clsLoader);
         }
-        if (!StringUtils.isEmpty(sinkConfigJson)) {
-            SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class);
+        if (componentType.equals(SINK)) {
+            SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
             NarClassLoader clsLoader = extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, false);
             ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
             return SinkConfigUtils.convert(sinkConfig, clsLoader);
@@ -1260,4 +1273,23 @@ public boolean isSuperUser(String clientRole) {
         return clientRole != null && worker().getWorkerConfig().getSuperUserRoles().contains(clientRole);
     }
 
+    public String calculateSubjectType(FunctionMetaData functionMetaData) {
+        SourceSpec sourceSpec = functionMetaData.getFunctionDetails().getSource();
+        SinkSpec sinkSpec = functionMetaData.getFunctionDetails().getSink();
+        if (sourceSpec.getInputSpecsCount() == 0) {
+            return SOURCE;
+        }
+        // Now it's between sink and function
+
+        if (!isEmpty(sinkSpec.getBuiltin())) {
+            // if it's built in, it's a sink
+            return SINK;
+        }
+
+        if (isEmpty(sinkSpec.getClassName()) || sinkSpec.getClassName().equals(PulsarSink.class.getName())) {
+            return FUNCTION;
+        }
+        return SINK;
+    }
+
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index d6e1439cbf..405f88f84c 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -20,6 +20,7 @@
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 import org.apache.pulsar.common.io.ConnectorDefinition;
@@ -60,7 +61,7 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
                                      final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
 
     }
 
@@ -77,7 +78,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("functionConfig") String functionConfigJson) {
 
         return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
+                functionPkgUrl, functionDetailsJson, functionConfigJson, FunctionsImpl.FUNCTION, clientAppId());
 
     }
 
@@ -86,7 +87,7 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
     @Path("/{tenant}/{namespace}/{functionName}")
     public Response deregisterFunction(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, functionName, FunctionsImpl.FUNCTION, clientAppId());
     }
 
     @GET
@@ -96,7 +97,7 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
                                     final @PathParam("functionName") String functionName)
             throws IOException {
         return functions.getFunctionInfo(
-            tenant, namespace, functionName);
+            tenant, namespace, functionName, FunctionsImpl.FUNCTION);
     }
 
     @GET
@@ -123,7 +124,7 @@ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
     public Response listFunctions(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace) {
         return functions.listFunctions(
-            tenant, namespace);
+            tenant, namespace, FunctionsImpl.FUNCTION);
 
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 151c2c1ccf..488f47d335 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -25,6 +25,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -52,7 +53,7 @@ public Response registerSink(final @PathParam("tenant") String tenant,
                                  final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
 
     }
 
@@ -68,7 +69,7 @@ public Response updateSink(final @PathParam("tenant") String tenant,
                                final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, sinkConfigJson, FunctionsImpl.SINK, clientAppId());
 
     }
 
@@ -77,7 +78,7 @@ public Response updateSink(final @PathParam("tenant") String tenant,
     @Path("/{tenant}/{namespace}/{sinkName}")
     public Response deregisterSink(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, sinkName, FunctionsImpl.SINK, clientAppId());
     }
 
     @GET
@@ -86,7 +87,7 @@ public Response getSinkInfo(final @PathParam("tenant") String tenant,
                                 final @PathParam("namespace") String namespace,
                                 final @PathParam("sinkName") String sinkName)
             throws IOException {
-        return functions.getFunctionInfo(tenant, namespace, sinkName);
+        return functions.getFunctionInfo(tenant, namespace, sinkName, FunctionsImpl.SINK);
     }
 
     @GET
@@ -111,7 +112,7 @@ public Response getSinkStatus(final @PathParam("tenant") String tenant,
     @Path("/{tenant}/{namespace}")
     public Response listSink(final @PathParam("tenant") String tenant,
                              final @PathParam("namespace") String namespace) {
-        return functions.listFunctions(tenant, namespace);
+        return functions.listFunctions(tenant, namespace, FunctionsImpl.SINK);
 
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 44fac19d70..3b1222ec66 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -25,6 +25,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
@@ -52,7 +53,7 @@ public Response registerSource(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
 
     }
 
@@ -68,7 +69,7 @@ public Response updateSource(final @PathParam("tenant") String tenant,
                                  final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+                functionPkgUrl, null, sourceConfigJson, FunctionsImpl.SOURCE, clientAppId());
 
     }
 
@@ -77,7 +78,7 @@ public Response updateSource(final @PathParam("tenant") String tenant,
     @Path("/{tenant}/{namespace}/{sourceName}")
     public Response deregisterSource(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+        return functions.deregisterFunction(tenant, namespace, sourceName, FunctionsImpl.SOURCE, clientAppId());
     }
 
     @GET
@@ -86,7 +87,7 @@ public Response getSourceInfo(final @PathParam("tenant") String tenant,
                                   final @PathParam("namespace") String namespace,
                                   final @PathParam("sourceName") String sourceName)
             throws IOException {
-        return functions.getFunctionInfo(tenant, namespace, sourceName);
+        return functions.getFunctionInfo(tenant, namespace, sourceName, FunctionsImpl.SOURCE);
     }
 
     @GET
@@ -111,7 +112,7 @@ public Response getSourceStatus(final @PathParam("tenant") String tenant,
     @Path("/{tenant}/{namespace}")
     public Response listSources(final @PathParam("tenant") String tenant,
                                 final @PathParam("namespace") String namespace) {
-        return functions.listFunctions(tenant, namespace);
+        return functions.listFunctions(tenant, namespace, FunctionsImpl.SOURCE);
 
     }
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
index ac99f9a6a4..7fc7c7f8a2 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionMetaDataManagerTest.java
@@ -66,15 +66,16 @@ public void testListFunctions() throws PulsarClientException {
                         mockPulsarClient()));
 
         Map<String, Function.FunctionMetaData> functionMetaDataMap1 = new HashMap<>();
-        functionMetaDataMap1.put("func-1", Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                        Function.FunctionDetails.newBuilder().setName("func-1")).build());
-        functionMetaDataMap1.put("func-2",
-                Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                        Function.FunctionDetails.newBuilder().setName("func-2")).build());
+        Function.FunctionMetaData f1 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-1")).build();
+        functionMetaDataMap1.put("func-1", f1);
+        Function.FunctionMetaData f2 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-2")).build();
+        functionMetaDataMap1.put("func-2", f2);
+        Function.FunctionMetaData f3 = Function.FunctionMetaData.newBuilder().setFunctionDetails(
+                Function.FunctionDetails.newBuilder().setName("func-3")).build();
         Map<String, Function.FunctionMetaData> functionMetaDataInfoMap2 = new HashMap<>();
-        functionMetaDataInfoMap2.put("func-3",
-                Function.FunctionMetaData.newBuilder().setFunctionDetails(
-                        Function.FunctionDetails.newBuilder().setName("func-3")).build());
+        functionMetaDataInfoMap2.put("func-3", f3);
 
 
         functionMetaDataManager.functionMetaDataMap.put("tenant-1", new HashMap<>());
@@ -86,13 +87,13 @@ public void testListFunctions() throws PulsarClientException {
         Assert.assertEquals(2, functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-1").size());
         Assert.assertTrue(functionMetaDataManager.listFunctions(
-                "tenant-1", "namespace-1").contains("func-1"));
+                "tenant-1", "namespace-1").contains(f1));
         Assert.assertTrue(functionMetaDataManager.listFunctions(
-                "tenant-1", "namespace-1").contains("func-2"));
+                "tenant-1", "namespace-1").contains(f2));
         Assert.assertEquals(1, functionMetaDataManager.listFunctions(
                 "tenant-1", "namespace-2").size());
         Assert.assertTrue(functionMetaDataManager.listFunctions(
-                "tenant-1", "namespace-2").contains("func-3"));
+                "tenant-1", "namespace-2").contains(f3));
     }
 
     @Test
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index d514faba69..460d0509ff 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -21,10 +21,9 @@
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
 import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
@@ -62,6 +61,7 @@
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.functions.utils.FunctionConfig;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.worker.*;
 import org.apache.pulsar.functions.worker.request.RequestResult;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
@@ -145,6 +145,7 @@ public void setup() {
         when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
 
         this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
+        doReturn("Function").when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -311,8 +312,7 @@ private void testRegisterFunctionMissingArguments(
                 null,
                 null,
                 new Gson().toJson(functionConfig),
-                null,
-                null,
+                FunctionsImpl.FUNCTION,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -339,8 +339,7 @@ private Response registerDefaultFunction() {
             null,
             null,
             new Gson().toJson(functionConfig),
-            null,
-                null,
+            FunctionsImpl.FUNCTION,
                 null);
     }
 
@@ -600,8 +599,7 @@ private void testUpdateFunctionMissingArguments(
             null,
             null,
             new Gson().toJson(functionConfig),
-            null,
-                null,
+            FunctionsImpl.FUNCTION,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -629,8 +627,7 @@ private Response updateDefaultFunction() throws IOException {
             null,
             null,
             new Gson().toJson(functionConfig),
-            null,
-                null,
+            FunctionsImpl.FUNCTION,
                 null);
     }
 
@@ -714,8 +711,7 @@ public void testUpdateFunctionWithUrl() throws IOException {
             filePackageUrl,
             null,
             new Gson().toJson(functionConfig),
-            null,
-                null,
+            FunctionsImpl.FUNCTION,
                 null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -804,7 +800,8 @@ private void testDeregisterFunctionMissingArguments(
             tenant,
             namespace,
             function,
-            null);
+            FunctionsImpl.FUNCTION,
+                null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -815,7 +812,8 @@ private Response deregisterDefaultFunction() {
             tenant,
             namespace,
             function,
-            null);
+            FunctionsImpl.FUNCTION,
+                 null);
     }
 
     @Test
@@ -910,7 +908,8 @@ private void testGetFunctionMissingArguments(
         Response response = resource.getFunctionInfo(
             tenant,
             namespace,
-            function);
+            function,
+                FunctionsImpl.FUNCTION);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -920,7 +919,8 @@ private Response getDefaultFunctionInfo() throws IOException {
         return resource.getFunctionInfo(
             tenant,
             namespace,
-            function);
+            function,
+                FunctionsImpl.FUNCTION);
     }
 
     @Test
@@ -960,8 +960,8 @@ public void testGetFunctionSuccess() throws Exception {
         Response response = getDefaultFunctionInfo();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
-            response.getEntity());
+                new Gson().toJson(FunctionConfigUtils.convertFromDetails(functionDetails)),
+                response.getEntity());
     }
 
     //
@@ -991,7 +991,8 @@ private void testListFunctionsMissingArguments(
     ) {
         Response response = resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.FUNCTION);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
@@ -1000,13 +1001,46 @@ private void testListFunctionsMissingArguments(
     private Response listDefaultFunctions() {
         return resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.FUNCTION);
     }
 
     @Test
     public void testListFunctionsSuccess() throws Exception {
         List<String> functions = Lists.newArrayList("test-1", "test-2");
-        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+        List<FunctionMetaData> metaDataList = new LinkedList<>();
+        FunctionMetaData functionMetaData1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()
+        ).build();
+        FunctionMetaData functionMetaData2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()
+        ).build();
+        metaDataList.add(functionMetaData1);
+        metaDataList.add(functionMetaData2);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(metaDataList);
+
+        Response response = listDefaultFunctions();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(new Gson().toJson(functions), response.getEntity());
+    }
+
+    @Test
+    public void testOnlyGetSources() throws Exception {
+        List<String> functions = Lists.newArrayList("test-2");
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()).build();
+        functionMetaDataList.add(f1);
+        FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()).build();
+        functionMetaDataList.add(f2);
+        FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-3").build()).build();
+        functionMetaDataList.add(f3);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+        doReturn("Source").when(this.resource).calculateSubjectType(f1);
+        doReturn("Function").when(this.resource).calculateSubjectType(f2);
+        doReturn("Sink").when(this.resource).calculateSubjectType(f3);
 
         Response response = listDefaultFunctions();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -1068,7 +1102,7 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException {
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
         Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
-                null, new Gson().toJson(functionConfig), null, null, null);
+                null, new Gson().toJson(functionConfig), FunctionsImpl.FUNCTION, null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index 4e52c95442..315f56d2f0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -26,6 +27,8 @@
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
@@ -38,6 +41,7 @@
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
@@ -51,6 +55,8 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -143,6 +149,7 @@ public void setup() throws Exception {
         doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
         mockStatic(SinkConfigUtils.class);
         when(SinkConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+        Mockito.doReturn("Sink").when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -188,7 +195,7 @@ public void testRegisterSinkMissingFunctionName() throws IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Name is not provided");
+                "Sink Name is not provided");
     }
 
     @Test
@@ -257,9 +264,8 @@ private void testRegisterSinkMissingArguments(
                 details,
                 null,
                 null,
-                null,
-                null,
                 new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -282,9 +288,8 @@ private Response registerDefaultSink() {
             mockedFormData,
             null,
             null,
-            null,
-            null,
             new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
     }
 
@@ -296,7 +301,7 @@ public void testRegisterExistedSink() throws IOException {
 
         Response response = registerDefaultSink();
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Sink " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -421,7 +426,7 @@ public void testUpdateSinkMissingFunctionName() throws IOException {
             topicsToSerDeClassName,
             className,
             parallelism,
-                "Function Name is not provided");
+                "Sink Name is not provided");
     }
 
     @Test
@@ -492,9 +497,8 @@ private void testUpdateSinkMissingArguments(
             details,
             null,
             null,
-            null,
-            null,
             new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -518,9 +522,8 @@ private Response updateDefaultSink() throws IOException {
             mockedFormData,
             null,
             null,
-            null,
-            null,
             new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
     }
 
@@ -530,7 +533,7 @@ public void testUpdateNotExistedSink() throws IOException {
 
         Response response = updateDefaultSink();
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -600,9 +603,8 @@ public void testUpdateSinkWithUrl() throws IOException {
             null,
             filePackageUrl,
             null,
-            null,
-            null,
             new Gson().toJson(sinkConfig),
+                FunctionsImpl.SINK,
                 null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -678,7 +680,7 @@ public void testDeregisterSinkMissingFunctionName() throws Exception {
             tenant,
             namespace,
             null,
-            "Function Name");
+            "Sink Name");
     }
 
     private void testDeregisterSinkMissingArguments(
@@ -691,6 +693,7 @@ private void testDeregisterSinkMissingArguments(
             tenant,
             namespace,
             sink,
+            FunctionsImpl.SINK,
             null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -702,6 +705,7 @@ private Response deregisterDefaultSink() {
             tenant,
             namespace,
                 sink,
+            FunctionsImpl.SINK,
             null);
     }
 
@@ -711,7 +715,7 @@ public void testDeregisterNotExistedSink() {
 
         Response response = deregisterDefaultSink();
         assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -757,109 +761,116 @@ public void testDeregisterSinkInterrupted() throws Exception {
         assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    // Source Info doesn't exist. Maybe one day they might be added
     //
-    // Get Function Info
+    // Get Sink Info
     //
 
-    /*
     @Test
-    public void testGetFunctionMissingTenant() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSinkMissingTenant() throws Exception {
+        testGetSinkMissingArguments(
             null,
             namespace,
-                source,
+                sink,
             "Tenant");
     }
 
     @Test
-    public void testGetFunctionMissingNamespace() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSinkMissingNamespace() throws Exception {
+        testGetSinkMissingArguments(
             tenant,
             null,
-                source,
+                sink,
             "Namespace");
     }
 
     @Test
-    public void testGetFunctionMissingFunctionName() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSinkMissingFunctionName() throws Exception {
+        testGetSinkMissingArguments(
             tenant,
             namespace,
             null,
-            "Function Name");
+            "Sink Name");
     }
 
-    private void testGetFunctionMissingArguments(
+    private void testGetSinkMissingArguments(
         String tenant,
         String namespace,
-        String function,
+        String sink,
         String missingFieldName
     ) throws IOException {
         Response response = resource.getFunctionInfo(
             tenant,
             namespace,
-            function);
+            sink,
+                FunctionsImpl.SINK);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response getDefaultFunctionInfo() throws IOException {
+    private Response getDefaultSinkInfo() throws IOException {
         return resource.getFunctionInfo(
             tenant,
             namespace,
-                source);
+                sink,
+                FunctionsImpl.SINK);
     }
 
     @Test
-    public void testGetNotExistedFunction() throws IOException {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+    public void testGetNotExistedSink() throws IOException {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
 
-        Response response = getDefaultFunctionInfo();
+        Response response = getDefaultSinkInfo();
         assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
-    public void testGetFunctionSuccess() throws Exception {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+    public void testGetSinkSuccess() throws Exception {
+        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
-        SinkSpec sinkSpec = SinkSpec.newBuilder()
-                .setTopic(outputTopic)
-                .setSerDeClassName(outputSerdeClassName).build();
+        Function.SourceSpec sourceSpec = Function.SourceSpec.newBuilder()
+                .setSubscriptionType(Function.SubscriptionType.SHARED)
+                .setSubscriptionName(subscriptionName)
+                .putInputSpecs("input", Function.ConsumerSpec.newBuilder()
+                .setSerdeClassName(TopicSchema.DEFAULT_SERDE)
+                .setIsRegexPattern(false)
+                .build()).build();
+        Function.SinkSpec sinkSpec = Function.SinkSpec.newBuilder()
+                .setBuiltin("jdbc")
+                .build();
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
-                .setClassName(className)
+                .setClassName(IdentityFunction.class.getName())
                 .setSink(sinkSpec)
-                .setName(source)
+                .setName(sink)
                 .setNamespace(namespace)
-                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
+                .setProcessingGuarantees(Function.ProcessingGuarantees.ATLEAST_ONCE)
                 .setTenant(tenant)
                 .setParallelism(parallelism)
-                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
-                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+                .setRuntime(FunctionDetails.Runtime.JAVA)
+                .setSource(sourceSpec).build();
         FunctionMetaData metaData = FunctionMetaData.newBuilder()
             .setCreateTime(System.currentTimeMillis())
             .setFunctionDetails(functionDetails)
-            .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
+            .setPackageLocation(Function.PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
             .setVersion(1234)
             .build();
-        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData);
+        when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(metaData);
 
-        Response response = getDefaultFunctionInfo();
+        Response response = getDefaultSinkInfo();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            new Gson().toJson(SinkConfigUtils.convertFromDetails(functionDetails)),
             response.getEntity());
     }
 
     //
-    // List Functions
+    // List Sinks
     //
 
     @Test
-    public void testListFunctionsMissingTenant() throws Exception {
-        testListFunctionsMissingArguments(
+    public void testListSinksMissingTenant() throws Exception {
+        testListSinksMissingArguments(
             null,
             namespace,
             "Tenant");
@@ -867,39 +878,70 @@ public void testListFunctionsMissingTenant() throws Exception {
 
     @Test
     public void testListFunctionsMissingNamespace() throws Exception {
-        testListFunctionsMissingArguments(
+        testListSinksMissingArguments(
             tenant,
             null,
             "Namespace");
     }
 
-    private void testListFunctionsMissingArguments(
+    private void testListSinksMissingArguments(
         String tenant,
         String namespace,
         String missingFieldName
     ) {
         Response response = resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.SINK);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response listDefaultFunctions() {
+    private Response listDefaultSinks() {
         return resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.SINK);
     }
 
     @Test
-    public void testListFunctionsSuccess() throws Exception {
+    public void testListSinksSuccess() throws Exception {
         List<String> functions = Lists.newArrayList("test-1", "test-2");
-        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()
+        ).build());
+        functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()
+        ).build());
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+
+        Response response = listDefaultSinks();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(new Gson().toJson(functions), response.getEntity());
+    }
 
-        Response response = listDefaultFunctions();
+    @Test
+    public void testOnlyGetSinks() throws Exception {
+        List<String> functions = Lists.newArrayList("test-3");
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()).build();
+        functionMetaDataList.add(f1);
+        FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()).build();
+        functionMetaDataList.add(f2);
+        FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-3").build()).build();
+        functionMetaDataList.add(f3);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+        doReturn("Source").when(this.resource).calculateSubjectType(f1);
+        doReturn("Function").when(this.resource).calculateSubjectType(f2);
+        doReturn("Sink").when(this.resource).calculateSubjectType(f3);
+
+        Response response = listDefaultSinks();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
-    */
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index 1ca869ea8b..eee684f4d6 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
+import com.google.common.collect.Lists;
 import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
@@ -26,6 +27,7 @@
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Record;
+import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function.*;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
@@ -37,6 +39,7 @@
 import org.apache.pulsar.io.core.Source;
 import org.apache.pulsar.io.core.SourceContext;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.testng.Assert;
@@ -48,6 +51,8 @@
 import javax.ws.rs.core.Response;
 import javax.ws.rs.core.Response.Status;
 import java.io.*;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
@@ -133,6 +138,7 @@ public void setup() throws Exception {
         doReturn(null).when(resource).extractNarClassLoader(anyString(), anyString(), anyObject(), anyBoolean());
         mockStatic(SourceConfigUtils.class);
         when(SourceConfigUtils.convert(anyObject(), anyObject())).thenReturn(FunctionDetails.newBuilder().build());
+        Mockito.doReturn("Source").when(this.resource).calculateSubjectType(any());
     }
 
     //
@@ -181,7 +187,7 @@ public void testRegisterSourceMissingFunctionName() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Name is not provided");
+                "Source Name is not provided");
     }
 
     @Test
@@ -256,9 +262,8 @@ private void testRegisterSourceMissingArguments(
                 details,
                 null,
                 null,
-                null,
                 new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -282,9 +287,8 @@ private Response registerDefaultSource() {
             mockedFormData,
             null,
             null,
-            null,
             new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
     }
 
@@ -296,7 +300,7 @@ public void testRegisterExistedSource() throws IOException {
 
         Response response = registerDefaultSource();
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Source " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -424,7 +428,7 @@ public void testUpdateSourceMissingFunctionName() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
-                "Function Name is not provided");
+                "Source Name is not provided");
     }
 
     @Test
@@ -501,9 +505,8 @@ private void testUpdateSourceMissingArguments(
             details,
             null,
             null,
-            null,
             new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -528,9 +531,8 @@ private Response updateDefaultSource() throws IOException {
             mockedFormData,
             null,
             null,
-            null,
             new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
     }
 
@@ -540,7 +542,7 @@ public void testUpdateNotExistedSource() throws IOException {
 
         Response response = updateDefaultSource();
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -611,9 +613,8 @@ public void testUpdateSourceWithUrl() throws IOException {
             null,
             filePackageUrl,
             null,
-            null,
             new Gson().toJson(sourceConfig),
-                null,
+                FunctionsImpl.SOURCE,
                 null);
 
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
@@ -689,7 +690,7 @@ public void testDeregisterSourceMissingFunctionName() throws Exception {
             tenant,
             namespace,
             null,
-            "Function Name");
+            "Source Name");
     }
 
     private void testDeregisterSourceMissingArguments(
@@ -702,6 +703,7 @@ private void testDeregisterSourceMissingArguments(
             tenant,
             namespace,
             function,
+            FunctionsImpl.SOURCE,
             null);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -713,6 +715,7 @@ private Response deregisterDefaultSource() {
             tenant,
             namespace,
                 source,
+            FunctionsImpl.SOURCE,
             null);
     }
 
@@ -722,7 +725,7 @@ public void testDeregisterNotExistedSource() {
 
         Response response = deregisterDefaultSource();
         assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
@@ -768,15 +771,13 @@ public void testDeregisterSourceInterrupted() throws Exception {
         assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    // Source Info doesn't exist. Maybe one day they might be added
     //
-    // Get Function Info
+    // Get Source Info
     //
 
-    /*
     @Test
-    public void testGetFunctionMissingTenant() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSourceMissingTenant() throws Exception {
+        testGetSourceMissingArguments(
             null,
             namespace,
                 source,
@@ -784,8 +785,8 @@ public void testGetFunctionMissingTenant() throws Exception {
     }
 
     @Test
-    public void testGetFunctionMissingNamespace() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSourceMissingNamespace() throws Exception {
+        testGetSourceMissingArguments(
             tenant,
             null,
                 source,
@@ -793,62 +794,66 @@ public void testGetFunctionMissingNamespace() throws Exception {
     }
 
     @Test
-    public void testGetFunctionMissingFunctionName() throws Exception {
-        testGetFunctionMissingArguments(
+    public void testGetSourceMissingFunctionName() throws Exception {
+        testGetSourceMissingArguments(
             tenant,
             namespace,
             null,
-            "Function Name");
+            "Source Name");
     }
 
-    private void testGetFunctionMissingArguments(
+    private void testGetSourceMissingArguments(
         String tenant,
         String namespace,
-        String function,
+        String source,
         String missingFieldName
     ) throws IOException {
         Response response = resource.getFunctionInfo(
             tenant,
             namespace,
-            function);
+            source,
+                FunctionsImpl.SOURCE);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response getDefaultFunctionInfo() throws IOException {
+    private Response getDefaultSourceInfo() throws IOException {
         return resource.getFunctionInfo(
             tenant,
             namespace,
-                source);
+                source,
+                FunctionsImpl.SOURCE);
     }
 
     @Test
-    public void testGetNotExistedFunction() throws IOException {
+    public void testGetNotExistedSource() throws IOException {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
 
-        Response response = getDefaultFunctionInfo();
+        Response response = getDefaultSourceInfo();
         assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
     }
 
     @Test
-    public void testGetFunctionSuccess() throws Exception {
+    public void testGetSourceSuccess() throws Exception {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
+        SourceSpec sourceSpec = SourceSpec.newBuilder().setBuiltin("jdbc").build();
         SinkSpec sinkSpec = SinkSpec.newBuilder()
                 .setTopic(outputTopic)
                 .setSerDeClassName(outputSerdeClassName).build();
         FunctionDetails functionDetails = FunctionDetails.newBuilder()
-                .setClassName(className)
+                .setClassName(IdentityFunction.class.getName())
                 .setSink(sinkSpec)
                 .setName(source)
                 .setNamespace(namespace)
-                .setProcessingGuarantees(ProcessingGuarantees.ATMOST_ONCE)
+                .setProcessingGuarantees(ProcessingGuarantees.ATLEAST_ONCE)
+                .setRuntime(FunctionDetails.Runtime.JAVA)
+                .setAutoAck(true)
                 .setTenant(tenant)
                 .setParallelism(parallelism)
-                .setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
-                        .putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
+                .setSource(sourceSpec).build();
         FunctionMetaData metaData = FunctionMetaData.newBuilder()
             .setCreateTime(System.currentTimeMillis())
             .setFunctionDetails(functionDetails)
@@ -857,60 +862,90 @@ public void testGetFunctionSuccess() throws Exception {
             .build();
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData);
 
-        Response response = getDefaultFunctionInfo();
+        Response response = getDefaultSourceInfo();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(
-            org.apache.pulsar.functions.utils.Utils.printJson(functionDetails),
+            new Gson().toJson(SourceConfigUtils.convertFromDetails(functionDetails)),
             response.getEntity());
     }
 
     //
-    // List Functions
+    // List Sources
     //
 
     @Test
-    public void testListFunctionsMissingTenant() throws Exception {
-        testListFunctionsMissingArguments(
+    public void testListSourcesMissingTenant() throws Exception {
+        testListSourcesMissingArguments(
             null,
             namespace,
             "Tenant");
     }
 
     @Test
-    public void testListFunctionsMissingNamespace() throws Exception {
-        testListFunctionsMissingArguments(
+    public void testListSourcesMissingNamespace() throws Exception {
+        testListSourcesMissingArguments(
             tenant,
             null,
             "Namespace");
     }
 
-    private void testListFunctionsMissingArguments(
+    private void testListSourcesMissingArguments(
         String tenant,
         String namespace,
         String missingFieldName
     ) {
         Response response = resource.listFunctions(
             tenant,
-            namespace);
+            namespace,
+                FunctionsImpl.SOURCE);
 
         assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
         assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response listDefaultFunctions() {
+    private Response listDefaultSources() {
         return resource.listFunctions(
             tenant,
-            namespace);
+            namespace,FunctionsImpl.SOURCE);
     }
 
     @Test
-    public void testListFunctionsSuccess() throws Exception {
+    public void testListSourcesSuccess() throws Exception {
         List<String> functions = Lists.newArrayList("test-1", "test-2");
-        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functions);
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()
+        ).build());
+        functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()
+        ).build());
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+
+        Response response = listDefaultSources();
+        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        assertEquals(new Gson().toJson(functions), response.getEntity());
+    }
 
-        Response response = listDefaultFunctions();
+    @Test
+    public void testOnlyGetSources() throws Exception {
+        List<String> functions = Lists.newArrayList("test-1");
+        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+        FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-1").build()).build();
+        functionMetaDataList.add(f1);
+        FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-2").build()).build();
+        functionMetaDataList.add(f2);
+        FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
+                FunctionDetails.newBuilder().setName("test-3").build()).build();
+        functionMetaDataList.add(f3);
+        when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
+        doReturn("Source").when(this.resource).calculateSubjectType(f1);
+        doReturn("Function").when(this.resource).calculateSubjectType(f2);
+        doReturn("Sink").when(this.resource).calculateSubjectType(f3);
+
+        Response response = listDefaultSources();
         assertEquals(Status.OK.getStatusCode(), response.getStatus());
         assertEquals(new Gson().toJson(functions), response.getEntity());
     }
-    */
 }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index b8452f1d97..6659fcf1fd 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -214,7 +214,7 @@ protected void getSinkInfoSuccess(SinkTester tester,
                                       boolean builtin) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "sink",
             "get",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -224,7 +224,7 @@ protected void getSinkInfoSuccess(SinkTester tester,
         log.info("Get sink info : {}", result.getStdout());
         if (builtin) {
             assertTrue(
-                    result.getStdout().contains("\"builtin\": \"" + tester.getSinkType().name().toLowerCase() + "\""),
+                    result.getStdout().contains("\"archive\": \"builtin://" + tester.getSinkType().name().toLowerCase() + "\""),
                     result.getStdout()
             );
         } else {
@@ -366,7 +366,7 @@ protected void deleteSink(String tenant, String namespace, String sinkName) thro
     protected void getSinkInfoNotFound(String tenant, String namespace, String sinkName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "sink",
             "get",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -376,7 +376,7 @@ protected void getSinkInfoNotFound(String tenant, String namespace, String sinkN
             pulsarCluster.getAnyWorker().execCmd(commands);
             fail("Command should have exited with non-zero");
         } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: Function " + sinkName + " doesn't exist"));
+            assertTrue(e.getResult().getStderr().contains("Reason: Sink " + sinkName + " doesn't exist"));
         }
     }
 
@@ -465,7 +465,7 @@ protected void getSourceInfoSuccess(SourceTester tester,
                                         String sourceName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "source",
             "get",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -474,7 +474,7 @@ protected void getSourceInfoSuccess(SourceTester tester,
         ContainerExecResult result = pulsarCluster.getAnyWorker().execCmd(commands);
         log.info("Get source info : {}", result.getStdout());
         assertTrue(
-            result.getStdout().contains("\"builtin\": \"" + tester.getSourceType() + "\""),
+            result.getStdout().contains("\"archive\": \"builtin://" + tester.getSourceType() + "\""),
             result.getStdout()
         );
     }
@@ -564,7 +564,7 @@ protected void deleteSource(String tenant, String namespace, String sourceName)
     protected void getSourceInfoNotFound(String tenant, String namespace, String sourceName) throws Exception {
         String[] commands = {
             PulsarCluster.ADMIN_SCRIPT,
-            "functions",
+            "source",
             "get",
             "--tenant", tenant,
             "--namespace", namespace,
@@ -574,7 +574,7 @@ protected void getSourceInfoNotFound(String tenant, String namespace, String sou
             pulsarCluster.getAnyWorker().execCmd(commands);
             fail("Command should have exited with non-zero");
         } catch (ContainerExecException e) {
-            assertTrue(e.getResult().getStderr().contains("Reason: Function " + sourceName + " doesn't exist"));
+            assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
         }
     }
 


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services