You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2019/06/17 11:03:18 UTC

[pulsar] branch master updated: [improve doc] Update source rest api doc (#4521)

This is an automated email from the ASF dual-hosted git repository.

zhaijia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4f2bac0  [improve doc] Update source rest api doc (#4521)
4f2bac0 is described below

commit 4f2bac0767819937d83833780fe7cb309b7d2a08
Author: 冉小龙 <ra...@gmail.com>
AuthorDate: Mon Jun 17 19:03:11 2019 +0800

    [improve doc] Update source rest api doc (#4521)
    
    * [improve doc] Update source rest api doc
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
    
    * fix comments
    
    Signed-off-by: xiaolong.ran <ra...@gmail.com>
---
 .../pulsar/broker/admin/impl/SourcesBase.java      | 346 ++++++++++++++++-----
 1 file changed, 264 insertions(+), 82 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index e65eda7..8d2963b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.common.functions.UpdateOptions;
@@ -63,20 +61,78 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @POST
     @ApiOperation(value = "Creates a new Pulsar Source in cluster mode")
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
-            @ApiResponse(code = 408, message = "Request timeout"),
-            @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+            @ApiResponse(code = 200, message = "Pulsar Function successfully created"),
+            @ApiResponse(code = 400, message = "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+
     })
     @Path("/{tenant}/{namespace}/{sourceName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void registerSource(final @PathParam("tenant") String tenant,
-                               final @PathParam("namespace") String namespace,
-                               final @PathParam("sourceName") String sourceName,
-                               final @FormDataParam("data") InputStream uploadedInputStream,
-                               final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                               final @FormDataParam("url") String functionPkgUrl,
-                               final @FormDataParam("sourceConfig") String sourceConfigJson) {
+    public void registerSource(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName,
+            final @FormDataParam("data") InputStream uploadedInputStream,
+            final @FormDataParam("data") FormDataContentDisposition fileDetail,
+            final @FormDataParam("url") String functionPkgUrl,
+            @ApiParam(
+                    value = "A JSON value representing the source configuration payload. An example of the expected payload is shown below.  \n" +
+                            "tenant  \n" +
+                            "  The tenant of source.  \n" +
+                            "namespace  \n" +
+                            "  The namespace of source.  \n" +
+                            "name  \n" +
+                            "  The name of source.  \n" +
+                            "className  \n" +
+                            "  The source's class name if archive is file-url-path (file://).  \n" +
+                            "topicName  \n" +
+                            "  The Pulsar topic to which data is sent.  \n" +
+                            "serdeClassName  \n" +
+                            "  The SerDe classname for the source.  \n" +
+                            "schemaType  \n" +
+                            "  The schema type (either a builtin schema like 'avro', 'json', etc., or a " +
+                            "  custom Schema class name) to be used to encode messages emitted from the source.  \n" +
+                            "configs  \n" +
+                            "  Source config key/values  \n" +
+                            "secrets  \n" +
+                            "  This is a map of secretName(that is how the secret is going to be accessed in the function via context) to an object that" +
+                            "  encapsulates how the secret is fetched by the underlying secrets provider. The type of a value here can be found by the" +
+                            "  SecretProviderConfigurator.getSecretObjectType() method. \n" +
+                            "parallelism  \n" +
+                            "  The source's parallelism factor (i.e. the number of source instances to run).  \n" +
+                            "processingGuarantees  \n" +
+                            "  The processing guarantees (aka delivery semantics) applied to the source  " +
+                            "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]  \n" +
+                            "resources  \n" +
+                            "  The size of the system resources allowed by the source runtime. The resources include: cpu, ram, disk.  \n" +
+                            "archive  \n" +
+                            "  The path to the NAR archive for the Source. It also supports url-path " +
+                            "  [http/https/file (file protocol assumes that file already exists on worker host)] " +
+                            "  from which worker can download the package.  \n" +
+                            "runtimeFlags  \n" +
+                            "  Any flags that you want to pass to the runtime.  \n",
+                    examples = @Example(
+                            value = @ExampleProperty(
+                                    mediaType = MediaType.APPLICATION_JSON,
+                                    value = "{\n"
+                                            + "  \"tenant\": \"public\",\n"
+                                            + "  \"namespace\": \"default\",\n"
+                                            + "  \"name\": \"pulsar-io-mysql\",\n"
+                                            + "  \"className\": \"TestSourceMysql\",\n"
+                                            + "  \"topicName\": \"pulsar-io-mysql\",\n"
+                                            + "  \"parallelism\": 1,\n"
+                                            + "  \"archive\": \"/connectors/pulsar-io-mysql-0.0.1.nar\",\n"
+                                            + "  \"schemaType\": \"avro\"\n"
+                                            + "}\n"
+                            )
+                    )
+            )
+            final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
             functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData());
@@ -86,19 +142,79 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiOperation(value = "Updates a Pulsar Source currently running in cluster mode")
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
-            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+            @ApiResponse(code = 400, message = "Invalid request (Function already exists or Tenant, Namespace or Name is not provided, etc.)"),
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 200, message = "Pulsar Function successfully updated"),
+            @ApiResponse(code = 404, message = "Not Found(The source doesn't exist)"),
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sourceName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void updateSource(final @PathParam("tenant") String tenant,
-                             final @PathParam("namespace") String namespace,
-                             final @PathParam("sourceName") String sourceName,
-                             final @FormDataParam("data") InputStream uploadedInputStream,
-                             final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                             final @FormDataParam("url") String functionPkgUrl,
-                             final @FormDataParam("sourceConfig") String sourceConfigJson,
-                             final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
+    public void updateSource(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName,
+            final @FormDataParam("data") InputStream uploadedInputStream,
+            final @FormDataParam("data") FormDataContentDisposition fileDetail,
+            final @FormDataParam("url") String functionPkgUrl,
+            @ApiParam(
+                    value = "A JSON value representing the source configuration payload. An example of the expected payload is shown below.  \n" +
+                            "tenant  \n" +
+                            "  The tenant of source.  \n" +
+                            "namespace  \n" +
+                            "  The namespace of source.  \n" +
+                            "name  \n" +
+                            "  The name of source.  \n" +
+                            "className  \n" +
+                            "  The source's class name if archive is file-url-path (file://).  \n" +
+                            "topicName  \n" +
+                            "  The Pulsar topic to which data is sent.  \n" +
+                            "serdeClassName  \n" +
+                            "  The SerDe classname for the source.  \n" +
+                            "schemaType  \n" +
+                            "  The schema type (either a builtin schema like 'avro', 'json', etc., or a " +
+                            "  custom Schema class name) to be used to encode messages emitted from the source.  \n" +
+                            "configs  \n" +
+                            "  Source config key/values  \n" +
+                            "secrets  \n" +
+                            "  This is a map of secretName(that is how the secret is going to be accessed in the function via context) to an object that" +
+                            "  encapsulates how the secret is fetched by the underlying secrets provider. The type of a value here can be found by the" +
+                            "  SecretProviderConfigurator.getSecretObjectType() method. \n" +
+                            "parallelism  \n" +
+                            "  The source's parallelism factor (i.e. the number of source instances to run).  \n" +
+                            "processingGuarantees  \n" +
+                            "  The processing guarantees (aka delivery semantics) applied to the source  " +
+                            "  Possible Values: [ATLEAST_ONCE, ATMOST_ONCE, EFFECTIVELY_ONCE]  \n" +
+                            "resources  \n" +
+                            "  The size of the system resources allowed by the source runtime. The resources include: cpu, ram, disk.  \n" +
+                            "archive  \n" +
+                            "  The path to the NAR archive for the Source. It also supports url-path " +
+                            "  [http/https/file (file protocol assumes that file already exists on worker host)] " +
+                            "  from which worker can download the package.  \n" +
+                            "runtimeFlags  \n" +
+                            "  Any flags that you want to pass to the runtime.  \n",
+                    examples = @Example(
+                            value = @ExampleProperty(
+                                    mediaType = MediaType.APPLICATION_JSON,
+                                    value = "{\n"
+                                            + "  \"tenant\": \"public\",\n"
+                                            + "  \"namespace\": \"default\",\n"
+                                            + "  \"name\": \"pulsar-io-mysql\",\n"
+                                            + "  \"className\": \"TestSourceMysql\",\n"
+                                            + "  \"topicName\": \"pulsar-io-mysql\",\n"
+                                            + "  \"parallelism\": 1,\n"
+                                            + "  \"archive\": \"/connectors/pulsar-io-mysql-0.0.1.nar\",\n"
+                                            + "  \"schemaType\": \"avro\"\n"
+                                            + "}\n"
+                            )
+                    )
+            )
+            final @FormDataParam("sourceConfig") String sourceConfigJson,
+            final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
 
         source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
             functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), updateOptions);
@@ -108,16 +224,22 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @DELETE
     @ApiOperation(value = "Deletes a Pulsar Source currently running in cluster mode")
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function doesn't exist"),
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 404, message = "Not Found(The source doesn't exist)"),
             @ApiResponse(code = 408, message = "Request timeout"),
-            @ApiResponse(code = 200, message = "The function was successfully deleted")
+            @ApiResponse(code = 200, message = "The function was successfully deleted"),
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sourceName}")
-    public void deregisterSource(final @PathParam("tenant") String tenant,
-                                       final @PathParam("namespace") String namespace,
-                                       final @PathParam("sourceName") String sourceName) {
+    public void deregisterSource(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName) {
         source.deregisterFunction(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
@@ -127,15 +249,18 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
             response = SourceConfig.class
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 408, message = "Request timeout"),
-            @ApiResponse(code = 404, message = "The function doesn't exist")
+            @ApiResponse(code = 404, message = "Not Found(The source doesn't exist)"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sourceName}")
-    public SourceConfig getSourceInfo(final @PathParam("tenant") String tenant,
-                                      final @PathParam("namespace") String namespace,
-                                      final @PathParam("sourceName") String sourceName) throws IOException {
+    public SourceConfig getSourceInfo(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName) throws IOException {
         return source.getSourceInfo(tenant, namespace, sourceName);
     }
 
@@ -145,16 +270,19 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
             response = SourceStatus.SourceInstanceStatus.SourceInstanceStatusData.class
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The source doesn't exist")
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
     public SourceStatus.SourceInstanceStatus.SourceInstanceStatusData getSourceInstanceStatus(
+            @ApiParam(value = "The name of tenant")
             final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
             final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
             final @PathParam("sourceName") String sourceName,
+            @ApiParam(value = "The source instanceId (if instance-id is not provided, the stats of all instances are returned).")
             final @PathParam("instanceId") String instanceId) throws IOException {
         return source.getSourceInstanceStatus(
             tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
@@ -166,15 +294,18 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
             response = SourceStatus.class
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The source doesn't exist")
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{sourceName}/status")
-    public SourceStatus getSourceStatus(final @PathParam("tenant") String tenant,
-                                        final @PathParam("namespace") String namespace,
-                                        final @PathParam("sourceName") String sourceName) throws IOException {
+    public SourceStatus getSourceStatus(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName) throws IOException {
         return source.getSourceStatus(tenant, namespace, sourceName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
@@ -186,12 +317,17 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     )
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 500, message = "Internal Server Error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Consumes(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}")
-    public List<String> listSources(final @PathParam("tenant") String tenant,
-                                    final @PathParam("namespace") String namespace) {
+    public List<String> listSources(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace) {
         return source.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
     }
 
@@ -199,14 +335,22 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiOperation(value = "Restart source instance", response = Void.class)
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 404, message = "Not Found(The source doesn't exist)"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void restartSource(final @PathParam("tenant") String tenant,
-                              final @PathParam("namespace") String namespace,
-                              final @PathParam("sourceName") String sourceName,
-                              final @PathParam("instanceId") String instanceId) {
+    public void restartSource(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName,
+            @ApiParam(value = "The source instanceId (if instance-id is not provided, all instances are restarted).")
+            final @PathParam("instanceId") String instanceId) {
         source.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
@@ -214,13 +358,20 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiOperation(value = "Restart all source instances", response = Void.class)
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 404, message = "Not Found(The source doesn't exist)"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
     @Path("/{tenant}/{namespace}/{sourceName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void restartSource(final @PathParam("tenant") String tenant,
-                              final @PathParam("namespace") String namespace,
-                              final @PathParam("sourceName") String sourceName) {
+    public void restartSource(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName) {
         source.restartFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
@@ -228,14 +379,22 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiOperation(value = "Stop source instance", response = Void.class)
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 404, message = "Not Found(The source doesn't exist)"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void stopSource(final @PathParam("tenant") String tenant,
-                           final @PathParam("namespace") String namespace,
-                           final @PathParam("sourceName") String sourceName,
-                           final @PathParam("instanceId") String instanceId) {
+    public void stopSource(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName,
+            @ApiParam(value = "The source instanceId (if instance-id is not provided, all instances are stopped).")
+            final @PathParam("instanceId") String instanceId) {
         source.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
@@ -243,13 +402,20 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiOperation(value = "Stop all source instances", response = Void.class)
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 404, message = "Not Found(The source doesn't exist)"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
     @Path("/{tenant}/{namespace}/{sourceName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void stopSource(final @PathParam("tenant") String tenant,
-                           final @PathParam("namespace") String namespace,
-                           final @PathParam("sourceName") String sourceName) {
+    public void stopSource(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName) {
         source.stopFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
@@ -257,14 +423,22 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiOperation(value = "Start source instance", response = Void.class)
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 404, message = "Not Found(The source doesn't exist)"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
     @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/start")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void startSource(final @PathParam("tenant") String tenant,
-                            final @PathParam("namespace") String namespace,
-                            final @PathParam("sourceName") String sourceName,
-                            final @PathParam("instanceId") String instanceId) {
+    public void startSource(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName,
+            @ApiParam(value = "The source instanceId (if instance-id is not provided, all instances are started).")
+            final @PathParam("instanceId") String instanceId) {
         source.startFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
 
@@ -272,13 +446,20 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiOperation(value = "Start all source instances", response = Void.class)
     @ApiResponses(value = {
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error") })
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 404, message = "Not Found(The source doesn't exist)"),
+            @ApiResponse(code = 500, message = "Internal server error"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
+    })
     @Path("/{tenant}/{namespace}/{sourceName}/start")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void startSource(final @PathParam("tenant") String tenant,
-                            final @PathParam("namespace") String namespace,
-                            final @PathParam("sourceName") String sourceName) {
+    public void startSource(
+            @ApiParam(value = "The name of tenant")
+            final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The name of namespace")
+            final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The name of source")
+            final @PathParam("sourceName") String sourceName) {
         source.startFunctionInstances(tenant, namespace, sourceName, clientAppId(), clientAuthData());
     }
 
@@ -290,7 +471,8 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
     @ApiResponses(value = {
             @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 408, message = "Request timeout")
+            @ApiResponse(code = 408, message = "Request timeout"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/builtinsources")