You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/21 02:20:20 UTC

[GitHub] jerrypeng closed pull request #3168: Refactor api commands

jerrypeng closed pull request #3168: Refactor api commands
URL: https://github.com/apache/pulsar/pull/3168
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 7602ba2730..3d81f836cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -18,25 +18,13 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
-import java.util.function.Supplier;
-
-import javax.ws.rs.Consumes;
-import javax.ws.rs.DELETE;
-import javax.ws.rs.GET;
-import javax.ws.rs.POST;
-import javax.ws.rs.PUT;
-import javax.ws.rs.Path;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
-import javax.ws.rs.QueryParam;
-import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionState;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
@@ -46,9 +34,21 @@
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.function.Supplier;
 
 public class FunctionsBase extends AdminResource implements Supplier<WorkerService> {
 
@@ -73,17 +73,17 @@ public WorkerService get() {
     })
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response registerFunction(final @PathParam("tenant") String tenant,
-                                     final @PathParam("namespace") String namespace,
-                                     final @PathParam("functionName") String functionName,
-                                     final @FormDataParam("data") InputStream uploadedInputStream,
-                                     final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                     final @FormDataParam("url") String functionPkgUrl,
-                                     final @FormDataParam("functionDetails") String functionDetailsJson,
-                                     final @FormDataParam("functionConfig") String functionConfigJson) {
-
-        return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
-                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+    public void registerFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("functionName") String functionName,
+                                 final @FormDataParam("data") InputStream uploadedInputStream,
+                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String functionPkgUrl,
+                                 final @FormDataParam("functionDetails") String functionDetailsJson,
+                                 final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+            functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
     }
 
     @PUT
@@ -95,18 +95,17 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
     })
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response updateFunction(final @PathParam("tenant") String tenant,
-                                   final @PathParam("namespace") String namespace,
-                                   final @PathParam("functionName") String functionName,
-                                   final @FormDataParam("data") InputStream uploadedInputStream,
-                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                   final @FormDataParam("url") String functionPkgUrl,
-                                   final @FormDataParam("functionDetails") String functionDetailsJson,
-                                   final @FormDataParam("functionConfig") String functionConfigJson) {
-
-        return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+    public void updateFunction(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("functionName") String functionName,
+                               final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                               final @FormDataParam("url") String functionPkgUrl,
+                               final @FormDataParam("functionDetails") String functionDetailsJson,
+                               final @FormDataParam("functionConfig") String functionConfigJson) {
+
+        functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
                 functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
-
     }
 
 
@@ -120,10 +119,10 @@ public Response updateFunction(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 200, message = "The function was successfully deleted")
     })
     @Path("/{tenant}/{namespace}/{functionName}")
-    public Response deregisterFunction(final @PathParam("tenant") String tenant,
-                                       final @PathParam("namespace") String namespace,
-                                       final @PathParam("functionName") String functionName) {
-        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+    public void deregisterFunction(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("functionName") String functionName) {
+        functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
     }
 
     @GET
@@ -138,11 +137,10 @@ public Response deregisterFunction(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 404, message = "The function doesn't exist")
     })
     @Path("/{tenant}/{namespace}/{functionName}")
-    public Response getFunctionInfo(final @PathParam("tenant") String tenant,
-                                    final @PathParam("namespace") String namespace,
-                                    final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionInfo(
-            tenant, namespace, functionName);
+    public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("functionName") String functionName) throws IOException {
+         return functions.getFunctionInfo(tenant, namespace, functionName);
     }
 
     @GET
@@ -159,14 +157,10 @@ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
     public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
             final @PathParam("tenant") String tenant,
-
             final @PathParam("namespace") String namespace,
-
             final @PathParam("functionName") String functionName,
-
             final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionInstanceStatus(
-            tenant, namespace, functionName, instanceId, uri.getRequestUri());
+        return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -185,8 +179,7 @@ public FunctionStatus getFunctionStatus(
             final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace,
             final @PathParam("functionName") String functionName) throws IOException {
-        return functions.getFunctionStatus(
-            tenant, namespace, functionName, uri.getRequestUri());
+        return functions.getFunctionStatus(tenant, namespace, functionName, uri.getRequestUri());
     }
 
     @GET
@@ -202,8 +195,8 @@ public FunctionStatus getFunctionStatus(
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/stats")
     public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
-                                     final @PathParam("namespace") String namespace,
-                                     final @PathParam("functionName") String functionName) throws IOException {
+                                        final @PathParam("namespace") String namespace,
+                                        final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
     }
 
@@ -219,12 +212,12 @@ public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
     })
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
-    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(final @PathParam("tenant") String tenant,
-                                             final @PathParam("namespace") String namespace,
-                                             final @PathParam("functionName") String functionName,
-                                             final @PathParam("instanceId") String instanceId) throws IOException {
-        return functions.getFunctionsInstanceStats(
-                tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(
+            final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace,
+            final @PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) throws IOException {
+        return functions.getFunctionsInstanceStats(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -238,11 +231,9 @@ public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
     @Path("/{tenant}/{namespace}")
-    public Response listFunctions(final @PathParam("tenant") String tenant,
-                                  final @PathParam("namespace") String namespace) {
-        return functions.listFunctions(
-            tenant, namespace);
-
+    public List<String> listFunctions(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace) {
+        return functions.listFunctions(tenant, namespace);
     }
 
     @POST
@@ -258,16 +249,13 @@ public Response listFunctions(final @PathParam("tenant") String tenant,
     })
     @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response triggerFunction(final @PathParam("tenant") String tenant,
-                                    final @PathParam("namespace") String namespace,
-                                    final @PathParam("functionName") String functionName,
-                                    final @FormDataParam("data") String triggerValue,
-                                    final @FormDataParam("dataStream") InputStream triggerStream,
-                                    final @FormDataParam("topic") String topic) {
-
-        return functions.triggerFunction(
-                tenant, namespace, functionName, triggerValue, triggerStream, topic);
-
+    public String triggerFunction(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("functionName") String functionName,
+                                  final @FormDataParam("data") String triggerValue,
+                                  final @FormDataParam("dataStream") InputStream triggerStream,
+                                  final @FormDataParam("topic") String topic) {
+        return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
     }
 
     @GET
@@ -282,63 +270,73 @@ public Response triggerFunction(final @PathParam("tenant") String tenant,
         @ApiResponse(code = 500, message = "Internal server error")
     })
     @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
-    public Response getFunctionState(final @PathParam("tenant") String tenant,
-                                     final @PathParam("namespace") String namespace,
-                                     final @PathParam("functionName") String functionName,
-                                     final @PathParam("key") String key) {
-        return functions.getFunctionState(
-            tenant, namespace, functionName, key);
-
+    public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("functionName") String functionName,
+                                          final @PathParam("key") String key) {
+        return functions.getFunctionState(tenant, namespace, functionName, key);
     }
 
     @POST
     @ApiOperation(value = "Restart function instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartFunction(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
-            final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    public void restartFunction(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("functionName") String functionName,
+                                    final @PathParam("instanceId") String instanceId) {
+        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Restart all function instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{functionName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartFunction(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.restartFunctionInstances(tenant, namespace, functionName);
+    public void restartFunction(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace,
+                                final @PathParam("functionName") String functionName) {
+        functions.restartFunctionInstances(tenant, namespace, functionName);
     }
 
     @POST
     @ApiOperation(value = "Stop function instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopFunction(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
-            final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+    public void stopFunction(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("functionName") String functionName,
+                             final @PathParam("instanceId") String instanceId) {
+        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Stop all function instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{functionName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopFunction(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.stopFunctionInstances(tenant, namespace, functionName);
+    public void stopFunction(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("functionName") String functionName) {
+        functions.stopFunctionInstances(tenant, namespace, functionName);
     }
 
     @POST
@@ -348,9 +346,9 @@ public Response stopFunction(final @PathParam("tenant") String tenant,
     )
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
-                                   final @FormDataParam("path") String path) {
-        return functions.uploadFunction(uploadedInputStream, path);
+    public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("path") String path) {
+        functions.uploadFunction(uploadedInputStream, path);
     }
 
     @GET
@@ -359,7 +357,7 @@ public Response uploadFunction(final @FormDataParam("data") InputStream uploaded
             hidden = true
     )
     @Path("/download")
-    public Response downloadFunction(final @QueryParam("path") String path) {
+    public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
         return functions.downloadFunction(path);
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
index 2036c50de8..41c5376a2f 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -24,7 +24,7 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.worker.WorkerService;
@@ -32,9 +32,15 @@
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -64,15 +70,15 @@ public WorkerService get() {
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response registerSink(final @PathParam("tenant") String tenant,
-                                 final @PathParam("namespace") String namespace,
-                                 final @PathParam("sinkName") String sinkName,
-                                 final @FormDataParam("data") InputStream uploadedInputStream,
-                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                 final @FormDataParam("url") String functionPkgUrl,
-                                 final @FormDataParam("sinkConfig") String sinkConfigJson) {
+    public void registerSink(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("sinkName") String sinkName,
+                             final @FormDataParam("data") InputStream uploadedInputStream,
+                             final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                             final @FormDataParam("url") String functionPkgUrl,
+                             final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
-        return sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+        sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
                 functionPkgUrl, null, sinkConfigJson, clientAppId());
     }
 
@@ -85,15 +91,15 @@ public Response registerSink(final @PathParam("tenant") String tenant,
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response updateSink(final @PathParam("tenant") String tenant,
-                               final @PathParam("namespace") String namespace,
-                               final @PathParam("sinkName") String sinkName,
-                               final @FormDataParam("data") InputStream uploadedInputStream,
-                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                               final @FormDataParam("url") String functionPkgUrl,
-                               final @FormDataParam("sinkConfig") String sinkConfigJson) {
+    public void updateSink(final @PathParam("tenant") String tenant,
+                           final @PathParam("namespace") String namespace,
+                           final @PathParam("sinkName") String sinkName,
+                           final @FormDataParam("data") InputStream uploadedInputStream,
+                           final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                           final @FormDataParam("url") String functionPkgUrl,
+                           final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
-        return sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+         sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
                 functionPkgUrl, null, sinkConfigJson, clientAppId());
 
     }
@@ -109,10 +115,10 @@ public Response updateSink(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 200, message = "The function was successfully deleted")
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
-    public Response deregisterSink(final @PathParam("tenant") String tenant,
-                                   final @PathParam("namespace") String namespace,
-                                   final @PathParam("sinkName") String sinkName) {
-        return sink.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+    public void deregisterSink(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("sinkName") String sinkName) {
+        sink.deregisterFunction(tenant, namespace, sinkName, clientAppId());
     }
 
     @GET
@@ -127,10 +133,10 @@ public Response deregisterSink(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 404, message = "The function doesn't exist")
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
-    public Response getSinkInfo(final @PathParam("tenant") String tenant,
-                                final @PathParam("namespace") String namespace,
-                                final @PathParam("sinkName") String sinkName) throws IOException {
-        return sink.getFunctionInfo(tenant, namespace, sinkName);
+    public SinkConfig getSinkInfo(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("sinkName") String sinkName) throws IOException {
+        return sink.getSinkInfo(tenant, namespace, sinkName);
     }
 
     @GET
@@ -167,8 +173,8 @@ public Response getSinkInfo(final @PathParam("tenant") String tenant,
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{sinkName}/status")
     public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
-                                  final @PathParam("namespace") String namespace,
-                                  final @PathParam("sinkName") String sinkName) throws IOException {
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("sinkName") String sinkName) throws IOException {
         return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri());
     }
 
@@ -183,60 +189,71 @@ public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
     @Path("/{tenant}/{namespace}")
-    public Response listSinks(final @PathParam("tenant") String tenant,
-                              final @PathParam("namespace") String namespace) {
+    public List<String> listSinks(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace) {
         return sink.listFunctions(tenant, namespace);
-
     }
 
     @POST
     @ApiOperation(value = "Restart sink instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartSink(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
-            final @PathParam("instanceId") String instanceId) {
-        return sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    public void restartSink(final @PathParam("tenant") String tenant,
+                            final @PathParam("namespace") String namespace,
+                            final @PathParam("sinkName") String sinkName,
+                            final @PathParam("instanceId") String instanceId) {
+        sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Restart all sink instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{sinkName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartSink(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return sink.restartFunctionInstances(tenant, namespace, sinkName);
+    public void restartSink(final @PathParam("tenant") String tenant,
+                            final @PathParam("namespace") String namespace,
+                            final @PathParam("sinkName") String sinkName) {
+        sink.restartFunctionInstances(tenant, namespace, sinkName);
     }
 
     @POST
     @ApiOperation(value = "Stop sink instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopSink(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
-            final @PathParam("instanceId") String instanceId) {
-        return sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+    public void stopSink(final @PathParam("tenant") String tenant,
+                         final @PathParam("namespace") String namespace,
+                         final @PathParam("sinkName") String sinkName,
+                         final @PathParam("instanceId") String instanceId) {
+        sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Stop all sink instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{sinkName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopSink(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return sink.stopFunctionInstances(tenant, namespace, sinkName);
+    public void stopSink(final @PathParam("tenant") String tenant,
+                         final @PathParam("namespace") String namespace,
+                         final @PathParam("sinkName") String sinkName) {
+        sink.stopFunctionInstances(tenant, namespace, sinkName);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
index 9f04662521..c4a102b8a2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -24,17 +24,23 @@
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
-import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
 
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -64,16 +70,16 @@ public WorkerService get() {
     })
     @Path("/{tenant}/{namespace}/{sourceName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response registerSource(final @PathParam("tenant") String tenant,
-                                   final @PathParam("namespace") String namespace,
-                                   final @PathParam("sourceName") String sourceName,
-                                   final @FormDataParam("data") InputStream uploadedInputStream,
-                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                   final @FormDataParam("url") String functionPkgUrl,
-                                   final @FormDataParam("sourceConfig") String sourceConfigJson) {
+    public void registerSource(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("sourceName") String sourceName,
+                               final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                               final @FormDataParam("url") String functionPkgUrl,
+                               final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
-        return source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, sourceConfigJson, clientAppId());
+        source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+            functionPkgUrl, null, sourceConfigJson, clientAppId());
     }
 
     @PUT
@@ -85,17 +91,16 @@ public Response registerSource(final @PathParam("tenant") String tenant,
     })
     @Path("/{tenant}/{namespace}/{sourceName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response updateSource(final @PathParam("tenant") String tenant,
-                                 final @PathParam("namespace") String namespace,
-                                 final @PathParam("sourceName") String sourceName,
-                                 final @FormDataParam("data") InputStream uploadedInputStream,
-                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                 final @FormDataParam("url") String functionPkgUrl,
-                                 final @FormDataParam("sourceConfig") String sourceConfigJson) {
-
-        return source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                functionPkgUrl, null, sourceConfigJson, clientAppId());
+    public void updateSource(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("sourceName") String sourceName,
+                             final @FormDataParam("data") InputStream uploadedInputStream,
+                             final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                             final @FormDataParam("url") String functionPkgUrl,
+                             final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
+        source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+            functionPkgUrl, null, sourceConfigJson, clientAppId());
     }
 
 
@@ -109,10 +114,10 @@ public Response updateSource(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 200, message = "The function was successfully deleted")
     })
     @Path("/{tenant}/{namespace}/{sourceName}")
-    public Response deregisterSource(final @PathParam("tenant") String tenant,
+    public void deregisterSource(final @PathParam("tenant") String tenant,
                                        final @PathParam("namespace") String namespace,
                                        final @PathParam("sourceName") String sourceName) {
-        return source.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+        source.deregisterFunction(tenant, namespace, sourceName, clientAppId());
     }
 
     @GET
@@ -127,11 +132,10 @@ public Response deregisterSource(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 404, message = "The function doesn't exist")
     })
     @Path("/{tenant}/{namespace}/{sourceName}")
-    public Response getSourceInfo(final @PathParam("tenant") String tenant,
-                                  final @PathParam("namespace") String namespace,
-                                  final @PathParam("sourceName") String sourceName) throws IOException {
-        return source.getFunctionInfo(
-            tenant, namespace, sourceName);
+    public SourceConfig getSourceInfo(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace,
+                                      final @PathParam("sourceName") String sourceName) throws IOException {
+        return source.getSourceInfo(tenant, namespace, sourceName);
     }
 
     @GET
@@ -168,8 +172,8 @@ public Response getSourceInfo(final @PathParam("tenant") String tenant,
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{sourceName}/status")
     public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant,
-                                    final @PathParam("namespace") String namespace,
-                                    final @PathParam("sourceName") String sourceName) throws IOException {
+                                        final @PathParam("namespace") String namespace,
+                                        final @PathParam("sourceName") String sourceName) throws IOException {
         return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri());
     }
 
@@ -184,61 +188,67 @@ public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
     })
     @Path("/{tenant}/{namespace}")
-    public Response listSources(final @PathParam("tenant") String tenant,
-                                final @PathParam("namespace") String namespace) {
-        return source.listFunctions(
-            tenant, namespace);
-
+    public List<String> listSources(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace) {
+        return source.listFunctions(tenant, namespace);
     }
 
     @POST
     @ApiOperation(value = "Restart source instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
             @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartSource(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
-            final @PathParam("instanceId") String instanceId) {
-        return source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    public void restartSource(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace,
+                              final @PathParam("sourceName") String sourceName,
+                              final @PathParam("instanceId") String instanceId) {
+        source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Restart all source instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
             @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sourceName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartSource(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return source.restartFunctionInstances(tenant, namespace, sourceName);
+    public void restartSource(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace,
+                              final @PathParam("sourceName") String sourceName) {
+        source.restartFunctionInstances(tenant, namespace, sourceName);
     }
 
     @POST
     @ApiOperation(value = "Stop source instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
             @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopSource(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
-            final @PathParam("instanceId") String instanceId) {
-        return source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+    public void stopSource(final @PathParam("tenant") String tenant,
+                           final @PathParam("namespace") String namespace,
+                           final @PathParam("sourceName") String sourceName,
+                           final @PathParam("instanceId") String instanceId) {
+        source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Stop all source instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
             @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sourceName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopSource(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return source.stopFunctionInstances(tenant, namespace, sourceName);
+    public void stopSource(final @PathParam("tenant") String tenant,
+                           final @PathParam("namespace") String namespace,
+                           final @PathParam("sourceName") String sourceName) {
+        source.stopFunctionInstances(tenant, namespace, sourceName);
     }
 
     @GET
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
index d8ae7574c5..965db1c6c1 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java
@@ -43,7 +43,7 @@ public LookupResult(NamespaceEphemeralData namespaceEphemeralData) {
     }
 
     public LookupResult(String httpUrl, String httpUrlTls, String brokerServiceUrl, String brokerServiceUrlTls) {
-        this.type = Type.RedirectUrl; // type = reidrect => as current broker is
+        this.type = Type.RedirectUrl; // type = redirect => as current broker is
                                       // not owner and prepares LookupResult
                                       // with other broker's urls
         this.lookupData = new LookupData(brokerServiceUrl, brokerServiceUrlTls, httpUrl, httpUrlTls);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index fa8f7830c5..0e4b1b17c4 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -89,7 +89,6 @@
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.TenantInfo;
@@ -302,31 +301,28 @@ boolean isWorkerServiceAvailable() {
         return true;
     }
 
-    public Response registerFunction(final String tenant,
-                                     final String namespace,
-                                     final String componentName,
-                                     final InputStream uploadedInputStream,
-                                     final FormDataContentDisposition fileDetail,
-                                     final String functionPkgUrl,
-                                     final String functionDetailsJson,
-                                     final String componentConfigJson,
-                                     final String clientRole) {
+    public void registerFunction(final String tenant,
+                                 final String namespace,
+                                 final String componentName,
+                                 final InputStream uploadedInputStream,
+                                 final FormDataContentDisposition fileDetail,
+                                 final String functionPkgUrl,
+                                 final String functionDetailsJson,
+                                 final String componentConfigJson,
+                                 final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throwUnavailableException();
         }
 
         if (tenant == null) {
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("Tenant is not provided")).build();
+            throw new RestException(Status.BAD_REQUEST, "Tenant is not provided");
         }
         if (namespace == null) {
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("Namespace is not provided")).build();
+            throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
         }
         if (componentName == null) {
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(componentType + " Name is not provided")).build();
+            throw new RestException(Status.BAD_REQUEST, componentType + " Name is not provided");
         }
 
         try {
@@ -337,46 +333,36 @@ public Response registerFunction(final String tenant,
             if (!worker().getBrokerAdmin().namespaces().getNamespaces(tenant).contains(qualifiedNamespace)) {
                 log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace,
                         componentName, namespace);
-                return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData("Namespace does not exist")).build();
+                throw new RestException(Status.BAD_REQUEST, "Namespace does not exist");
             }
         } catch (PulsarAdminException.NotAuthorizedException e) {
             log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
                     componentName, clientRole, componentType);
-            return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("client is not authorize to perform operation")).build();
-
+            throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
         } catch (PulsarAdminException.NotFoundException e) {
-            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace,
-                    componentName, tenant);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("Tenant does not exist")).build();
+            log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, componentName, tenant);
+            throw new RestException(Status.BAD_REQUEST, "Tenant does not exist");
         } catch (PulsarAdminException e) {
-            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace,
-                    componentName, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
 
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
                         componentName, clientRole, componentType);
-                return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData("client is not authorize to perform operation")).build();
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
             log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
         if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
             log.error("{} {}/{}/{} already exists", componentType, tenant, namespace, componentName);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s already exists", componentType, componentName))).build();
+            throw new RestException(Status.BAD_REQUEST, String.format("%s %s already exists", componentType, componentName));
         }
 
         FunctionDetails functionDetails;
@@ -396,16 +382,14 @@ public Response registerFunction(final String tenant,
             }
         } catch (Exception e) {
             log.error("Invalid register {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         try {
             worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
         } catch (Exception e) {
             log.error("{} {}/{}/{} cannot be admitted by the runtime factory", componentType, tenant, namespace, componentName);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build();
+            throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()));
         }
 
         // function state
@@ -418,12 +402,11 @@ public Response registerFunction(final String tenant,
             packageLocationMetaDataBuilder = getFunctionPackageLocation(functionDetails,
                     functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
         } catch (Exception e) {
-            return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage()))
-                    .build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
 
         functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-        return updateRequest(functionMetaDataBuilder.build());
+        updateRequest(functionMetaDataBuilder.build());
     }
 
     private PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionDetails functionDetails,
@@ -483,51 +466,45 @@ public Response registerFunction(final String tenant,
     }
 
 
-    public Response updateFunction(final String tenant,
-                                   final String namespace,
-                                   final String componentName,
-                                   final InputStream uploadedInputStream,
-                                   final FormDataContentDisposition fileDetail,
-                                   final String functionPkgUrl,
-                                   final String functionDetailsJson,
-                                   final String componentConfigJson,
-                                   final String clientRole) {
+    public void updateFunction(final String tenant,
+                               final String namespace,
+                               final String componentName,
+                               final InputStream uploadedInputStream,
+                               final FormDataContentDisposition fileDetail,
+                               final String functionPkgUrl,
+                               final String functionDetailsJson,
+                               final String componentConfigJson,
+                               final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throwUnavailableException();
         }
 
         if (tenant == null) {
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("Tenant is not provided")).build();
+            throw new RestException(Status.BAD_REQUEST, "Tenant is not provided");
         }
         if (namespace == null) {
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("Namespace is not provided")).build();
+            throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
         }
         if (componentName == null) {
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(componentType + " Name is not provided")).build();
+            throw new RestException(Status.BAD_REQUEST, componentType + " Name is not provided");
         }
 
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
                         componentName, clientRole, componentType);
-                return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData("client is not authorize to perform operation")).build();
+                throw new RestException(Status.UNAUTHORIZED, componentType + "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
             log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+            throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't exist", componentType, componentName));
         }
 
         String mergedComponentConfigJson;
@@ -546,8 +523,7 @@ public Response updateFunction(final String tenant,
                 FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig);
                 mergedComponentConfigJson = new Gson().toJson(mergedConfig);
             } catch (Exception e) {
-                return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData(e.getMessage())).build();
+                throw new RestException(Status.BAD_REQUEST, e.getMessage());
             }
         } else if (componentType.equals(SOURCE)) {
             SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
@@ -561,8 +537,7 @@ public Response updateFunction(final String tenant,
                 SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
                 mergedComponentConfigJson = new Gson().toJson(mergedConfig);
             } catch (Exception e) {
-                return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData(e.getMessage())).build();
+                throw new RestException(Status.BAD_REQUEST, e.getMessage());
             }
         } else {
             SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
@@ -576,15 +551,13 @@ public Response updateFunction(final String tenant,
                 SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
                 mergedComponentConfigJson = new Gson().toJson(mergedConfig);
             } catch (Exception e) {
-                return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData(e.getMessage())).build();
+                throw new RestException(Status.BAD_REQUEST, e.getMessage());
             }
         }
 
         if (existingComponentConfigJson.equals(mergedComponentConfigJson) && isBlank(functionPkgUrl) && uploadedInputStream == null) {
             log.error("{}/{}/{} Update contains no changes", tenant, namespace, componentName);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData("Update contains no change")).build();
+            throw new RestException(Status.BAD_REQUEST, "Update contains no change");
         }
 
         FunctionDetails functionDetails;
@@ -607,16 +580,14 @@ public Response updateFunction(final String tenant,
             }
         } catch (Exception e) {
             log.error("Invalid update {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         try {
             worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
         } catch (Exception e) {
             log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", componentType, tenant, namespace, componentName);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()))).build();
+            throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", componentType, componentName, e.getMessage()));
         }
 
         // function state
@@ -629,37 +600,34 @@ public Response updateFunction(final String tenant,
                 packageLocationMetaDataBuilder = getFunctionPackageLocation(functionDetails,
                         functionPkgUrl, fileDetail, uploadedInputStreamAsFile);
             } catch (Exception e) {
-                return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage()))
-                        .build();
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
             }
         } else {
             packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
         }
 
         functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-        return updateRequest(functionMetaDataBuilder.build());
+        updateRequest(functionMetaDataBuilder.build());
     }
 
-    public Response deregisterFunction(final String tenant,
-                                       final String namespace,
-                                       final String componentName,
-                                       final String clientRole) {
+    public void deregisterFunction(final String tenant,
+                                   final String namespace,
+                                   final String componentName,
+                                   final String clientRole) {
 
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throwUnavailableException();
         }
 
         try {
             if (!isAuthorizedRole(tenant, clientRole)) {
                 log.error("{}/{}/{} Client [{}] is not admin and authorized to deregister {}", tenant, namespace,
                         componentName, clientRole, componentType);
-                return Response.status(Status.UNAUTHORIZED).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData("client is not authorize to perform operation")).build();
+                throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
             }
         } catch (PulsarAdminException e) {
             log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
 
         // delete state table
@@ -672,11 +640,7 @@ public Response deregisterFunction(final String tenant,
                 // ignored if the state table doesn't exist
             } catch (Exception e) {
                 log.error("{}/{}/{} Failed to delete state table", e);
-                return Response
-                        .status(Status.INTERNAL_SERVER_ERROR)
-                        .type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData(e.getMessage()))
-                        .build();
+                throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
             }
         }
 
@@ -685,21 +649,18 @@ public Response deregisterFunction(final String tenant,
             validateDeregisterRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
             log.error("Invalid deregister {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
             log.error("{} to deregister does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
         }
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!calculateSubjectType(functionMetaData).equals(componentType)) {
             log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
         }
 
         CompletableFuture<RequestResult> completableFuture = functionMetaDataManager.deregisterFunction(tenant,
@@ -709,29 +670,25 @@ public Response deregisterFunction(final String tenant,
         try {
             requestResult = completableFuture.get();
             if (!requestResult.isSuccess()) {
-                return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData(requestResult.getMessage())).build();
+                throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
             }
         } catch (ExecutionException e) {
-            log.error("Execution Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName,
-                    e);
-            return Response.serverError().type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getCause().getMessage())).build();
+            log.error("Execution Exception while deregistering {} @ /{}/{}/{}",
+                    componentType, tenant, namespace, componentName, e);
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getCause().getMessage());
         } catch (InterruptedException e) {
-            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}", componentType, tenant, namespace, componentName,
-                    e);
-            return Response.status(Status.REQUEST_TIMEOUT).type(MediaType.APPLICATION_JSON).build();
+            log.error("Interrupted Exception while deregistering {} @ /{}/{}/{}",
+                    componentType, tenant, namespace, componentName, e);
+            throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
         }
-
-        return Response.status(Status.OK).entity(requestResult.toJson()).build();
     }
 
-    public Response getFunctionInfo(final String tenant,
-                                    final String namespace,
-                                    final String componentName) {
+    public FunctionConfig getFunctionInfo(final String tenant,
+                                          final String namespace,
+                                          final String componentName) {
 
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throwUnavailableException();
         }
 
         // validate parameters
@@ -739,62 +696,48 @@ public Response getFunctionInfo(final String tenant,
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
             log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
             log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
         }
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!calculateSubjectType(functionMetaData).equals(componentType)) {
             log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
-        }
-
-        String retVal;
-        if (componentType.equals(FUNCTION)) {
-            FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retVal = new Gson().toJson(config);
-        } else if (componentType.equals(SOURCE)) {
-            SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retVal = new Gson().toJson(config);
-        } else {
-            SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
-            retVal = new Gson().toJson(config);
+            throw new RestException(Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
         }
-        return Response.status(Status.OK).entity(retVal).build();
+        FunctionConfig config = FunctionConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+        return config;
     }
 
-    public Response stopFunctionInstance(final String tenant,
-                                         final String namespace,
-                                         final String componentName,
-                                         final String instanceId,
-                                         final URI uri) {
-        return stopFunctionInstance(tenant, namespace, componentName, instanceId, false, uri);
+    public void stopFunctionInstance(final String tenant,
+                                     final String namespace,
+                                     final String componentName,
+                                     final String instanceId,
+                                     final URI uri) {
+        stopFunctionInstance(tenant, namespace, componentName, instanceId, false, uri);
     }
 
-    public Response restartFunctionInstance(final String tenant,
-                                            final String namespace,
-                                            final String componentName,
-                                            final String instanceId,
-                                            final URI uri) {
-        return stopFunctionInstance(tenant, namespace, componentName, instanceId, true, uri);
+    public void restartFunctionInstance(final String tenant,
+                                        final String namespace,
+                                        final String componentName,
+                                        final String instanceId,
+                                        final URI uri) {
+        stopFunctionInstance(tenant, namespace, componentName, instanceId, true, uri);
     }
 
-    public Response stopFunctionInstance(final String tenant,
-                                         final String namespace,
-                                         final String componentName,
-                                         final String instanceId,
-                                         final boolean restart,
-                                         final URI uri) {
+    public void stopFunctionInstance(final String tenant,
+                                     final String namespace,
+                                     final String componentName,
+                                     final String instanceId,
+                                     final boolean restart,
+                                     final URI uri) {
 
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throwUnavailableException();
         }
 
         // validate parameters
@@ -802,55 +745,52 @@ public Response stopFunctionInstance(final String tenant,
             validateGetFunctionInstanceRequestParams(tenant, namespace, componentName, componentType, instanceId);
         } catch (IllegalArgumentException e) {
             log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
             log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
         }
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!calculateSubjectType(functionMetaData).equals(componentType)) {
             log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
-            return functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName,
+            functionRuntimeManager.stopFunctionInstance(tenant, namespace, componentName,
                     Integer.parseInt(instanceId), restart, uri);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
             log.error("Failed to restart {}: {}/{}/{}/{}", componentType, tenant, namespace, componentName, instanceId, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
     }
 
-    public Response stopFunctionInstances(final String tenant,
-                                          final String namespace,
-                                          final String componentName) {
-        return stopFunctionInstances(tenant, namespace, componentName, false);
+    public void stopFunctionInstances(final String tenant,
+                                      final String namespace,
+                                      final String componentName) {
+        stopFunctionInstances(tenant, namespace, componentName, false);
     }
 
-    public Response restartFunctionInstances(final String tenant,
-                                             final String namespace,
-                                             final String componentName) {
-        return stopFunctionInstances(tenant, namespace, componentName, true);
+    public void restartFunctionInstances(final String tenant,
+                                         final String namespace,
+                                         final String componentName) {
+        stopFunctionInstances(tenant, namespace, componentName, true);
     }
 
-    public Response stopFunctionInstances(final String tenant,
-                                          final String namespace,
-                                          final String componentName,
-                                          final boolean restart) {
+    public void stopFunctionInstances(final String tenant,
+                                      final String namespace,
+                                      final String componentName,
+                                      final boolean restart) {
 
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throwUnavailableException();
         }
 
         // validate parameters
@@ -858,32 +798,29 @@ public Response stopFunctionInstances(final String tenant,
             validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
         } catch (IllegalArgumentException e) {
             log.error("Invalid restart {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
             log.error("{} in stopFunctionInstances does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("%s %s doesn't exist", componentType, componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
         }
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
         if (!calculateSubjectType(functionMetaData).equals(componentType)) {
             log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
-            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format(componentType + " %s doesn't exist", componentName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", componentType, componentName));
         }
 
         FunctionRuntimeManager functionRuntimeManager = worker().getFunctionRuntimeManager();
         try {
-            return functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart);
+            functionRuntimeManager.stopFunctionInstances(tenant, namespace, componentName, restart);
         } catch (WebApplicationException we) {
             throw we;
         } catch (Exception e) {
             log.error("Failed to restart {}: {}/{}/{}", componentType, tenant, namespace, componentName, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR.getStatusCode(), e.getMessage()).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
     }
 
@@ -927,7 +864,6 @@ public FunctionStats getFunctionStats(final String tenant,
         }
 
         return functionStats;
-
     }
 
     public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionsInstanceStats(final String tenant,
@@ -980,10 +916,10 @@ public FunctionStats getFunctionStats(final String tenant,
         return functionInstanceStatsData;
     }
 
-    public Response listFunctions(final String tenant, final String namespace) {
+    public List<String> listFunctions(final String tenant, final String namespace) {
 
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throwUnavailableException();
         }
 
         // validate parameters
@@ -991,24 +927,22 @@ public Response listFunctions(final String tenant, final String namespace) {
             validateListFunctionRequestParams(tenant, namespace);
         } catch (IllegalArgumentException e) {
             log.error("Invalid list {} request @ /{}/{}", componentType, tenant, namespace, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
 
         Collection<FunctionMetaData> functionStateList = functionMetaDataManager.listFunctions(tenant, namespace);
-        List<String> retval = new LinkedList<>();
+        List<String> retVals = new LinkedList<>();
         for (FunctionMetaData functionMetaData : functionStateList) {
             if (calculateSubjectType(functionMetaData).equals(componentType)) {
-                retval.add(functionMetaData.getFunctionDetails().getName());
+                retVals.add(functionMetaData.getFunctionDetails().getName());
             }
         }
-
-        return Response.status(Status.OK).entity(new Gson().toJson(retval.toArray())).build();
+        return retVals;
     }
 
-    private Response updateRequest(FunctionMetaData functionMetaData) {
+    private void updateRequest(final FunctionMetaData functionMetaData) {
 
         // Submit to FMT
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
@@ -1019,18 +953,14 @@ private Response updateRequest(FunctionMetaData functionMetaData) {
         try {
             requestResult = completableFuture.get();
             if (!requestResult.isSuccess()) {
-                return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                        .entity(new ErrorData(requestResult.getMessage())).build();
+                throw new RestException(Status.BAD_REQUEST, requestResult.getMessage());
             }
         } catch (ExecutionException e) {
-            return Response.serverError().type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getCause().getMessage())).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         } catch (InterruptedException e) {
-            return Response.status(Status.REQUEST_TIMEOUT).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getCause().getMessage())).build();
+            throw new RestException(Status.REQUEST_TIMEOUT, e.getMessage());
         }
 
-        return Response.status(Status.OK).build();
     }
 
     public List<ConnectorDefinition> getListOfConnectors() {
@@ -1043,15 +973,15 @@ private Response updateRequest(FunctionMetaData functionMetaData) {
         return this.worker().getConnectorsManager().getConnectors();
     }
 
-    public Response triggerFunction(final String tenant,
-                                    final String namespace,
-                                    final String functionName,
-                                    final String input,
-                                    final InputStream uploadedInputStream,
-                                    final String topic) {
+    public String triggerFunction(final String tenant,
+                                  final String namespace,
+                                  final String functionName,
+                                  final String input,
+                                  final InputStream uploadedInputStream,
+                                  final String topic) {
 
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throwUnavailableException();
         }
 
         // validate parameters
@@ -1059,15 +989,13 @@ public Response triggerFunction(final String tenant,
             validateTriggerRequestParams(tenant, namespace, functionName, topic, input, uploadedInputStream);
         } catch (IllegalArgumentException e) {
             log.error("Invalid trigger function request @ /{}/{}/{}", tenant, namespace, functionName, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
         if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
             log.error("Function in trigger function does not exist @ /{}/{}/{}", tenant, namespace, functionName);
-            return Response.status(Status.NOT_FOUND).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(String.format("Function %s doesn't exist", functionName))).build();
+            throw new RestException(Status.NOT_FOUND, String.format("Function %s doesn't exist", functionName));
         }
 
         FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace,
@@ -1081,20 +1009,19 @@ public Response triggerFunction(final String tenant,
                     .keySet().iterator().next();
         } else {
             log.error("Function in trigger function has more than 1 input topics @ /{}/{}/{}", tenant, namespace, functionName);
-            return Response.status(Status.BAD_REQUEST).build();
-        }
+            throw new RestException(Status.BAD_REQUEST, "Function in trigger function has more than 1 input topics");
+            }
         if (functionMetaData.getFunctionDetails().getSource().getInputSpecsCount() == 0
                 || !functionMetaData.getFunctionDetails().getSource().getInputSpecsMap()
                 .containsKey(inputTopicToWrite)) {
             log.error("Function in trigger function has unidentified topic @ /{}/{}/{} {}", tenant, namespace, functionName, inputTopicToWrite);
-
-            return Response.status(Status.BAD_REQUEST).build();
+            throw new RestException(Status.BAD_REQUEST, "Function in trigger function has unidentified topic");
         }
         try {
             worker().getBrokerAdmin().topics().getSubscriptions(inputTopicToWrite);
         } catch (PulsarAdminException e) {
             log.error("Function in trigger function is not ready @ /{}/{}/{}", tenant, namespace, functionName);
-            return Response.status(Status.BAD_REQUEST).build();
+            throw new RestException(Status.BAD_REQUEST, "Function in trigger function is not ready");
         }
         String outputTopic = functionMetaData.getFunctionDetails().getSink().getTopic();
         Reader<byte[]> reader = null;
@@ -1115,7 +1042,7 @@ public Response triggerFunction(final String tenant,
             }
             MessageId msgId = producer.send(targetArray);
             if (reader == null) {
-                return Response.status(Status.OK).build();
+                return null;
             }
             long curTime = System.currentTimeMillis();
             long maxTime = curTime + 1000;
@@ -1129,14 +1056,14 @@ public Response triggerFunction(final String tenant,
                             Base64.getDecoder().decode((String) msg.getProperties().get("__pfn_input_msg_id__")));
                     if (msgId.equals(newMsgId)
                             && msg.getProperties().get("__pfn_input_topic__").equals(inputTopicToWrite)) {
-                        return Response.status(Status.OK).entity(msg.getData()).build();
+                       return new String(msg.getData());
                     }
                 }
                 curTime = System.currentTimeMillis();
             }
-            return Response.status(Status.REQUEST_TIMEOUT).build();
+            throw new RestException(Status.REQUEST_TIMEOUT, "Requeste Timed Out");
         } catch (Exception e) {
-            return Response.status(Status.INTERNAL_SERVER_ERROR).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         } finally {
             if (reader != null) {
                 reader.closeAsync();
@@ -1147,16 +1074,17 @@ public Response triggerFunction(final String tenant,
         }
     }
 
-    public Response getFunctionState(final String tenant,
-                                     final String namespace,
-                                     final String functionName,
-                                     final String key) {
+    public FunctionState getFunctionState(final String tenant,
+                                          final String namespace,
+                                          final String functionName,
+                                          final String key) {
+
         if (!isWorkerServiceAvailable()) {
-            return getUnavailableResponse();
+            throwUnavailableException();
         }
 
         if (null == worker().getStateStoreAdminClient()) {
-            return getStateStoreUnvailableResponse();
+            throwStateStoreUnvailableResponse();
         }
 
         // validate parameters
@@ -1165,8 +1093,7 @@ public Response getFunctionState(final String tenant,
         } catch (IllegalArgumentException e) {
             log.error("Invalid getFunctionState request @ /{}/{}/{}/{}",
                     tenant, namespace, functionName, key, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         String tableNs = StateUtils.getStateNamespace(tenant, namespace);
@@ -1184,34 +1111,28 @@ public Response getFunctionState(final String tenant,
                     .build());
         }
 
+        FunctionState value;
         try (Table<ByteBuf, ByteBuf> table = result(storageClient.get().openTable(tableName))) {
             try (KeyValue<ByteBuf, ByteBuf> kv = result(table.getKv(Unpooled.wrappedBuffer(key.getBytes(UTF_8))))) {
                 if (null == kv) {
-                    return Response.status(Status.NOT_FOUND)
-                            .entity(new String("key '" + key + "' doesn't exist."))
-                            .build();
+                    throw new RestException(Status.NOT_FOUND, "key '" + key + "' doesn't exist.");
                 } else {
-                    FunctionState value;
                     if (kv.isNumber()) {
                         value = new FunctionState(key, null, kv.numberValue(), kv.version());
                     } else {
                         value = new FunctionState(key, new String(ByteBufUtil.getBytes(kv.value()), UTF_8), null, kv.version());
                     }
-                    return Response.status(Status.OK)
-                            .entity(new Gson().toJson(value))
-                            .build();
                 }
             }
         } catch (Exception e) {
             log.error("Error while getFunctionState request @ /{}/{}/{}/{}",
                     tenant, namespace, functionName, key, e);
-            return Response.status(Status.INTERNAL_SERVER_ERROR).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
-
+        return value;
     }
 
-    public Response uploadFunction(final InputStream uploadedInputStream, final String path) {
+    public void uploadFunction(final InputStream uploadedInputStream, final String path) {
         // validate parameters
         try {
             if (uploadedInputStream == null || path == null) {
@@ -1219,26 +1140,22 @@ public Response uploadFunction(final InputStream uploadedInputStream, final Stri
             }
         } catch (IllegalArgumentException e) {
             log.error("Invalid upload function request @ /{}", path, e);
-            return Response.status(Status.BAD_REQUEST).type(MediaType.APPLICATION_JSON)
-                    .entity(new ErrorData(e.getMessage())).build();
+            throw new RestException(Status.BAD_REQUEST, e.getMessage());
         }
 
         // Upload to bookkeeper
         try {
             log.info("Uploading function package to {}", path);
-
             Utils.uploadToBookeeper(worker().getDlogNamespace(), uploadedInputStream, path);
         } catch (IOException e) {
             log.error("Error uploading file {}", path, e);
-            return Response.serverError().type(MediaType.APPLICATION_JSON).entity(new ErrorData(e.getMessage()))
-                    .build();
+            throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
         }
-
-        return Response.status(Status.OK).build();
     }
 
-    public Response downloadFunction(final String path) {
-        return Response.status(Status.OK).entity(new StreamingOutput() {
+    public StreamingOutput downloadFunction(final String path) {
+
+        final StreamingOutput streamingOutput = new StreamingOutput() {
             @Override
             public void write(final OutputStream output) throws IOException {
                 if (path.startsWith(org.apache.pulsar.common.functions.Utils.HTTP)) {
@@ -1257,7 +1174,9 @@ public void write(final OutputStream output) throws IOException {
                     Utils.downloadFromBookkeeper(worker().getDlogNamespace(), output, path);
                 }
             }
-        }).build();
+        };
+
+        return streamingOutput;
     }
 
     private void validateListFunctionRequestParams(final String tenant, final String namespace) throws IllegalArgumentException {
@@ -1658,19 +1577,14 @@ private void validateTriggerRequestParams(final String tenant,
         }
     }
 
-    private Response getUnavailableResponse() {
-        return Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                .entity(new ErrorData(
-                        "Function worker service is not done initializing. " + "Please try again in a little while."))
-                .build();
+    protected void throwUnavailableException() {
+        throw new RestException(Status.SERVICE_UNAVAILABLE,
+                "Function worker service is not done initializing. " + "Please try again in a little while.");
     }
 
-    private Response getStateStoreUnvailableResponse() {
-        return Response.status(Status.SERVICE_UNAVAILABLE).type(MediaType.APPLICATION_JSON)
-                .entity(new ErrorData(
-                        "State storage client is not done initializing. "
-                                + "Please try again in a little while."))
-                .build();
+    private void throwStateStoreUnvailableResponse() {
+        throw new RestException(Status.SERVICE_UNAVAILABLE,
+                "State storage client is not done initializing. " + "Please try again in a little while.");
     }
 
     public static String createPackagePath(String tenant, String namespace, String functionName, String fileName) {
@@ -1751,7 +1665,8 @@ protected void componentInstanceStatusRequestValidate (final String tenant,
         int parallelism = functionMetaData.getFunctionDetails().getParallelism();
         if (instanceId < 0 || instanceId >= parallelism) {
             log.error("instanceId in get {} Status out of bounds @ /{}/{}/{}", componentType, tenant, namespace, componentName);
-            throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't have instance with id %s", componentType, componentName, instanceId));
+            throw new RestException(Status.BAD_REQUEST,
+                    String.format("%s %s doesn't have instance with id %s", componentType, componentName, instanceId));
         }
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
index 862aaf6483..846900f183 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinkImpl.java
@@ -20,10 +20,13 @@
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.SinkConfigUtils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
@@ -249,4 +252,34 @@ public SinkStatus getSinkStatus(final String tenant,
 
         return sinkStatus;
     }
+
+    public SinkConfig getSinkInfo(final String tenant,
+                                  final String namespace,
+                                  final String componentName) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
+            throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+        }
+        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+        }
+        SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+        return config;
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
index d915bb0d53..ac096bb3ed 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourceImpl.java
@@ -20,10 +20,13 @@
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.ExceptionInformation;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.SourceConfigUtils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.rest.RestException;
 
@@ -244,4 +247,34 @@ public SourceStatus getSourceStatus(final String tenant,
         }
         return sourceInstanceStatusData;
     }
+
+    public SourceConfig getSourceInfo(final String tenant,
+                                      final String namespace,
+                                      final String componentName) {
+
+        if (!isWorkerServiceAvailable()) {
+            throwUnavailableException();
+        }
+
+        // validate parameters
+        try {
+            validateGetFunctionRequestParams(tenant, namespace, componentName, componentType);
+        } catch (IllegalArgumentException e) {
+            log.error("Invalid get {} request @ /{}/{}/{}", componentType, tenant, namespace, componentName, e);
+            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+        }
+
+        FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+        if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
+            log.error("{} does not exist @ /{}/{}/{}", componentType, tenant, namespace, componentName);
+            throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+        }
+        Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
+        if (!calculateSubjectType(functionMetaData).equals(componentType)) {
+            log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, componentType);
+            throw new RestException(Response.Status.NOT_FOUND, String.format(componentType + " %s doesn't exist", componentName));
+        }
+        SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
+        return config;
+    }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index f1dca502e4..b620fe5222 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -18,17 +18,19 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.policies.data.FunctionStats;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.glassfish.jersey.media.multipart.FormDataParam;
-import org.apache.pulsar.common.io.ConnectorDefinition;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.List;
 
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
@@ -40,11 +42,10 @@
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
 import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.Response;
-
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
 
 @Slf4j
 @Path("/functions")
@@ -56,20 +57,19 @@ public FunctionApiV2Resource() {
         this.functions = new FunctionsImpl(this);
     }
 
-
     @POST
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response registerFunction(final @PathParam("tenant") String tenant,
-                                     final @PathParam("namespace") String namespace,
-                                     final @PathParam("functionName") String functionName,
-                                     final @FormDataParam("data") InputStream uploadedInputStream,
-                                     final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                     final @FormDataParam("url") String functionPkgUrl,
-                                     final @FormDataParam("functionDetails") String functionDetailsJson,
-                                     final @FormDataParam("functionConfig") String functionConfigJson) {
+    public void registerFunction(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("functionName") String functionName,
+                                 final @FormDataParam("data") InputStream uploadedInputStream,
+                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                 final @FormDataParam("url") String functionPkgUrl,
+                                 final @FormDataParam("functionDetails") String functionDetailsJson,
+                                 final @FormDataParam("functionConfig") String functionConfigJson) {
 
-        return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+        functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
                 functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
 
     }
@@ -77,36 +77,34 @@ public Response registerFunction(final @PathParam("tenant") String tenant,
     @PUT
     @Path("/{tenant}/{namespace}/{functionName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response updateFunction(final @PathParam("tenant") String tenant,
-                                   final @PathParam("namespace") String namespace,
-                                   final @PathParam("functionName") String functionName,
-                                   final @FormDataParam("data") InputStream uploadedInputStream,
-                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                   final @FormDataParam("url") String functionPkgUrl,
-                                   final @FormDataParam("functionDetails") String functionDetailsJson,
-                                   final @FormDataParam("functionConfig") String functionConfigJson) {
+    public void updateFunction(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("functionName") String functionName,
+                               final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                               final @FormDataParam("url") String functionPkgUrl,
+                               final @FormDataParam("functionDetails") String functionDetailsJson,
+                               final @FormDataParam("functionConfig") String functionConfigJson) {
 
-        return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+        functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
                 functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
 
     }
 
-
     @DELETE
     @Path("/{tenant}/{namespace}/{functionName}")
-    public Response deregisterFunction(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+    public void deregisterFunction(final @PathParam("tenant") String tenant,
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("functionName") String functionName) {
+        functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
     }
 
     @GET
     @Path("/{tenant}/{namespace}/{functionName}")
-    public Response getFunctionInfo(final @PathParam("tenant") String tenant,
-                                    final @PathParam("namespace") String namespace,
-                                    final @PathParam("functionName") String functionName)
-            throws IOException {
-        return functions.getFunctionInfo(
-            tenant, namespace, functionName);
+    public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("functionName") String functionName) {
+        return functions.getFunctionInfo(tenant, namespace, functionName);
     }
 
     @GET
@@ -163,8 +161,8 @@ public FunctionStatus getFunctionStatus(
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/stats")
     public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
-                                     final @PathParam("namespace") String namespace,
-                                     final @PathParam("functionName") String functionName) throws IOException {
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("functionName") String functionName) throws IOException {
         return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
     }
 
@@ -180,10 +178,11 @@ public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
     })
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
-    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(final @PathParam("tenant") String tenant,
-                                                                                                  final @PathParam("namespace") String namespace,
-                                                                                                  final @PathParam("functionName") String functionName,
-                                                                                                  final @PathParam("instanceId") String instanceId) throws IOException {
+    public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(
+            final @PathParam("tenant") String tenant,
+            final @PathParam("namespace") String namespace,
+            final @PathParam("functionName") String functionName,
+            final @PathParam("instanceId") String instanceId) throws IOException {
         return functions.getFunctionsInstanceStats(
                 tenant, namespace, functionName, instanceId, uri.getRequestUri());
     }
@@ -191,76 +190,88 @@ public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
     @POST
     @Path("/{tenant}/{namespace}/{functionName}/trigger")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response triggerFunction(final @PathParam("tenant") String tenant,
-                                    final @PathParam("namespace") String namespace,
-                                    final @PathParam("functionName") String functionName,
-                                    final @FormDataParam("data") String input,
-                                    final @FormDataParam("dataStream") InputStream uploadedInputStream,
-                                    final @FormDataParam("topic") String topic) {
+    public String triggerFunction(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("functionName") String functionName,
+                                  final @FormDataParam("data") String input,
+                                  final @FormDataParam("dataStream") InputStream uploadedInputStream,
+                                  final @FormDataParam("topic") String topic) {
         return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
     }
 
     @POST
     @ApiOperation(value = "Restart function instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartFunction(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
-            final @PathParam("instanceId") String instanceId) {
-        return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+    public void restartFunction(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace,
+                                final @PathParam("functionName") String functionName,
+                                final @PathParam("instanceId") String instanceId) {
+        functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Restart all function instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{functionName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartFunction(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.restartFunctionInstances(tenant, namespace, functionName);
+    public void restartFunction(final @PathParam("tenant") String tenant,
+                                final @PathParam("namespace") String namespace,
+                                final @PathParam("functionName") String functionName) {
+        functions.restartFunctionInstances(tenant, namespace, functionName);
     }
 
     @POST
     @ApiOperation(value = "Stop function instance", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopFunction(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
-            final @PathParam("instanceId") String instanceId) {
-        return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+    public void stopFunction(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("functionName") String functionName,
+                             final @PathParam("instanceId") String instanceId) {
+        functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Stop all function instances", response = Void.class)
-    @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+    @ApiResponses(value = {
+            @ApiResponse(code = 400, message = "Invalid request"),
             @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 500, message = "Internal server error")
+    })
     @Path("/{tenant}/{namespace}/{functionName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopFunction(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
-        return functions.stopFunctionInstances(tenant, namespace, functionName);
+    public void stopFunction(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("functionName") String functionName) {
+        functions.stopFunctionInstances(tenant, namespace, functionName);
     }
 
     @POST
     @Path("/upload")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
-                                   final @FormDataParam("path") String path) {
-        return functions.uploadFunction(uploadedInputStream, path);
+    public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
+                               final @FormDataParam("path") String path) {
+        functions.uploadFunction(uploadedInputStream, path);
     }
 
     @GET
     @Path("/download")
-    public Response downloadFunction(final @QueryParam("path") String path) {
+    public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
         return functions.downloadFunction(path);
     }
 
@@ -272,11 +283,10 @@ public Response downloadFunction(final @QueryParam("path") String path) {
 
     @GET
     @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
-    public Response getFunctionState(final @PathParam("tenant") String tenant,
-                                      final @PathParam("namespace") String namespace,
-                                      final @PathParam("functionName") String functionName,
-                                     final @PathParam("key") String key) throws IOException {
-        return functions.getFunctionState(
-            tenant, namespace, functionName, key);
+    public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
+                                          final @PathParam("namespace") String namespace,
+                                          final @PathParam("functionName") String functionName,
+                                          final @PathParam("key") String key) throws IOException {
+        return functions.getFunctionState(tenant, namespace, functionName, key);
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 6da47272d1..e630db4668 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -24,6 +24,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.policies.data.FunctionStatus;
 import org.apache.pulsar.common.policies.data.SinkStatus;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
@@ -49,54 +50,51 @@ public SinkApiV2Resource() {
         this.sink = new SinkImpl(this);
     }
 
-
     @POST
     @Path("/{tenant}/{namespace}/{sinkName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response registerSink(final @PathParam("tenant") String tenant,
-                                 final @PathParam("namespace") String namespace,
-                                 final @PathParam("sinkName") String sinkName,
-                                 final @FormDataParam("data") InputStream uploadedInputStream,
-                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                 final @FormDataParam("url") String functionPkgUrl,
-                                 final @FormDataParam("sinkConfig") String sinkConfigJson) {
-
-        return sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+    public void registerSink(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("sinkName") String sinkName,
+                             final @FormDataParam("data") InputStream uploadedInputStream,
+                             final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                             final @FormDataParam("url") String functionPkgUrl,
+                             final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+        sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
                 functionPkgUrl, null, sinkConfigJson, clientAppId());
-
     }
 
     @PUT
     @Path("/{tenant}/{namespace}/{sinkName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response updateSink(final @PathParam("tenant") String tenant,
-                               final @PathParam("namespace") String namespace,
-                               final @PathParam("sinkName") String sinkName,
-                               final @FormDataParam("data") InputStream uploadedInputStream,
-                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                               final @FormDataParam("url") String functionPkgUrl,
-                               final @FormDataParam("sinkConfig") String sinkConfigJson) {
-
-        return sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+    public void updateSink(final @PathParam("tenant") String tenant,
+                           final @PathParam("namespace") String namespace,
+                           final @PathParam("sinkName") String sinkName,
+                           final @FormDataParam("data") InputStream uploadedInputStream,
+                           final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                           final @FormDataParam("url") String functionPkgUrl,
+                           final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+        sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
                 functionPkgUrl, null, sinkConfigJson, clientAppId());
-
     }
 
-
     @DELETE
     @Path("/{tenant}/{namespace}/{sinkName}")
-    public Response deregisterSink(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return sink.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+    public void deregisterSink(final @PathParam("tenant") String tenant,
+                               final @PathParam("namespace") String namespace,
+                               final @PathParam("sinkName") String sinkName) {
+        sink.deregisterFunction(tenant, namespace, sinkName, clientAppId());
     }
 
     @GET
     @Path("/{tenant}/{namespace}/{sinkName}")
-    public Response getSinkInfo(final @PathParam("tenant") String tenant,
-                                final @PathParam("namespace") String namespace,
-                                final @PathParam("sinkName") String sinkName)
+    public SinkConfig getSinkInfo(final @PathParam("tenant") String tenant,
+                                  final @PathParam("namespace") String namespace,
+                                  final @PathParam("sinkName") String sinkName)
             throws IOException {
-        return sink.getFunctionInfo(tenant, namespace, sinkName);
+        return sink.getSinkInfo(tenant, namespace, sinkName);
     }
 
     @GET
@@ -116,8 +114,7 @@ public Response getSinkInfo(final @PathParam("tenant") String tenant,
             final @PathParam("namespace") String namespace,
             final @PathParam("sinkName") String sinkName,
             final @PathParam("instanceId") String instanceId) throws IOException {
-        return sink.getSinkInstanceStatus(
-            tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+        return sink.getSinkInstanceStatus(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
     }
 
     @GET
@@ -133,42 +130,42 @@ public Response getSinkInfo(final @PathParam("tenant") String tenant,
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{sinkName}/status")
     public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
-                                  final @PathParam("namespace") String namespace,
-                                  final @PathParam("sinkName") String sinkName) throws IOException {
+                                    final @PathParam("namespace") String namespace,
+                                    final @PathParam("sinkName") String sinkName) throws IOException {
         return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri());
     }
 
     @GET
     @Path("/{tenant}/{namespace}")
-    public Response listSink(final @PathParam("tenant") String tenant,
-                             final @PathParam("namespace") String namespace) {
+    public List<String> listSink(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace) {
         return sink.listFunctions(tenant, namespace);
-
     }
 
     @POST
     @ApiOperation(value = "Restart sink instance", response = Void.class)
     @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+    @ApiResponse(code = 404, message = "The function does not exist"),
+    @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartSink(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
-            final @PathParam("instanceId") String instanceId) {
-        return sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+    public void restartSink(final @PathParam("tenant") String tenant,
+                            final @PathParam("namespace") String namespace,
+                            final @PathParam("sinkName") String sinkName,
+                            final @PathParam("instanceId") String instanceId) {
+        sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Restart all sink instances", response = Void.class)
     @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+    @ApiResponse(code = 404, message = "The function does not exist"), @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sinkName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartSink(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return sink.restartFunctionInstances(tenant, namespace, sinkName);
+    public void restartSink(final @PathParam("tenant") String tenant,
+                            final @PathParam("namespace") String namespace,
+                            final @PathParam("sinkName") String sinkName) {
+        sink.restartFunctionInstances(tenant, namespace, sinkName);
     }
 
     @POST
@@ -178,34 +175,36 @@ public Response restartSink(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopSink(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
-            final @PathParam("instanceId") String instanceId) {
-        return sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+    public void stopSink(final @PathParam("tenant") String tenant,
+                         final @PathParam("namespace") String namespace,
+                         final @PathParam("sinkName") String sinkName,
+                         final @PathParam("instanceId") String instanceId) {
+        sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Stop all sink instances", response = Void.class)
     @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+    @ApiResponse(code = 404, message = "The function does not exist"),
+    @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sinkName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopSink(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
-        return sink.stopFunctionInstances(tenant, namespace, sinkName);
+    public void stopSink(final @PathParam("tenant") String tenant,
+                         final @PathParam("namespace") String namespace,
+                         final @PathParam("sinkName") String sinkName) {
+        sink.stopFunctionInstances(tenant, namespace, sinkName);
     }
 
     @GET
     @Path("/builtinsinks")
     public List<ConnectorDefinition> getSinkList() {
         List<ConnectorDefinition> connectorDefinitions = sink.getListOfConnectors();
-        List<ConnectorDefinition> retval = new ArrayList<>();
+        List<ConnectorDefinition> retVal = new ArrayList<>();
         for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
             if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
-                retval.add(connectorDefinition);
+                retVal.add(connectorDefinition);
             }
         }
-        return retval;
+        return retVal;
     }
 }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 0ad5a34b97..bb49fe1995 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -24,6 +24,7 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.policies.data.SourceStatus;
 import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
 import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
@@ -48,11 +49,10 @@ public SourceApiV2Resource() {
         this.source = new SourceImpl(this);
     }
 
-
     @POST
     @Path("/{tenant}/{namespace}/{sourceName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response registerSource(final @PathParam("tenant") String tenant,
+    public void registerSource(final @PathParam("tenant") String tenant,
                                    final @PathParam("namespace") String namespace,
                                    final @PathParam("sourceName") String sourceName,
                                    final @FormDataParam("data") InputStream uploadedInputStream,
@@ -60,7 +60,7 @@ public Response registerSource(final @PathParam("tenant") String tenant,
                                    final @FormDataParam("url") String functionPkgUrl,
                                    final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
-        return source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+        source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
                 functionPkgUrl, null, sourceConfigJson, clientAppId());
 
     }
@@ -68,34 +68,34 @@ public Response registerSource(final @PathParam("tenant") String tenant,
     @PUT
     @Path("/{tenant}/{namespace}/{sourceName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public Response updateSource(final @PathParam("tenant") String tenant,
-                                 final @PathParam("namespace") String namespace,
-                                 final @PathParam("sourceName") String sourceName,
-                                 final @FormDataParam("data") InputStream uploadedInputStream,
-                                 final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                 final @FormDataParam("url") String functionPkgUrl,
-                                 final @FormDataParam("sourceConfig") String sourceConfigJson) {
-
-        return source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+    public void updateSource(final @PathParam("tenant") String tenant,
+                             final @PathParam("namespace") String namespace,
+                             final @PathParam("sourceName") String sourceName,
+                             final @FormDataParam("data") InputStream uploadedInputStream,
+                             final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                             final @FormDataParam("url") String functionPkgUrl,
+                             final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+        source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
                 functionPkgUrl, null, sourceConfigJson, clientAppId());
-
     }
 
 
     @DELETE
     @Path("/{tenant}/{namespace}/{sourceName}")
-    public Response deregisterSource(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return source.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+    public void deregisterSource(final @PathParam("tenant") String tenant,
+                                 final @PathParam("namespace") String namespace,
+                                 final @PathParam("sourceName") String sourceName) {
+        source.deregisterFunction(tenant, namespace, sourceName, clientAppId());
     }
 
     @GET
     @Path("/{tenant}/{namespace}/{sourceName}")
-    public Response getSourceInfo(final @PathParam("tenant") String tenant,
-                                  final @PathParam("namespace") String namespace,
-                                  final @PathParam("sourceName") String sourceName)
+    public SourceConfig getSourceInfo(final @PathParam("tenant") String tenant,
+                                      final @PathParam("namespace") String namespace,
+                                      final @PathParam("sourceName") String sourceName)
             throws IOException {
-        return source.getFunctionInfo(tenant, namespace, sourceName);
+        return source.getSourceInfo(tenant, namespace, sourceName);
     }
 
     @GET
@@ -139,10 +139,9 @@ public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant,
 
     @GET
     @Path("/{tenant}/{namespace}")
-    public Response listSources(final @PathParam("tenant") String tenant,
-                                final @PathParam("namespace") String namespace) {
+    public List<String> listSources(final @PathParam("tenant") String tenant,
+                                    final @PathParam("namespace") String namespace) {
         return source.listFunctions(tenant, namespace);
-
     }
 
     @POST
@@ -152,22 +151,24 @@ public Response listSources(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartSource(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
-            final @PathParam("instanceId") String instanceId) {
-        return source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+    public void restartSource(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace,
+                              final @PathParam("sourceName") String sourceName,
+                              final @PathParam("instanceId") String instanceId) {
+        source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
     }
 
     @POST
     @ApiOperation(value = "Restart all source instances", response = Void.class)
     @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+    @ApiResponse(code = 404, message = "The function does not exist"),
+    @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sourceName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response restartSource(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return source.restartFunctionInstances(tenant, namespace, sourceName);
+    public void restartSource(final @PathParam("tenant") String tenant,
+                              final @PathParam("namespace") String namespace,
+                              final @PathParam("sourceName") String sourceName) {
+        source.restartFunctionInstances(tenant, namespace, sourceName);
     }
 
     @POST
@@ -177,10 +178,11 @@ public Response restartSource(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopSource(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
-            final @PathParam("instanceId") String instanceId) {
-        return source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+    public void stopSource(final @PathParam("tenant") String tenant,
+                           final @PathParam("namespace") String namespace,
+                           final @PathParam("sourceName") String sourceName,
+                           final @PathParam("instanceId") String instanceId) {
+        source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
     }
 
     @POST
@@ -190,9 +192,10 @@ public Response stopSource(final @PathParam("tenant") String tenant,
             @ApiResponse(code = 500, message = "Internal server error") })
     @Path("/{tenant}/{namespace}/{sourceName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public Response stopSource(final @PathParam("tenant") String tenant,
-            final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
-        return source.stopFunctionInstances(tenant, namespace, sourceName);
+    public void stopSource(final @PathParam("tenant") String tenant,
+                           final @PathParam("namespace") String namespace,
+                           final @PathParam("sourceName") String sourceName) {
+        source.stopFunctionInstances(tenant, namespace, sourceName);
     }
 
     @GET
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 98747edd32..25a3a6604f 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -18,31 +18,8 @@
  */
 package org.apache.pulsar.functions.worker.rest.api.v2;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.*;
-import static org.powermock.api.mockito.PowerMockito.doNothing;
-import static org.powermock.api.mockito.PowerMockito.doReturn;
-import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.mockStatic;
-import static org.testng.Assert.assertEquals;
-
 import com.google.common.collect.Lists;
 import com.google.gson.Gson;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-
-import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import javax.ws.rs.core.StreamingOutput;
-
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
@@ -51,7 +28,7 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
-import org.apache.pulsar.common.policies.data.ErrorData;
+import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.Context;
@@ -65,12 +42,16 @@
 import org.apache.pulsar.functions.proto.Function.SubscriptionType;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.worker.*;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.RestException;
 import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
 import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -80,6 +61,32 @@
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+import static org.powermock.api.mockito.PowerMockito.doNothing;
+import static org.powermock.api.mockito.PowerMockito.doReturn;
+import static org.powermock.api.mockito.PowerMockito.doThrow;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
+import static org.testng.Assert.assertEquals;
+
 /**
  * Unit test of {@link FunctionApiV2Resource}.
  */
@@ -176,42 +183,51 @@ public void setup() throws Exception {
     // Register Functions
     //
 
-    @Test
-    public void testRegisterFunctionMissingTenant() throws IOException {
-        testRegisterFunctionMissingArguments(
-            null,
-            namespace,
-            function,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                null,
-                "Tenant is not provided");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testRegisterFunctionMissingTenant() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    null,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionMissingNamespace() throws IOException {
-        testRegisterFunctionMissingArguments(
-            tenant,
-            null,
-            function,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testRegisterFunctionMissingNamespace() {
+        try {
+            testRegisterFunctionMissingArguments(
+                tenant,
                 null,
-                "Namespace is not provided");
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionMissingFunctionName() throws IOException {
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
+    public void testRegisterFunctionMissingFunctionName() {
+        try {
         testRegisterFunctionMissingArguments(
             tenant,
             namespace,
@@ -223,98 +239,122 @@ public void testRegisterFunctionMissingFunctionName() throws IOException {
                 outputSerdeClassName,
             className,
             parallelism,
-                null,
-                "Function Name is not provided");
+                null);
+    } catch (RestException re){
+        assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+        throw re;
+    }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
+    public void testRegisterFunctionMissingPackage() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionMissingPackage() throws IOException {
-        testRegisterFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            null,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                null,
-                "Function Package is not provided");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "No input topic\\(s\\) specified for the function")
+    public void testRegisterFunctionMissingInputTopics() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    null,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionMissingInputTopics() throws IOException {
-        testRegisterFunctionMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
+    public void testRegisterFunctionMissingPackageDetails() {
+        try {
+            testRegisterFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
+                mockedInputStream,
+                topicsToSerDeClassName,
                 null,
-                null,
-                mockedFormData,
                 outputTopic,
-                outputSerdeClassName,
+                    outputSerdeClassName,
                 className,
                 parallelism,
-                null,
-                "No input topic(s) specified for the function");
-    }
-
-    @Test
-    public void testRegisterFunctionMissingPackageDetails() throws IOException {
-        testRegisterFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            null,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                null,
-                "Function Package is not provided");
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionMissingClassName() throws IOException {
-        testRegisterFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            null,
-            parallelism,
-                null,
-                "Function classname cannot be null");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function classname cannot be null")
+    public void testRegisterFunctionMissingClassName() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    null,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionWrongClassName() throws IOException {
-        testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                "UnknownClass",
-                parallelism,
-                null,
-                "User class must be in class path");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "User class must be in class path")
+    public void testRegisterFunctionWrongClassName() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    "UnknownClass",
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionWrongParallelism() throws IOException {
-        testRegisterFunctionMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function parallelism should positive number")
+    public void testRegisterFunctionWrongParallelism() {
+        try {
+            testRegisterFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
@@ -325,31 +365,39 @@ public void testRegisterFunctionWrongParallelism() throws IOException {
                 outputSerdeClassName,
                 className,
                 -2,
-                null,
-                "Function parallelism should positive number");
+                null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionSameInputOutput() throws IOException {
-        testRegisterFunctionMissingArguments(
-                tenant,
-                namespace,
-                function,
-                mockedInputStream,
-                topicsToSerDeClassName,
-                mockedFormData,
-                topicsToSerDeClassName.keySet().iterator().next(),
-                outputSerdeClassName,
-                className,
-                parallelism,
-                null,
-                "Output topic " + topicsToSerDeClassName.keySet().iterator().next()
-                        + " is also being used as an input topic (topics must be one or the other)");
+    @Test(expectedExceptions = RestException.class,
+            expectedExceptionsMessageRegExp = "Output topic persistent://sample/standalone/ns1/test_src is also being used as an input topic \\(topics must be one or the other\\)")
+    public void testRegisterFunctionSameInputOutput() {
+        try {
+            testRegisterFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    mockedInputStream,
+                    topicsToSerDeClassName,
+                    mockedFormData,
+                    topicsToSerDeClassName.keySet().iterator().next(),
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionWrongOutputTopic() throws IOException {
-        testRegisterFunctionMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topic " + function + "-output-topic/test:" + " is invalid")
+    public void testRegisterFunctionWrongOutputTopic() {
+        try {
+            testRegisterFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
@@ -360,13 +408,17 @@ public void testRegisterFunctionWrongOutputTopic() throws IOException {
                 outputSerdeClassName,
                 className,
                 parallelism,
-                null,
-                "Output topic " + function + "-output-topic/test:" + " is invalid");
+                null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionHttpUrl() throws IOException {
-        testRegisterFunctionMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Corrupted Jar File")
+    public void testRegisterFunctionHttpUrl() {
+        try {
+            testRegisterFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
@@ -377,8 +429,11 @@ public void testRegisterFunctionHttpUrl() throws IOException {
                 outputSerdeClassName,
                 className,
                 parallelism,
-                "http://localhost:1234/test",
-                "Corrupted Jar File");
+                "http://localhost:1234/test");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testRegisterFunctionMissingArguments(
@@ -392,8 +447,7 @@ private void testRegisterFunctionMissingArguments(
             String outputSerdeClassName,
             String className,
             Integer parallelism,
-            String functionPkgUrl,
-            String errorExpected) throws IOException {
+            String functionPkgUrl) {
         FunctionConfig functionConfig = new FunctionConfig();
         if (tenant != null) {
             functionConfig.setTenant(tenant);
@@ -421,7 +475,7 @@ private void testRegisterFunctionMissingArguments(
         }
         functionConfig.setRuntime(FunctionConfig.Runtime.JAVA);
 
-        Response response = resource.registerFunction(
+        resource.registerFunction(
                 tenant,
                 namespace,
                 function,
@@ -432,13 +486,11 @@ private void testRegisterFunctionMissingArguments(
                 new Gson().toJson(functionConfig),
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason);
     }
 
-    private Response registerDefaultFunction() {
+    private void registerDefaultFunction() {
         FunctionConfig functionConfig = createDefaultFunctionConfig();
-        return resource.registerFunction(
+        resource.registerFunction(
             tenant,
             namespace,
             function,
@@ -450,227 +502,279 @@ private Response registerDefaultFunction() {
                 null);
     }
 
-    @Test
-    public void testRegisterExistedFunction() throws IOException {
-        Configurator.setRootLevel(Level.DEBUG);
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
-
-        Response response = registerDefaultFunction();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + function + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function already exists")
+    public void testRegisterExistedFunction() {
+        try {
+            Configurator.setRootLevel(Level.DEBUG);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionUploadFailure() throws Exception {
-        mockStatic(Utils.class);
-        doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-            any(File.class),
-            any(Namespace.class));
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
-
-        Response response = registerDefaultFunction();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
+    public void testRegisterFunctionUploadFailure() throws Exception {
+        try {
+            mockStatic(Utils.class);
+            doThrow(new IOException("upload failure")).when(Utils.class);
+
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                any(File.class),
+                any(Namespace.class));
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     @Test
     public void testRegisterFunctionSuccess() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(true)
-            .setMessage("function registered");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                .setSuccess(true)
+                .setMessage("function registered");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = registerDefaultFunction();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionNonexistantNamespace() throws Exception {
-        this.namespaceList.clear();
-        Response response = registerDefaultFunction();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Namespace does not exist").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace does not exist")
+    public void testRegisterFunctionNonExistingNamespace() {
+        try {
+            this.namespaceList.clear();
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant does not exist")
     public void testRegisterFunctionNonexistantTenant() throws Exception {
-        when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
-        Response response = registerDefaultFunction();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
     public void testRegisterFunctionFailure() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(false)
-            .setMessage("function failed to register");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("function failed to register");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = registerDefaultFunction();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testRegisterFunctionInterrupted() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
-
-        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-            new IOException("Function registeration interrupted"));
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                new IOException("Function registeration interrupted"));
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = registerDefaultFunction();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+            registerDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     //
     // Update Functions
     //
 
-    @Test
-    public void testUpdateFunctionMissingTenant() throws IOException {
-        testUpdateFunctionMissingArguments(
-            null,
-            namespace,
-            function,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                "Tenant is not provided");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testUpdateFunctionMissingTenant() {
+        try {
+            testUpdateFunctionMissingArguments(
+                null,
+                namespace,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    "Tenant is not provided");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testUpdateFunctionMissingNamespace() throws IOException {
-        testUpdateFunctionMissingArguments(
-            tenant,
-            null,
-            function,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                "Namespace is not provided");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testUpdateFunctionMissingNamespace() {
+        try {
+            testUpdateFunctionMissingArguments(
+                tenant,
+                null,
+                function,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    "Namespace is not provided");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testUpdateFunctionMissingFunctionName() throws IOException {
-        testUpdateFunctionMissingArguments(
-            tenant,
-            namespace,
-            null,
-            mockedInputStream,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                "Function Name is not provided");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
+    public void testUpdateFunctionMissingFunctionName() {
+        try {
+            testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                null,
+                mockedInputStream,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    "Function Name is not provided");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateFunctionMissingPackage() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-        testUpdateFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            null,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                "Update contains no change");
-    }
-
-    @Test
-    public void testUpdateFunctionMissingInputTopic() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-        testUpdateFunctionMissingArguments(
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
                 null,
-                null,
+                topicsToSerDeClassName,
                 mockedFormData,
                 outputTopic,
-                outputSerdeClassName,
+                    outputSerdeClassName,
                 className,
                 parallelism,
-                "Update contains no change");
+                    "Update contains no change");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
+    public void testUpdateFunctionMissingInputTopic() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
+                    tenant,
+                    namespace,
+                    function,
+                    null,
+                    null,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    "Update contains no change");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateFunctionMissingClassName() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-        testUpdateFunctionMissingArguments(
-            tenant,
-            namespace,
-            function,
-            null,
-            topicsToSerDeClassName,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            null,
-            parallelism,
-                "Update contains no change");
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
+                tenant,
+                namespace,
+                function,
+                null,
+                topicsToSerDeClassName,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                null,
+                parallelism,
+                    "Update contains no change");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     @Test
     public void testUpdateFunctionChangedParallelism() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-        testUpdateFunctionMissingArguments(
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
@@ -682,14 +786,19 @@ public void testUpdateFunctionChangedParallelism() throws IOException {
                 null,
                 parallelism + 1,
                 null);
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Output topics differ")
     public void testUpdateFunctionChangedInputs() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-        testUpdateFunctionMissingArguments(
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
@@ -701,16 +810,21 @@ public void testUpdateFunctionChangedInputs() throws IOException {
                 null,
                 parallelism,
                 "Output topics differ");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
     public void testUpdateFunctionChangedOutput() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-        Map<String, String> someOtherInput = new HashMap<>();
-        someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
-        testUpdateFunctionMissingArguments(
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+            Map<String, String> someOtherInput = new HashMap<>();
+            someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
+            testUpdateFunctionMissingArguments(
                 tenant,
                 namespace,
                 function,
@@ -722,6 +836,10 @@ public void testUpdateFunctionChangedOutput() throws IOException {
                 null,
                 parallelism,
                 "Input Topics cannot be altered");
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testUpdateFunctionMissingArguments(
@@ -735,7 +853,7 @@ private void testUpdateFunctionMissingArguments(
             String outputSerdeClassName,
             String className,
             Integer parallelism,
-            String expectedError) throws IOException {
+            String expectedError) {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         FunctionConfig functionConfig = new FunctionConfig();
@@ -773,7 +891,7 @@ private void testUpdateFunctionMissingArguments(
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
         }
 
-        Response response = resource.updateFunction(
+        resource.updateFunction(
             tenant,
             namespace,
             function,
@@ -784,15 +902,9 @@ private void testUpdateFunctionMissingArguments(
             new Gson().toJson(functionConfig),
                 null);
 
-        if (expectedError == null) {
-            assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        } else {
-            assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-            Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason);
-        }
     }
 
-    private Response updateDefaultFunction() throws IOException {
+    private void updateDefaultFunction() {
         FunctionConfig functionConfig = new FunctionConfig();
         functionConfig.setTenant(tenant);
         functionConfig.setNamespace(namespace);
@@ -804,7 +916,7 @@ private Response updateDefaultFunction() throws IOException {
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
 
-        return resource.updateFunction(
+        resource.updateFunction(
             tenant,
             namespace,
             function,
@@ -816,29 +928,34 @@ private Response updateDefaultFunction() throws IOException {
                 null);
     }
 
-    @Test
-    public void testUpdateNotExistedFunction() throws IOException {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
-
-        Response response = updateDefaultFunction();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + function + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
+    public void testUpdateNotExistedFunction() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+            updateDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testUpdateFunctionUploadFailure() throws Exception {
-        mockStatic(Utils.class);
-        doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-            any(File.class),
-            any(Namespace.class));
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
-
-        Response response = updateDefaultFunction();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            mockStatic(Utils.class);
+            doThrow(new IOException("upload failure")).when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                any(File.class),
+                any(Namespace.class));
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+            updateDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     @Test
@@ -858,12 +975,11 @@ public void testUpdateFunctionSuccess() throws Exception {
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = updateDefaultFunction();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        updateDefaultFunction();
     }
 
     @Test
-    public void testUpdateFunctionWithUrl() throws IOException {
+    public void testUpdateFunctionWithUrl() {
         Configurator.setRootLevel(Level.DEBUG);
 
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
@@ -887,7 +1003,7 @@ public void testUpdateFunctionWithUrl() throws IOException {
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = resource.updateFunction(
+        resource.updateFunction(
             tenant,
             namespace,
             function,
@@ -898,117 +1014,136 @@ public void testUpdateFunctionWithUrl() throws IOException {
             new Gson().toJson(functionConfig),
                 null);
 
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to register")
     public void testUpdateFunctionFailure() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(false)
-            .setMessage("function failed to register");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("function failed to register");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = updateDefaultFunction();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+            updateDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testUpdateFunctionInterrupted() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadToBookeeper(
-            any(Namespace.class),
-            any(InputStream.class),
-            anyString());
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
-
-        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-            new IOException("Function registeration interrupted"));
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadToBookeeper(
+                any(Namespace.class),
+                any(InputStream.class),
+                anyString());
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                new IOException("Function registeration interrupted"));
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = updateDefaultFunction();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+            updateDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     //
     // deregister function
     //
 
-    @Test
-    public void testDeregisterFunctionMissingTenant() throws Exception {
-        testDeregisterFunctionMissingArguments(
-            null,
-            namespace,
-            function,
-            "Tenant");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testDeregisterFunctionMissingTenant() {
+        try {
+
+            testDeregisterFunctionMissingArguments(
+                null,
+                namespace,
+                function
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testDeregisterFunctionMissingNamespace() throws Exception {
-        testDeregisterFunctionMissingArguments(
-            tenant,
-            null,
-            function,
-            "Namespace");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testDeregisterFunctionMissingNamespace() {
+        try {
+            testDeregisterFunctionMissingArguments(
+                tenant,
+                null,
+                function
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testDeregisterFunctionMissingFunctionName() throws Exception {
-        testDeregisterFunctionMissingArguments(
-            tenant,
-            namespace,
-            null,
-            "Function Name");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
+    public void testDeregisterFunctionMissingFunctionName() {
+        try {
+             testDeregisterFunctionMissingArguments(
+                tenant,
+                namespace,
+                null
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testDeregisterFunctionMissingArguments(
-        String tenant,
-        String namespace,
-        String function,
-        String missingFieldName
+            String tenant,
+            String namespace,
+            String function
     ) {
-        Response response = resource.deregisterFunction(
+        resource.deregisterFunction(
             tenant,
             namespace,
             function,
                 null);
-
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response deregisterDefaultFunction() {
-        return resource.deregisterFunction(
+    private void deregisterDefaultFunction() {
+        resource.deregisterFunction(
             tenant,
             namespace,
             function,
                 null);
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
     public void testDeregisterNotExistedFunction() {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
-
-        Response response = deregisterDefaultFunction();
-        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + function + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+            deregisterDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
+            throw re;
+        }
     }
 
     @Test
-    public void testDeregisterFunctionSuccess() throws Exception {
+    public void testDeregisterFunctionSuccess() {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         RequestResult rr = new RequestResult()
@@ -1017,87 +1152,107 @@ public void testDeregisterFunctionSuccess() throws Exception {
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
 
-        Response response = deregisterDefaultFunction();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(rr.toJson(), response.getEntity());
+        deregisterDefaultFunction();
     }
 
-    @Test
-    public void testDeregisterFunctionFailure() throws Exception {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "function failed to deregister")
+    public void testDeregisterFunctionFailure() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(false)
-            .setMessage("function failed to deregister");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("function failed to deregister");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
 
-        Response response = deregisterDefaultFunction();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+            deregisterDefaultFunction();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testDeregisterFunctionInterrupted() throws Exception {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregisteration interrupted")
+    public void testDeregisterFunctionInterrupted() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
-        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-            new IOException("Function deregisteration interrupted"));
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                    new IOException("Function deregisteration interrupted"));
+            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
 
-        Response response = deregisterDefaultFunction();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+            deregisterDefaultFunction();
+        }
+        catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     //
     // Get Function Info
     //
 
-    @Test
-    public void testGetFunctionMissingTenant() throws Exception {
-        testGetFunctionMissingArguments(
-            null,
-            namespace,
-            function,
-            "Tenant");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testGetFunctionMissingTenant() {
+        try {
+            testGetFunctionMissingArguments(
+                null,
+                namespace,
+                function
+            );
+        }
+        catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testGetFunctionMissingNamespace() throws Exception {
-        testGetFunctionMissingArguments(
-            tenant,
-            null,
-            function,
-            "Namespace");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testGetFunctionMissingNamespace() {
+        try {
+            testGetFunctionMissingArguments(
+                tenant,
+                null,
+                function
+            );
+        }
+        catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testGetFunctionMissingFunctionName() throws Exception {
-        testGetFunctionMissingArguments(
-            tenant,
-            namespace,
-            null,
-            "Function Name");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
+    public void testGetFunctionMissingFunctionName() {
+        try {
+            testGetFunctionMissingArguments(
+                tenant,
+                namespace,
+                null
+            );
+        }
+        catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testGetFunctionMissingArguments(
-        String tenant,
-        String namespace,
-        String function,
-        String missingFieldName
-    ) throws IOException {
-        Response response = resource.getFunctionInfo(
+            String tenant,
+            String namespace,
+            String function
+    ) {
+        resource.getFunctionInfo(
             tenant,
             namespace,
             function
         );
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response getDefaultFunctionInfo() throws IOException {
+    private FunctionConfig getDefaultFunctionInfo() {
         return resource.getFunctionInfo(
             tenant,
             namespace,
@@ -1105,17 +1260,19 @@ private Response getDefaultFunctionInfo() throws IOException {
         );
     }
 
-    @Test
-    public void testGetNotExistedFunction() throws IOException {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
-
-        Response response = getDefaultFunctionInfo();
-        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function " + function + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
+    public void testGetNotExistedFunction() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
+            getDefaultFunctionInfo();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
+            throw re;
+        }
     }
 
     @Test
-    public void testGetFunctionSuccess() throws Exception {
+    public void testGetFunctionSuccess() {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
 
         SinkSpec sinkSpec = SinkSpec.newBuilder()
@@ -1139,48 +1296,54 @@ public void testGetFunctionSuccess() throws Exception {
             .build();
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
 
-        Response response = getDefaultFunctionInfo();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        FunctionConfig functionConfig = getDefaultFunctionInfo();
         assertEquals(
-                new Gson().toJson(FunctionConfigUtils.convertFromDetails(functionDetails)),
-                response.getEntity());
+                FunctionConfigUtils.convertFromDetails(functionDetails),
+                functionConfig);
     }
 
     //
     // List Functions
     //
 
-    @Test
-    public void testListFunctionsMissingTenant() throws Exception {
-        testListFunctionsMissingArguments(
-            null,
-            namespace,
-            "Tenant");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testListFunctionsMissingTenant() {
+        try {
+            testListFunctionsMissingArguments(
+                null,
+                namespace
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testListFunctionsMissingNamespace() throws Exception {
-        testListFunctionsMissingArguments(
-            tenant,
-            null,
-            "Namespace");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testListFunctionsMissingNamespace() {
+        try {
+            testListFunctionsMissingArguments(
+                tenant,
+                null
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testListFunctionsMissingArguments(
-        String tenant,
-        String namespace,
-        String missingFieldName
+            String tenant,
+            String namespace
     ) {
-        Response response = resource.listFunctions(
+        resource.listFunctions(
             tenant,
             namespace
         );
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response listDefaultFunctions() {
+    private List<String> listDefaultFunctions() {
         return resource.listFunctions(
             tenant,
             namespace
@@ -1188,9 +1351,9 @@ private Response listDefaultFunctions() {
     }
 
     @Test
-    public void testListFunctionsSuccess() throws Exception {
-        List<String> functions = Lists.newArrayList("test-1", "test-2");
-        List<FunctionMetaData> metaDataList = new LinkedList<>();
+    public void testListFunctionsSuccess() {
+        final List<String> functions = Lists.newArrayList("test-1", "test-2");
+        final List<FunctionMetaData> metaDataList = new LinkedList<>();
         FunctionMetaData functionMetaData1 = FunctionMetaData.newBuilder().setFunctionDetails(
                 FunctionDetails.newBuilder().setName("test-1").build()
         ).build();
@@ -1201,13 +1364,12 @@ public void testListFunctionsSuccess() throws Exception {
         metaDataList.add(functionMetaData2);
         when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(metaDataList);
 
-        Response response = listDefaultFunctions();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(new Gson().toJson(functions), response.getEntity());
+        List<String> functionList = listDefaultFunctions();
+        assertEquals(functions, functionList);
     }
 
     @Test
-    public void testOnlyGetSources() throws Exception {
+    public void testOnlyGetSources() {
         List<String> functions = Lists.newArrayList("test-2");
         List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
         FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
@@ -1224,9 +1386,8 @@ public void testOnlyGetSources() throws Exception {
         doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
         doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
-        Response response = listDefaultFunctions();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(new Gson().toJson(functions), response.getEntity());
+        List<String> functionList = listDefaultFunctions();
+        assertEquals(functions, functionList);
     }
 
     @Test
@@ -1234,8 +1395,7 @@ public void testDownloadFunctionHttpUrl() throws Exception {
         String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
         String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionsImpl function = new FunctionsImpl(null);
-        Response response = function.downloadFunction(jarHttpUrl);
-        StreamingOutput streamOutput = (StreamingOutput) response.getEntity();
+        StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
@@ -1250,8 +1410,7 @@ public void testDownloadFunctionFile() throws Exception {
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
         FunctionsImpl function = new FunctionsImpl(null);
-        Response response = function.downloadFunction("file://"+fileLocation);
-        StreamingOutput streamOutput = (StreamingOutput) response.getEntity();
+        StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
         File pkgFile = new File(testDir, UUID.randomUUID().toString());
         OutputStream output = new FileOutputStream(pkgFile);
         streamOutput.write(output);
@@ -1262,7 +1421,7 @@ public void testDownloadFunctionFile() throws Exception {
     }
 
     @Test
-    public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException {
+    public void testRegisterFunctionFileUrlWithValidSinkClass() {
         Configurator.setRootLevel(Level.DEBUG);
 
         String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
@@ -1283,14 +1442,13 @@ public void testRegisterFunctionFileUrlWithValidSinkClass() throws IOException {
         functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
-        Response response = resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
+        resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl,
                 null, new Gson().toJson(functionConfig), null);
 
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
     @Test
-    public void testRegisterFunctionWithConflictingFields() throws IOException {
+    public void testRegisterFunctionWithConflictingFields() {
         Configurator.setRootLevel(Level.DEBUG);
         String actualTenant = "DIFFERENT_TENANT";
         String actualNamespace = "DIFFERENT_NAMESPACE";
@@ -1316,10 +1474,8 @@ public void testRegisterFunctionWithConflictingFields() throws IOException {
         functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
         functionConfig.setOutput(outputTopic);
         functionConfig.setOutputSerdeClassName(outputSerdeClassName);
-        Response response = resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
+        resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl,
                 null, new Gson().toJson(functionConfig), null);
-
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
     public static FunctionConfig createDefaultFunctionConfig() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index bb497ae2de..ee9a014059 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -29,8 +29,8 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -38,11 +38,15 @@
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
-import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.worker.*;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.rest.RestException;
 import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.SinkImpl;
 import org.apache.pulsar.io.cassandra.CassandraStringSink;
@@ -50,14 +54,12 @@
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.testng.Assert;
 import org.testng.IObjectFactory;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
@@ -81,7 +83,7 @@
 import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.*;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -180,54 +182,69 @@ public void setup() throws Exception {
     // Register Functions
     //
 
-    @Test
-    public void testRegisterSinkMissingTenant() throws IOException {
-        testRegisterSinkMissingArguments(
-            null,
-            namespace,
-                sink,
-            mockedInputStream,
-            mockedFormData,
-            topicsToSerDeClassName,
-            className,
-            parallelism,
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testRegisterSinkMissingTenant() {
+        try {
+            testRegisterSinkMissingArguments(
                 null,
-                "Tenant is not provided");
+                namespace,
+                    sink,
+                mockedInputStream,
+                mockedFormData,
+                topicsToSerDeClassName,
+                className,
+                parallelism,
+                    null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSinkMissingNamespace() throws IOException {
-        testRegisterSinkMissingArguments(
-            tenant,
-            null,
-                sink,
-            mockedInputStream,
-            mockedFormData,
-            topicsToSerDeClassName,
-            className,
-            parallelism,
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testRegisterSinkMissingNamespace() {
+        try {
+            testRegisterSinkMissingArguments(
+                tenant,
                 null,
-                "Namespace is not provided");
+                    sink,
+                mockedInputStream,
+                mockedFormData,
+                topicsToSerDeClassName,
+                className,
+                parallelism,
+                    null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSinkMissingFunctionName() throws IOException {
-        testRegisterSinkMissingArguments(
-            tenant,
-            namespace,
-            null,
-            mockedInputStream,
-            mockedFormData,
-            topicsToSerDeClassName,
-            className,
-            parallelism,
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Name is not provided")
+    public void testRegisterSinkMissingFunctionName() {
+        try {
+            testRegisterSinkMissingArguments(
+                tenant,
+                namespace,
                 null,
-                "Sink Name is not provided");
+                mockedInputStream,
+                mockedFormData,
+                topicsToSerDeClassName,
+                className,
+                parallelism,
+                    null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSinkMissingPackage() throws IOException {
-        testRegisterSinkMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Package is not provided")
+    public void testRegisterSinkMissingPackage() {
+        try {
+            testRegisterSinkMissingArguments(
             tenant,
             namespace,
                 sink,
@@ -236,13 +253,18 @@ public void testRegisterSinkMissingPackage() throws IOException {
             topicsToSerDeClassName,
             null,
             parallelism,
-                null,
-                "Sink Package is not provided");
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSinkMissingPackageDetails() throws IOException {
-        testRegisterSinkMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "zip file is empty")
+    public void testRegisterSinkMissingPackageDetails() {
+        try {
+            testRegisterSinkMissingArguments(
             tenant,
             namespace,
                 sink,
@@ -251,14 +273,19 @@ public void testRegisterSinkMissingPackageDetails() throws IOException {
             topicsToSerDeClassName,
             null,
             parallelism,
-                null,
-                "zip file is empty");
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
     public void testRegisterSinkInvalidJarNoSink() throws IOException {
-        FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH);
-        testRegisterSinkMissingArguments(
+        try {
+            FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH);
+            testRegisterSinkMissingArguments(
                 tenant,
                 namespace,
                 sink,
@@ -267,13 +294,18 @@ public void testRegisterSinkInvalidJarNoSink() throws IOException {
                 topicsToSerDeClassName,
                 null,
                 parallelism,
-                null,
-                "Failed to extract sink class from archive");
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSinkNoInput() throws IOException {
-        testRegisterSinkMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs")
+    public void testRegisterSinkNoInput() {
+        try {
+            testRegisterSinkMissingArguments(
                 tenant,
                 namespace,
                 sink,
@@ -282,13 +314,18 @@ public void testRegisterSinkNoInput() throws IOException {
                 null,
                 className,
                 parallelism,
-                null,
-                "Must specify at least one topic of input via topicToSerdeClassName, topicsPattern, topicToSchemaType or inputSpecs");
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSinkNegativeParallelism() throws IOException {
-        testRegisterSinkMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
+    public void testRegisterSinkNegativeParallelism() {
+        try {
+            testRegisterSinkMissingArguments(
                 tenant,
                 namespace,
                 sink,
@@ -297,13 +334,18 @@ public void testRegisterSinkNegativeParallelism() throws IOException {
                 topicsToSerDeClassName,
                 className,
                 -2,
-                null,
-                "Sink parallelism should positive number");
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSinkZeroParallelism() throws IOException {
-        testRegisterSinkMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink parallelism should positive number")
+    public void testRegisterSinkZeroParallelism() {
+        try {
+            testRegisterSinkMissingArguments(
                 tenant,
                 namespace,
                 sink,
@@ -312,13 +354,18 @@ public void testRegisterSinkZeroParallelism() throws IOException {
                 topicsToSerDeClassName,
                 className,
                 0,
-                null,
-                "Sink parallelism should positive number");
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSinkHttpUrl() throws IOException {
-        testRegisterSinkMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Sink Jar")
+    public void testRegisterSinkHttpUrl() {
+        try {
+            testRegisterSinkMissingArguments(
                 tenant,
                 namespace,
                 sink,
@@ -327,8 +374,12 @@ public void testRegisterSinkHttpUrl() throws IOException {
                 topicsToSerDeClassName,
                 className,
                 parallelism,
-                "http://localhost:1234/test",
-                "Invalid Sink Jar");
+                "http://localhost:1234/test"
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testRegisterSinkMissingArguments(
@@ -340,8 +391,7 @@ private void testRegisterSinkMissingArguments(
             Map<String, String> inputTopicMap,
             String className,
             Integer parallelism,
-            String pkgUrl,
-            String errorExpected) throws IOException {
+            String pkgUrl) {
         SinkConfig sinkConfig = new SinkConfig();
         if (tenant != null) {
             sinkConfig.setTenant(tenant);
@@ -362,7 +412,7 @@ private void testRegisterSinkMissingArguments(
             sinkConfig.setParallelism(parallelism);
         }
 
-        Response response = resource.registerFunction(
+        resource.registerFunction(
                 tenant,
                 namespace,
                 sink,
@@ -373,13 +423,11 @@ private void testRegisterSinkMissingArguments(
                 new Gson().toJson(sinkConfig),
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason);
     }
 
-    private Response registerDefaultSink() throws IOException {
+    private void registerDefaultSink() throws IOException {
         SinkConfig sinkConfig = createDefaultSinkConfig();
-        return resource.registerFunction(
+        resource.registerFunction(
             tenant,
             namespace,
                 sink,
@@ -391,31 +439,37 @@ private Response registerDefaultSink() throws IOException {
                 null);
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink already exists")
     public void testRegisterExistedSink() throws IOException {
-        Configurator.setRootLevel(Level.DEBUG);
+        try {
+            Configurator.setRootLevel(Level.DEBUG);
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
-        Response response = registerDefaultSink();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Sink " + sink + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+            registerDefaultSink();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testRegisterSinkUploadFailure() throws Exception {
-        mockStatic(Utils.class);
-        doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-            any(File.class),
-            any(Namespace.class));
+        try {
+            mockStatic(Utils.class);
+            doThrow(new IOException("upload failure")).when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                any(File.class),
+                any(Namespace.class));
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
 
-        Response response = registerDefaultSink();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+            registerDefaultSink();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     @Test
@@ -435,8 +489,7 @@ public void testRegisterSinkSuccess() throws Exception {
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = registerDefaultSink();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        registerDefaultSink();
     }
 
     @Test
@@ -468,7 +521,7 @@ public void testRegisterSinkConflictingFields() throws Exception {
         sinkConfig.setClassName(className);
         sinkConfig.setParallelism(parallelism);
         sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
-        Response response = resource.registerFunction(
+        resource.registerFunction(
                 actualTenant,
                 actualNamespace,
                 actualName,
@@ -478,58 +531,64 @@ public void testRegisterSinkConflictingFields() throws Exception {
                 null,
                 new Gson().toJson(sinkConfig),
                 null);
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
     public void testRegisterSinkFailure() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(false)
-            .setMessage("source failed to register");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("sink failed to register");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = registerDefaultSink();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+            registerDefaultSink();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testRegisterSinkInterrupted() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
-
-        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-            new IOException("Function registeration interrupted"));
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                new IOException("Function registeration interrupted"));
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = registerDefaultSink();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+            registerDefaultSink();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     //
     // Update Functions
     //
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
     public void testUpdateSinkMissingTenant() throws IOException {
-        testUpdateSinkMissingArguments(
+        try {
+            testUpdateSinkMissingArguments(
             null,
             namespace,
                 sink,
@@ -539,11 +598,16 @@ public void testUpdateSinkMissingTenant() throws IOException {
             className,
             parallelism,
                 "Tenant is not provided");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
     public void testUpdateSinkMissingNamespace() throws IOException {
-        testUpdateSinkMissingArguments(
+        try {
+            testUpdateSinkMissingArguments(
             tenant,
             null,
                 sink,
@@ -553,11 +617,16 @@ public void testUpdateSinkMissingNamespace() throws IOException {
             className,
             parallelism,
                 "Namespace is not provided");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Name is not provided")
     public void testUpdateSinkMissingFunctionName() throws IOException {
-        testUpdateSinkMissingArguments(
+        try {
+            testUpdateSinkMissingArguments(
             tenant,
             namespace,
             null,
@@ -567,33 +636,43 @@ public void testUpdateSinkMissingFunctionName() throws IOException {
             className,
             parallelism,
                 "Sink Name is not provided");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateSinkMissingPackage() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
-        testUpdateSinkMissingArguments(
-            tenant,
-            namespace,
-                sink,
-            null,
-            mockedFormData,
-            topicsToSerDeClassName,
-            null,
-            parallelism,
-                "Update contains no change");
+            testUpdateSinkMissingArguments(
+                tenant,
+                namespace,
+                    sink,
+                null,
+                mockedFormData,
+                topicsToSerDeClassName,
+                null,
+                parallelism,
+                    "Update contains no change");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
     public void testUpdateSinkMissingInputs() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
-        testUpdateSinkMissingArguments(
+            testUpdateSinkMissingArguments(
                 tenant,
                 namespace,
                 sink,
@@ -603,17 +682,22 @@ public void testUpdateSinkMissingInputs() throws IOException {
                 null,
                 parallelism,
                 "Update contains no change");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Input Topics cannot be altered")
     public void testUpdateSinkDifferentInputs() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-
-        Map<String, String> inputTopics = new HashMap<>();
-        inputTopics.put("DifferntTopic", DEFAULT_SERDE);
-        testUpdateSinkMissingArguments(
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+
+            Map<String, String> inputTopics = new HashMap<>();
+            inputTopics.put("DifferntTopic", DEFAULT_SERDE);
+            testUpdateSinkMissingArguments(
                 tenant,
                 namespace,
                 sink,
@@ -623,6 +707,10 @@ public void testUpdateSinkDifferentInputs() throws IOException {
                 className,
                 parallelism,
                 "Input Topics cannot be altered");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     @Test
@@ -701,7 +789,7 @@ private void testUpdateSinkMissingArguments(
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
         }
 
-        Response response = resource.updateFunction(
+        resource.updateFunction(
             tenant,
             namespace,
             sink,
@@ -712,15 +800,9 @@ private void testUpdateSinkMissingArguments(
             new Gson().toJson(sinkConfig),
                 null);
 
-        if (expectedError == null) {
-            assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        } else {
-            assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-            Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason);
-        }
     }
 
-    private Response updateDefaultSink() throws IOException {
+    private void updateDefaultSink() throws IOException {
         SinkConfig sinkConfig = new SinkConfig();
         sinkConfig.setTenant(tenant);
         sinkConfig.setNamespace(namespace);
@@ -747,7 +829,7 @@ private Response updateDefaultSink() throws IOException {
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
 
-        return resource.updateFunction(
+        resource.updateFunction(
             tenant,
             namespace,
                 sink,
@@ -759,29 +841,34 @@ private Response updateDefaultSink() throws IOException {
                 null);
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink doesn't exist")
     public void testUpdateNotExistedSink() throws IOException {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
-
-        Response response = updateDefaultSink();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+            updateDefaultSink();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testUpdateSinkUploadFailure() throws Exception {
-        mockStatic(Utils.class);
-        doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
-
-        Response response = updateDefaultSink();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            mockStatic(Utils.class);
+            doThrow(new IOException("upload failure")).when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+            updateDefaultSink();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     @Test
@@ -801,8 +888,7 @@ public void testUpdateSinkSuccess() throws Exception {
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = updateDefaultSink();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        updateDefaultSink();
     }
 
     @Test
@@ -844,7 +930,7 @@ public void testUpdateSinkWithUrl() throws IOException {
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = resource.updateFunction(
+        resource.updateFunction(
             tenant,
             namespace,
                 sink,
@@ -854,118 +940,136 @@ public void testUpdateSinkWithUrl() throws IOException {
             null,
             new Gson().toJson(sinkConfig),
                 null);
-
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to register")
     public void testUpdateSinkFailure() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(false)
-            .setMessage("source failed to register");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                    .setSuccess(false)
+                    .setMessage("sink failed to register");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = updateDefaultSink();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+            updateDefaultSink();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registeration interrupted")
     public void testUpdateSinkInterrupted() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
-
-        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-            new IOException("Function registeration interrupted"));
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                    new IOException("Function registeration interrupted"));
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = updateDefaultSink();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+            updateDefaultSink();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     //
-    // deregister source
+    // deregister sink
     //
 
-    @Test
-    public void testDeregisterSinkMissingTenant() throws Exception {
-        testDeregisterSinkMissingArguments(
-            null,
-            namespace,
-                sink,
-            "Tenant");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testDeregisterSinkMissingTenant() {
+        try {
+            testDeregisterSinkMissingArguments(
+                null,
+                namespace,
+                sink
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testDeregisterSinkMissingNamespace() throws Exception {
-        testDeregisterSinkMissingArguments(
-            tenant,
-            null,
-                sink,
-            "Namespace");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testDeregisterSinkMissingNamespace() {
+        try {
+            testDeregisterSinkMissingArguments(
+                tenant,
+                null,
+                    sink
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testDeregisterSinkMissingFunctionName() throws Exception {
-        testDeregisterSinkMissingArguments(
-            tenant,
-            namespace,
-            null,
-            "Sink Name");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Name is not provided")
+    public void testDeregisterSinkMissingFunctionName() {
+        try {
+            testDeregisterSinkMissingArguments(
+                tenant,
+                namespace,
+                null
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testDeregisterSinkMissingArguments(
-        String tenant,
-        String namespace,
-        String sink,
-        String missingFieldName
+            String tenant,
+            String namespace,
+            String sink
     ) {
-        Response response = resource.deregisterFunction(
+        resource.deregisterFunction(
             tenant,
             namespace,
             sink,
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response deregisterDefaultSink() {
-        return resource.deregisterFunction(
+    private void deregisterDefaultSink() {
+        resource.deregisterFunction(
             tenant,
             namespace,
                 sink,
                 null);
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink doesn't exist")
     public void testDeregisterNotExistedSink() {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
-
-        Response response = deregisterDefaultSink();
-        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+            deregisterDefaultSink();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
+            throw re;
+        }
     }
 
     @Test
-    public void testDeregisterSinkSuccess() throws Exception {
+    public void testDeregisterSinkSuccess() {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
         RequestResult rr = new RequestResult()
@@ -973,106 +1077,123 @@ public void testDeregisterSinkSuccess() throws Exception {
             .setMessage("source deregistered");
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
-
-        Response response = deregisterDefaultSink();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(rr.toJson(), response.getEntity());
     }
 
-    @Test
-    public void testDeregisterSinkFailure() throws Exception {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "sink failed to deregister")
+    public void testDeregisterSinkFailure() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(false)
-            .setMessage("source failed to deregister");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("sink failed to deregister");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
 
-        Response response = deregisterDefaultSink();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+            deregisterDefaultSink();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testDeregisterSinkInterrupted() throws Exception {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregistration interrupted")
+    public void testDeregisterSinkInterrupted() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
-        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-            new IOException("Function deregisteration interrupted"));
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                new IOException("Function deregistration interrupted"));
+            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(requestResult);
 
-        Response response = deregisterDefaultSink();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+            deregisterDefaultSink();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     //
     // Get Sink Info
     //
 
-    @Test
-    public void testGetSinkMissingTenant() throws Exception {
-        testGetSinkMissingArguments(
-            null,
-            namespace,
-                sink,
-            "Tenant");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testGetSinkMissingTenant() {
+        try {
+            testGetSinkMissingArguments(
+                null,
+                namespace,
+                    sink
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testGetSinkMissingNamespace() throws Exception {
-        testGetSinkMissingArguments(
-            tenant,
-            null,
-                sink,
-            "Namespace");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testGetSinkMissingNamespace() {
+        try {
+            testGetSinkMissingArguments(
+                    tenant,
+                    null,
+                    sink
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testGetSinkMissingFunctionName() throws Exception {
-        testGetSinkMissingArguments(
-            tenant,
-            namespace,
-            null,
-            "Sink Name");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Name is not provided")
+    public void testGetSinkMissingFunctionName() {
+        try {
+
+            testGetSinkMissingArguments(
+                tenant,
+                namespace,
+                null
+            );
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testGetSinkMissingArguments(
-        String tenant,
-        String namespace,
-        String sink,
-        String missingFieldName
-    ) throws IOException {
-        Response response = resource.getFunctionInfo(
+            String tenant,
+            String namespace,
+            String sink
+    ) {
+        resource.getFunctionInfo(
             tenant,
             namespace,
             sink
         );
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response getDefaultSinkInfo() throws IOException {
-        return resource.getFunctionInfo(
+    private SinkConfig getDefaultSinkInfo() {
+        return resource.getSinkInfo(
             tenant,
             namespace,
                 sink
         );
     }
 
-    @Test
-    public void testGetNotExistedSink() throws IOException {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
-
-        Response response = getDefaultSinkInfo();
-        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Sink " + sink + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink test-sink doesn't exist")
+    public void testGetNotExistedSink() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(false);
+            getDefaultSinkInfo();
+        } catch (RestException re) {
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
+            throw re;
+        }
     }
 
     @Test
-    public void testGetSinkSuccess() throws Exception {
+    public void testGetSinkSuccess() {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
         Function.SourceSpec sourceSpec = Function.SourceSpec.newBuilder()
@@ -1103,48 +1224,54 @@ public void testGetSinkSuccess() throws Exception {
             .build();
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(sink))).thenReturn(metaData);
 
-        Response response = getDefaultSinkInfo();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        getDefaultSinkInfo();
         assertEquals(
-            new Gson().toJson(SinkConfigUtils.convertFromDetails(functionDetails)),
-            response.getEntity());
+            SinkConfigUtils.convertFromDetails(functionDetails),
+            getDefaultSinkInfo());
     }
 
     //
     // List Sinks
     //
 
-    @Test
-    public void testListSinksMissingTenant() throws Exception {
-        testListSinksMissingArguments(
-            null,
-            namespace,
-            "Tenant");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testListSinksMissingTenant() {
+        try {
+            testListSinksMissingArguments(
+                null,
+                namespace
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testListFunctionsMissingNamespace() throws Exception {
-        testListSinksMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testListFunctionsMissingNamespace() {
+        try {
+            testListSinksMissingArguments(
             tenant,
-            null,
-            "Namespace");
+            null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testListSinksMissingArguments(
-        String tenant,
-        String namespace,
-        String missingFieldName
+            String tenant,
+            String namespace
     ) {
-        Response response = resource.listFunctions(
+        resource.listFunctions(
             tenant,
             namespace
         );
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response listDefaultSinks() {
+    private List<String> listDefaultSinks() {
         return resource.listFunctions(
             tenant,
             namespace
@@ -1152,9 +1279,9 @@ private Response listDefaultSinks() {
     }
 
     @Test
-    public void testListSinksSuccess() throws Exception {
-        List<String> functions = Lists.newArrayList("test-1", "test-2");
-        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+    public void testListSinksSuccess() {
+        final List<String> functions = Lists.newArrayList("test-1", "test-2");
+        final List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
         functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
                 FunctionDetails.newBuilder().setName("test-1").build()
         ).build());
@@ -1163,15 +1290,14 @@ public void testListSinksSuccess() throws Exception {
         ).build());
         when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
 
-        Response response = listDefaultSinks();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(new Gson().toJson(functions), response.getEntity());
+        List<String> sinkList = listDefaultSinks();
+        assertEquals(functions, sinkList);
     }
 
     @Test
-    public void testOnlyGetSinks() throws Exception {
-        List<String> functions = Lists.newArrayList("test-3");
-        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+    public void testOnlyGetSinks() {
+        final List<String> functions = Lists.newArrayList("test-3");
+        final List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
         FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
                 FunctionDetails.newBuilder().setName("test-1").build()).build();
         functionMetaDataList.add(f1);
@@ -1186,25 +1312,30 @@ public void testOnlyGetSinks() throws Exception {
         doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
         doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
-        Response response = listDefaultSinks();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(new Gson().toJson(functions), response.getEntity());
+        List<String> sinkList = listDefaultSinks();
+        assertEquals(functions, sinkList);
     }
 
-    @Test
-    public void testRegisterFunctionNonexistantNamespace() throws Exception {
-        this.namespaceList.clear();
-        Response response = registerDefaultSink();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Namespace does not exist").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace does not exist")
+    public void testRegisterFunctionNonExistingNamespace() throws Exception {
+        try {
+            this.namespaceList.clear();
+            registerDefaultSink();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionNonexistantTenant() throws Exception {
-        when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
-        Response response = registerDefaultSink();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant does not exist")
+    public void testRegisterFunctionNonExistingTenant() throws Exception {
+        try {
+            when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+            registerDefaultSink();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private SinkConfig createDefaultSinkConfig() {
@@ -1219,6 +1350,7 @@ private SinkConfig createDefaultSinkConfig() {
     }
 
     private FunctionDetails createDefaultFunctionDetails() throws IOException {
-        return SinkConfigUtils.convert(createDefaultSinkConfig(), new SinkConfigUtils.ExtractedSinkDetails(null, null));
+        return SinkConfigUtils.convert(createDefaultSinkConfig(),
+                new SinkConfigUtils.ExtractedSinkDetails(null, null));
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index f7af3cd981..194f624302 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -28,19 +28,28 @@
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Tenants;
+import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
-import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
-import org.apache.pulsar.functions.proto.Function.*;
+import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
+import org.apache.pulsar.functions.proto.Function.ProcessingGuarantees;
+import org.apache.pulsar.functions.proto.Function.SinkSpec;
+import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.functions.utils.SourceConfigUtils;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import org.apache.pulsar.functions.worker.*;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.worker.Utils;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.apache.pulsar.functions.worker.WorkerService;
 import org.apache.pulsar.functions.worker.request.RequestResult;
+import org.apache.pulsar.functions.worker.rest.RestException;
 import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
 import org.apache.pulsar.functions.worker.rest.api.SourceImpl;
 import org.apache.pulsar.io.twitter.TwitterFireHose;
@@ -48,29 +57,32 @@
 import org.mockito.Mockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.testng.Assert;
 import org.testng.IObjectFactory;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.ObjectFactory;
 import org.testng.annotations.Test;
 
 import javax.ws.rs.core.Response;
-import javax.ws.rs.core.Response.Status;
-import java.io.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
 import java.net.URL;
 import java.nio.file.Path;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
-import static org.mockito.Matchers.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.when;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.doReturn;
 import static org.powermock.api.mockito.PowerMockito.doThrow;
-import static org.powermock.api.mockito.PowerMockito.*;
+import static org.powermock.api.mockito.PowerMockito.mockStatic;
 import static org.testng.Assert.assertEquals;
 
 /**
@@ -165,134 +177,174 @@ public void setup() throws Exception {
     // Register Functions
     //
 
-    @Test
-    public void testRegisterSourceMissingTenant() throws IOException {
-        testRegisterSourceMissingArguments(
-            null,
-            namespace,
-                source,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                null,
-                "Tenant is not provided");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testRegisterSourceMissingTenant() {
+        try {
+            testRegisterSourceMissingArguments(
+                    null,
+                    namespace,
+                    source,
+                    mockedInputStream,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSourceMissingNamespace() throws IOException {
-        testRegisterSourceMissingArguments(
-            tenant,
-            null,
-                source,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testRegisterSourceMissingNamespace() {
+        try {
+            testRegisterSourceMissingArguments(
+                tenant,
                 null,
-                "Namespace is not provided");
-    }
-
-    @Test
-    public void testRegisterSourceMissingSourceName() throws IOException {
-        testRegisterSourceMissingArguments(
-            tenant,
-            namespace,
-            null,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
+                source,
+                mockedInputStream,
+                mockedFormData,
+                outputTopic,
                 outputSerdeClassName,
-            className,
-            parallelism,
-                null,
-                "Source Name is not provided");
+                className,
+                parallelism,
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSourceMissingPackage() throws IOException {
-        testRegisterSourceMissingArguments(
-            tenant,
-            namespace,
-                source,
-            null,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            null,
-            parallelism,
-                null,
-                "Source Package is not provided");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Name is not provided")
+    public void testRegisterSourceMissingSourceName() {
+        try {
+            testRegisterSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    null,
+                    mockedInputStream,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSourceMissingPackageDetails() throws IOException {
-        testRegisterSourceMissingArguments(
-            tenant,
-            namespace,
-                source,
-            mockedInputStream,
-            null,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                null,
-                "Source Package is not provided");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Package is not provided")
+    public void testRegisterSourceMissingPackage() {
+        try {
+            testRegisterSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    source,
+                    null,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    null,
+                    parallelism,
+                    null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSourceInvalidJarWithNoSource() throws IOException {
-        FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH);
-        testRegisterSourceMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Package is not provided")
+    public void testRegisterSourceMissingPackageDetails() {
+        try {
+            testRegisterSourceMissingArguments(
                 tenant,
                 namespace,
-                source,
-                inputStream,
+                    source,
+                mockedInputStream,
                 null,
                 outputTopic,
-                outputSerdeClassName,
-                null,
+                    outputSerdeClassName,
+                className,
                 parallelism,
-                null,
-                "Failed to extract source class from archive");
+                    null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract source class from archive")
+    public void testRegisterSourceInvalidJarWithNoSource() throws IOException {
+        try {
+            FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH);
+            testRegisterSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    source,
+                    inputStream,
+                    null,
+                    outputTopic,
+                    outputSerdeClassName,
+                    null,
+                    parallelism,
+                    null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Topic name cannot be null")
     public void testRegisterSourceNoOutputTopic() throws IOException {
-        FileInputStream inputStream = new FileInputStream(JAR_FILE_PATH);
-        testRegisterSourceMissingArguments(
-                tenant,
-                namespace,
-                source,
-                inputStream,
-                mockedFormData,
-                null,
-                outputSerdeClassName,
-                className,
-                parallelism,
-                null,
-                "Topic name cannot be null");
+        try {
+            FileInputStream inputStream = new FileInputStream(JAR_FILE_PATH);
+            testRegisterSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    source,
+                    inputStream,
+                    mockedFormData,
+                    null,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterSourceHttpUrl() throws IOException {
-        testRegisterSourceMissingArguments(
-                tenant,
-                namespace,
-                source,
-                null,
-                null,
-                outputTopic,
-                outputSerdeClassName,
-                className,
-                parallelism,
-                "http://localhost:1234/test",
-                "Invalid Source Jar");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Invalid Source Jar")
+    public void testRegisterSourceHttpUrl() {
+        try {
+            testRegisterSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    source,
+                    null,
+                    null,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    "http://localhost:1234/test"
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testRegisterSourceMissingArguments(
@@ -305,8 +357,7 @@ private void testRegisterSourceMissingArguments(
             String outputSerdeClassName,
             String className,
             Integer parallelism,
-            String pkgUrl,
-            String errorExpected) {
+            String pkgUrl) {
         SourceConfig sourceConfig = new SourceConfig();
         if (tenant != null) {
             sourceConfig.setTenant(tenant);
@@ -330,7 +381,7 @@ private void testRegisterSourceMissingArguments(
             sourceConfig.setParallelism(parallelism);
         }
 
-        Response response = resource.registerFunction(
+        resource.registerFunction(
                 tenant,
                 namespace,
                 function,
@@ -341,13 +392,11 @@ private void testRegisterSourceMissingArguments(
                 new Gson().toJson(sourceConfig),
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(errorExpected).reason);
     }
 
-    private Response registerDefaultSource() throws IOException {
+    private void registerDefaultSource() throws IOException {
         SourceConfig sourceConfig = createDefaultSourceConfig();
-        return resource.registerFunction(
+        resource.registerFunction(
             tenant,
             namespace,
                 source,
@@ -359,34 +408,39 @@ private Response registerDefaultSource() throws IOException {
                 null);
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source test-source already exists")
     public void testRegisterExistedSource() throws IOException {
-        Configurator.setRootLevel(Level.DEBUG);
+        try {
+            Configurator.setRootLevel(Level.DEBUG);
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
-        Response response = registerDefaultSource();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Source " + source + " already exists").reason, ((ErrorData) response.getEntity()).reason);
+            registerDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testRegisterSourceUploadFailure() throws Exception {
-        mockStatic(Utils.class);
-        doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
-
-        Response response = registerDefaultSource();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            mockStatic(Utils.class);
+            doThrow(new IOException("upload failure")).when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+            registerDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
-    @Test
     public void testRegisterSourceSuccess() throws Exception {
         mockStatic(Utils.class);
         doNothing().when(Utils.class);
@@ -403,8 +457,7 @@ public void testRegisterSourceSuccess() throws Exception {
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = registerDefaultSource();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        registerDefaultSource();
     }
 
     @Test
@@ -437,7 +490,7 @@ public void testRegisterSourceConflictingFields() throws Exception {
         sourceConfig.setParallelism(parallelism);
         sourceConfig.setTopicName(outputTopic);
         sourceConfig.setSerdeClassName(outputSerdeClassName);
-        Response response = resource.registerFunction(
+        resource.registerFunction(
                 actualTenant,
                 actualNamespace,
                 actualName,
@@ -447,164 +500,200 @@ public void testRegisterSourceConflictingFields() throws Exception {
                 null,
                 new Gson().toJson(sourceConfig),
                 null);
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to register")
     public void testRegisterSourceFailure() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(false)
-            .setMessage("source failed to register");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("source failed to register");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = registerDefaultSource();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+            registerDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
     public void testRegisterSourceInterrupted() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
-
-        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-            new IOException("Function registeration interrupted"));
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                new IOException("Function registration interrupted"));
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = registerDefaultSource();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+            registerDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     //
     // Update Functions
     //
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
     public void testUpdateSourceMissingTenant() throws IOException {
-        testUpdateSourceMissingArguments(
-            null,
-            namespace,
-                source,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                "Tenant is not provided");
+        try {
+            testUpdateSourceMissingArguments(
+                null,
+                namespace,
+                    source,
+                mockedInputStream,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    "Tenant is not provided");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
     public void testUpdateSourceMissingNamespace() throws IOException {
-        testUpdateSourceMissingArguments(
-            tenant,
-            null,
-                source,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                "Namespace is not provided");
-    }
-
-    @Test
-    public void testUpdateSourceMissingFunctionName() throws IOException {
-        testUpdateSourceMissingArguments(
-            tenant,
-            namespace,
-            null,
-            mockedInputStream,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            className,
-            parallelism,
-                "Source Name is not provided");
-    }
-
-    @Test
-    public void testUpdateSourceMissingPackage() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-
-        testUpdateSourceMissingArguments(
-            tenant,
-            namespace,
-                source,
+        try {
+            testUpdateSourceMissingArguments(
+                tenant,
                 null,
-            mockedFormData,
-            outputTopic,
-                outputSerdeClassName,
-            null,
-            parallelism,
-                "Update contains no change");
+                    source,
+                mockedInputStream,
+                mockedFormData,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
+                parallelism,
+                    "Namespace is not provided");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testUpdateSourceMissingTopicName() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-
-        testUpdateSourceMissingArguments(
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Name is not provided")
+    public void testUpdateSourceMissingFunctionName() throws IOException {
+        try {
+            testUpdateSourceMissingArguments(
                 tenant,
                 namespace,
-                source,
                 null,
+                mockedInputStream,
                 mockedFormData,
-                null,
-                outputSerdeClassName,
-                null,
+                outputTopic,
+                    outputSerdeClassName,
+                className,
                 parallelism,
-                "Update contains no change");
+                    "Source Name is not provided");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testUpdateSourceNegativeParallelism() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
+    public void testUpdateSourceMissingPackage() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
-        testUpdateSourceMissingArguments(
+            testUpdateSourceMissingArguments(
                 tenant,
                 namespace,
-                source,
-                null,
+                    source,
+                    null,
                 mockedFormData,
                 outputTopic,
-                outputSerdeClassName,
-                className,
-                -2,
-                "Source parallelism should positive number");
+                    outputSerdeClassName,
+                null,
+                parallelism,
+                    "Update contains no change");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Update contains no change")
+    public void testUpdateSourceMissingTopicName() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+
+            testUpdateSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    source,
+                    null,
+                    mockedFormData,
+                    null,
+                    outputSerdeClassName,
+                    null,
+                    parallelism,
+                    "Update contains no change");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
+    }
+
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
+    public void testUpdateSourceNegativeParallelism() throws IOException {
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+
+            testUpdateSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    source,
+                    null,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    -2,
+                    "Source parallelism should positive number");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     @Test
     public void testUpdateSourceChangedParallelism() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
 
-        testUpdateSourceMissingArguments(
+            testUpdateSourceMissingArguments(
                 tenant,
                 namespace,
                 source,
@@ -615,44 +704,58 @@ public void testUpdateSourceChangedParallelism() throws IOException {
                 className,
                 parallelism + 1,
                 null);
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Destination topics differ")
     public void testUpdateSourceChangedTopic() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-
-        testUpdateSourceMissingArguments(
-                tenant,
-                namespace,
-                source,
-                null,
-                mockedFormData,
-                "DifferentTopic",
-                outputSerdeClassName,
-                className,
-                parallelism,
-                "Destination topics differ");
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+
+            testUpdateSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    source,
+                    null,
+                    mockedFormData,
+                    "DifferentTopic",
+                    outputSerdeClassName,
+                    className,
+                    parallelism,
+                    "Destination topics differ");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source parallelism should positive number")
     public void testUpdateSourceZeroParallelism() throws IOException {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
-
-        testUpdateSourceMissingArguments(
-                tenant,
-                namespace,
-                source,
-                mockedInputStream,
-                mockedFormData,
-                outputTopic,
-                outputSerdeClassName,
-                className,
-                0,
-                "Source parallelism should positive number");
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
+
+            testUpdateSourceMissingArguments(
+                    tenant,
+                    namespace,
+                    source,
+                    mockedInputStream,
+                    mockedFormData,
+                    outputTopic,
+                    outputSerdeClassName,
+                    className,
+                    0,
+                    "Source parallelism should positive number");
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testUpdateSourceMissingArguments(
@@ -714,7 +817,7 @@ private void testUpdateSourceMissingArguments(
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
         }
 
-        Response response = resource.updateFunction(
+        resource.updateFunction(
             tenant,
             namespace,
             function,
@@ -725,15 +828,9 @@ private void testUpdateSourceMissingArguments(
             new Gson().toJson(sourceConfig),
                 null);
 
-        if (expectedError == null) {
-            assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        } else {
-            assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-            Assert.assertEquals(((ErrorData) response.getEntity()).reason, new ErrorData(expectedError).reason);
-        }
     }
 
-    private Response updateDefaultSource() throws IOException {
+    private void updateDefaultSource() throws IOException {
         SourceConfig sourceConfig = new SourceConfig();
         sourceConfig.setTenant(tenant);
         sourceConfig.setNamespace(namespace);
@@ -757,8 +854,7 @@ private Response updateDefaultSource() throws IOException {
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
 
-
-        return resource.updateFunction(
+        resource.updateFunction(
             tenant,
             namespace,
                 source,
@@ -770,29 +866,33 @@ private Response updateDefaultSource() throws IOException {
                 null);
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source test-source doesn't exist")
     public void testUpdateNotExistedSource() throws IOException {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
-
-        Response response = updateDefaultSource();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+            updateDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "upload failure")
     public void testUpdateSourceUploadFailure() throws Exception {
-        mockStatic(Utils.class);
-        doThrow(new IOException("upload failure")).when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
-
-        Response response = updateDefaultSource();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("upload failure").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            mockStatic(Utils.class);
+            doThrow(new IOException("upload failure")).when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+            updateDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     @Test
@@ -812,8 +912,7 @@ public void testUpdateSourceSuccess() throws Exception {
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = updateDefaultSource();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
+        updateDefaultSource();
     }
 
     @Test
@@ -853,7 +952,7 @@ public void testUpdateSourceWithUrl() throws IOException {
             CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
             when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = resource.updateFunction(
+        resource.updateFunction(
             tenant,
             namespace,
                 source,
@@ -864,117 +963,136 @@ public void testUpdateSourceWithUrl() throws IOException {
             new Gson().toJson(sourceConfig),
                 null);
 
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to register")
     public void testUpdateSourceFailure() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
 
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(false)
-            .setMessage("source failed to register");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                    .setSuccess(false)
+                    .setMessage("source failed to register");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = updateDefaultSource();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+            updateDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "java.io.IOException: Function registration interrupted")
     public void testUpdateSourceInterrupted() throws Exception {
-        mockStatic(Utils.class);
-        doNothing().when(Utils.class);
-        Utils.uploadFileToBookkeeper(
-                anyString(),
-                any(File.class),
-                any(Namespace.class));
-
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
-
-        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-            new IOException("Function registeration interrupted"));
-        when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+        try {
+            mockStatic(Utils.class);
+            doNothing().when(Utils.class);
+            Utils.uploadFileToBookkeeper(
+                    anyString(),
+                    any(File.class),
+                    any(Namespace.class));
+
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                new IOException("Function registration interrupted"));
+            when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
 
-        Response response = updateDefaultSource();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function registeration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+            updateDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     //
     // deregister source
     //
 
-    @Test
-    public void testDeregisterSourceMissingTenant() throws Exception {
-        testDeregisterSourceMissingArguments(
-            null,
-            namespace,
-                source,
-            "Tenant");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testDeregisterSourceMissingTenant() {
+        try {
+            testDeregisterSourceMissingArguments(
+                null,
+                namespace,
+                    source
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testDeregisterSourceMissingNamespace() throws Exception {
-        testDeregisterSourceMissingArguments(
-            tenant,
-            null,
-                source,
-            "Namespace");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testDeregisterSourceMissingNamespace() {
+        try {
+            testDeregisterSourceMissingArguments(
+                tenant,
+                null,
+                source
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testDeregisterSourceMissingFunctionName() throws Exception {
-        testDeregisterSourceMissingArguments(
-            tenant,
-            namespace,
-            null,
-            "Source Name");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Name is not provided")
+    public void testDeregisterSourceMissingFunctionName() {
+        try {
+            testDeregisterSourceMissingArguments(
+                tenant,
+                namespace,
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testDeregisterSourceMissingArguments(
-        String tenant,
-        String namespace,
-        String function,
-        String missingFieldName
+            String tenant,
+            String namespace,
+            String function
     ) {
-        Response response = resource.deregisterFunction(
+        resource.deregisterFunction(
             tenant,
             namespace,
             function,
                 null);
 
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response deregisterDefaultSource() {
-        return resource.deregisterFunction(
+    private void deregisterDefaultSource() {
+        resource.deregisterFunction(
             tenant,
             namespace,
                 source,
                 null);
     }
 
-    @Test
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp= "Source test-source doesn't exist")
     public void testDeregisterNotExistedSource() {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
-
-        Response response = deregisterDefaultSource();
-        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+            deregisterDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
+            throw re;
+        }
     }
 
     @Test
-    public void testDeregisterSourceSuccess() throws Exception {
+    public void testDeregisterSourceSuccess() {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
         RequestResult rr = new RequestResult()
@@ -983,105 +1101,122 @@ public void testDeregisterSourceSuccess() throws Exception {
         CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
         when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
 
-        Response response = deregisterDefaultSource();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(rr.toJson(), response.getEntity());
+        deregisterDefaultSource();
     }
 
-    @Test
-    public void testDeregisterSourceFailure() throws Exception {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "source failed to deregister")
+    public void testDeregisterSourceFailure() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
-        RequestResult rr = new RequestResult()
-            .setSuccess(false)
-            .setMessage("source failed to deregister");
-        CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
+            RequestResult rr = new RequestResult()
+                .setSuccess(false)
+                .setMessage("source failed to deregister");
+            CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
 
-        Response response = deregisterDefaultSource();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(rr.getMessage()).reason, ((ErrorData) response.getEntity()).reason);
+            deregisterDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testDeregisterSourceInterrupted() throws Exception {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function deregistration interrupted")
+    public void testDeregisterSourceInterrupted() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
-        CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
-            new IOException("Function deregisteration interrupted"));
-        when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
+            CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
+                new IOException("Function deregistration interrupted"));
+            when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(requestResult);
 
-        Response response = deregisterDefaultSource();
-        assertEquals(Status.INTERNAL_SERVER_ERROR.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Function deregisteration interrupted").reason, ((ErrorData) response.getEntity()).reason);
+            deregisterDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.INTERNAL_SERVER_ERROR);
+            throw re;
+        }
     }
 
     //
     // Get Source Info
     //
 
-    @Test
-    public void testGetSourceMissingTenant() throws Exception {
-        testGetSourceMissingArguments(
-            null,
-            namespace,
-                source,
-            "Tenant");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testGetSourceMissingTenant() {
+        try {
+            testGetSourceMissingArguments(
+                    null,
+                    namespace,
+                    source
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testGetSourceMissingNamespace() throws Exception {
-        testGetSourceMissingArguments(
-            tenant,
-            null,
-                source,
-            "Namespace");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testGetSourceMissingNamespace() {
+        try {
+            testGetSourceMissingArguments(
+                tenant,
+                null,
+                    source
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testGetSourceMissingFunctionName() throws Exception {
-        testGetSourceMissingArguments(
-            tenant,
-            namespace,
-            null,
-            "Source Name");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Name is not provided")
+    public void testGetSourceMissingFunctionName() {
+        try {
+            testGetSourceMissingArguments(
+                tenant,
+                namespace,
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testGetSourceMissingArguments(
-        String tenant,
-        String namespace,
-        String source,
-        String missingFieldName
-    ) throws IOException {
-        Response response = resource.getFunctionInfo(
+            String tenant,
+            String namespace,
+            String source
+    ) {
+        resource.getFunctionInfo(
             tenant,
             namespace,
             source
         );
-
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response getDefaultSourceInfo() throws IOException {
-        return resource.getFunctionInfo(
+    private SourceConfig getDefaultSourceInfo() {
+        return resource.getSourceInfo(
             tenant,
             namespace,
                 source
         );
     }
 
-    @Test
-    public void testGetNotExistedSource() throws IOException {
-        when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
-
-        Response response = getDefaultSourceInfo();
-        assertEquals(Status.NOT_FOUND.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Source " + source + " doesn't exist").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source test-source doesn't exist")
+    public void testGetNotExistedSource() {
+        try {
+            when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(false);
+            getDefaultSourceInfo();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.NOT_FOUND);
+            throw re;
+        }
     }
 
     @Test
-    public void testGetSourceSuccess() throws Exception {
+    public void testGetSourceSuccess() {
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(source))).thenReturn(true);
 
         SourceSpec sourceSpec = SourceSpec.newBuilder().setBuiltin("jdbc").build();
@@ -1107,57 +1242,60 @@ public void testGetSourceSuccess() throws Exception {
             .build();
         when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(source))).thenReturn(metaData);
 
-        Response response = getDefaultSourceInfo();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(
-            new Gson().toJson(SourceConfigUtils.convertFromDetails(functionDetails)),
-            response.getEntity());
+        SourceConfig config = getDefaultSourceInfo();
+        assertEquals(SourceConfigUtils.convertFromDetails(functionDetails), config);
     }
 
     //
     // List Sources
     //
 
-    @Test
-    public void testListSourcesMissingTenant() throws Exception {
-        testListSourcesMissingArguments(
-            null,
-            namespace,
-            "Tenant");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
+    public void testListSourcesMissingTenant() {
+        try {
+            testListSourcesMissingArguments(
+                null,
+                namespace
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testListSourcesMissingNamespace() throws Exception {
-        testListSourcesMissingArguments(
-            tenant,
-            null,
-            "Namespace");
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
+    public void testListSourcesMissingNamespace() {
+        try {
+            testListSourcesMissingArguments(
+                tenant,
+                null
+            );
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private void testListSourcesMissingArguments(
-        String tenant,
-        String namespace,
-        String missingFieldName
+            String tenant,
+            String namespace
     ) {
-        Response response = resource.listFunctions(
+        resource.listFunctions(
             tenant,
             namespace
         );
-
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData(missingFieldName + " is not provided").reason, ((ErrorData) response.getEntity()).reason);
     }
 
-    private Response listDefaultSources() {
+    private List<String> listDefaultSources() {
         return resource.listFunctions(
             tenant,
             namespace);
     }
 
     @Test
-    public void testListSourcesSuccess() throws Exception {
-        List<String> functions = Lists.newArrayList("test-1", "test-2");
-        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+    public void testListSourcesSuccess() {
+        final List<String> functions = Lists.newArrayList("test-1", "test-2");
+        final List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
         functionMetaDataList.add(FunctionMetaData.newBuilder().setFunctionDetails(
                 FunctionDetails.newBuilder().setName("test-1").build()
         ).build());
@@ -1166,15 +1304,14 @@ public void testListSourcesSuccess() throws Exception {
         ).build());
         when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
 
-        Response response = listDefaultSources();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(new Gson().toJson(functions), response.getEntity());
+        List<String> sourceList = listDefaultSources();
+        assertEquals(functions, sourceList);
     }
 
     @Test
-    public void testOnlyGetSources() throws Exception {
-        List<String> functions = Lists.newArrayList("test-1");
-        List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
+    public void testOnlyGetSources() {
+        final List<String> functions = Lists.newArrayList("test-1");
+        final List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
         FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
                 FunctionDetails.newBuilder().setName("test-1").build()).build();
         functionMetaDataList.add(f1);
@@ -1189,25 +1326,30 @@ public void testOnlyGetSources() throws Exception {
         doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
         doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
 
-        Response response = listDefaultSources();
-        assertEquals(Status.OK.getStatusCode(), response.getStatus());
-        assertEquals(new Gson().toJson(functions), response.getEntity());
+        List<String> sourceList = listDefaultSources();
+        assertEquals(functions, sourceList);
     }
 
-    @Test
-    public void testRegisterFunctionNonexistantNamespace() throws Exception {
-        this.namespaceList.clear();
-        Response response = registerDefaultSource();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Namespace does not exist").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace does not exist")
+    public void testRegisterFunctionNonExistingNamespace() throws Exception {
+        try {
+            this.namespaceList.clear();
+            registerDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
-    @Test
-    public void testRegisterFunctionNonexistantTenant() throws Exception {
-        when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
-        Response response = registerDefaultSource();
-        assertEquals(Status.BAD_REQUEST.getStatusCode(), response.getStatus());
-        assertEquals(new ErrorData("Tenant does not exist").reason, ((ErrorData) response.getEntity()).reason);
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant does not exist")
+    public void testRegisterFunctionNonExistingTenant() throws Exception {
+        try {
+            when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
+            registerDefaultSource();
+        } catch (RestException re){
+            assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+            throw re;
+        }
     }
 
     private SourceConfig createDefaultSourceConfig() {
@@ -1223,6 +1365,7 @@ private SourceConfig createDefaultSourceConfig() {
     }
 
     private FunctionDetails createDefaultFunctionDetails() {
-        return SourceConfigUtils.convert(createDefaultSourceConfig(), new SourceConfigUtils.ExtractedSourceDetails(null, null));
+        return SourceConfigUtils.convert(createDefaultSourceConfig(),
+                new SourceConfigUtils.ExtractedSourceDetails(null, null));
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services