You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/06/17 19:35:10 UTC

[pulsar] branch master updated: [REST API Doc]Sink API refinement (#4520)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d2546f0  [REST API Doc]Sink API refinement (#4520)
d2546f0 is described below

commit d2546f0c38e378ddf668a27330d7f9c5412a04b7
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Tue Jun 18 03:35:02 2019 +0800

    [REST API Doc]Sink API refinement (#4520)
    
    * revisit documentation for pulsar admin api
    
    * format
    
    * update json body
    
    * sinkConfig as doc base
---
 .../apache/pulsar/broker/admin/impl/SinksBase.java | 294 ++++++++++++++++-----
 pulsar-common/pom.xml                              |   5 +
 .../pulsar/common/functions/UpdateOptions.java     |   6 +
 3 files changed, 246 insertions(+), 59 deletions(-)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index 50839d8..c527bd3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -18,9 +18,7 @@
  */
 package org.apache.pulsar.broker.admin.impl;
 
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.*;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.broker.admin.AdminResource;
 import org.apache.pulsar.common.functions.UpdateOptions;
@@ -63,19 +61,81 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @POST
     @ApiOperation(value = "Creates a new Pulsar Sink in cluster mode")
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
-            @ApiResponse(code = 408, message = "Request timeout"),
-            @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+            @ApiResponse(code = 400, message = "Invalid request (sink already exists, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Sink successfully created"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to get tenant data, failed to process package, etc.)"),
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void registerSink(final @PathParam("tenant") String tenant,
+    public void registerSink(@ApiParam(value = "The sink's tenant")
+                             final @PathParam("tenant") String tenant,
+                             @ApiParam(value = "The sink's namespace")
                              final @PathParam("namespace") String namespace,
+                             @ApiParam(value = "The sink's name")
                              final @PathParam("sinkName") String sinkName,
                              final @FormDataParam("data") InputStream uploadedInputStream,
                              final @FormDataParam("data") FormDataContentDisposition fileDetail,
                              final @FormDataParam("url") String functionPkgUrl,
+                             @ApiParam(
+                                 value =
+                                     "A JSON value representing a sink config payload. All available configuration options are:  \n" +
+                                     "classname  \n" +
+                                     "   The sink's class name if archive is file-url-path (file://)  \n" +
+                                     "sourceSubscriptionName  \n" +
+                                     "   Pulsar source subscription name if user wants a specific  \n" +
+                                     "   subscription-name for input-topic consumer  \n" +
+                                     "inputs  \n" +
+                                     "   The sink's input topic or topics (specified as a JSON array)  \n" +
+                                     "topicsPattern  \n" +
+                                     "   TopicsPattern to consume from list of topics under a namespace that " +
+                                     "   match the pattern. [input] and [topicsPattern] are mutually " +
+                                     "   exclusive. Add SerDe class name for a pattern in customSerdeInputs " +
+                                     "   (supported for java fun only)  \n" +
+                                     "topicToSerdeClassName  \n" +
+                                     "   The map of input topics to SerDe class names (specified as a JSON object)  \n" +
+                                     "topicToSchemaType  \n" +
+                                     "   The map of input topics to Schema types or class names (specified as a JSON object)  \n" +
+                                     "inputSpecs  \n" +
+                                     "   The map of input topics to its consumer configuration, each configuration has schema of " +
+                                     "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5}  \n" +
+                                     "configs  \n" +
+                                     "   The map of configs (specified as a JSON object)  \n" +
+                                     "secrets  \n" +
+                                     "   a map of secretName(aka how the secret is going to be \n" +
+                                     "   accessed in the function via context) to an object that \n" +
+                                     "   encapsulates how the secret is fetched by the underlying \n" +
+                                     "   secrets provider. The type of a value here can be found by the \n" +
+                                     "   SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object)  \n" +
+                                     "parallelism  \n" +
+                                     "   The sink's parallelism factor (i.e. the number of sink instances to run)  \n" +
+                                     "processingGuarantees  \n" +
+                                     "   The processing guarantees (aka delivery semantics) applied to the sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\"  \n" +
+                                     "retainOrdering  \n" +
+                                     "   Boolean denotes whether sink consumes and sinks messages in order  \n" +
+                                     "resources  \n" +
+                                     "   {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime)  \n" +
+                                     "autoAck  \n" +
+                                     "   Boolean denotes whether or not the framework will automatically acknowledge messages  \n" +
+                                     "timeoutMs  \n" +
+                                     "   Long denotes the message timeout in milliseconds  \n" +
+                                     "cleanupSubscription  \n" +
+                                     "   Boolean denotes whether the subscriptions the functions created/used should be deleted when the function is deleted  \n" +
+                                     "runtimeFlags  \n" +
+                                     "   Any flags that you want to pass to the runtime as a single string  \n",
+                                 examples = @Example(
+                                     value = @ExampleProperty(
+                                         mediaType = MediaType.APPLICATION_JSON,
+                                         value = "{  \n" +
+                                             "\t\"classname\": \"org.example.MySinkTest\",\n" +
+                                             "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" +
+                                             "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" +
+                                             "\t\"parallelism\": 10\n" +
+                                             "}"
+                                     )
+                                 )
+                             )
                              final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
@@ -85,19 +145,84 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @PUT
     @ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode")
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
-            @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+            @ApiResponse(code = 400, message = "Invalid request (sink doesn't exist, update contains no change, etc.)"),
+            @ApiResponse(code = 200, message = "Pulsar Sink successfully updated"),
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to process package, etc.)"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
-    public void updateSink(final @PathParam("tenant") String tenant,
+    public void updateSink(@ApiParam(value = "The sink's tenant")
+                           final @PathParam("tenant") String tenant,
+                           @ApiParam(value = "The sink's namespace")
                            final @PathParam("namespace") String namespace,
+                           @ApiParam(value = "The sink's name")
                            final @PathParam("sinkName") String sinkName,
                            final @FormDataParam("data") InputStream uploadedInputStream,
                            final @FormDataParam("data") FormDataContentDisposition fileDetail,
                            final @FormDataParam("url") String functionPkgUrl,
+                           @ApiParam(
+                               value =
+                                   "A JSON value representing a sink config payload. All available configuration options are:  \n" +
+                                       "classname  \n" +
+                                       "   The sink's class name if archive is file-url-path (file://)  \n" +
+                                       "sourceSubscriptionName  \n" +
+                                       "   Pulsar source subscription name if user wants a specific  \n" +
+                                       "   subscription-name for input-topic consumer  \n" +
+                                       "inputs  \n" +
+                                       "   The sink's input topic or topics (specified as a JSON array)  \n" +
+                                       "topicsPattern  \n" +
+                                       "   TopicsPattern to consume from list of topics under a namespace that " +
+                                       "   match the pattern. [input] and [topicsPattern] are mutually " +
+                                       "   exclusive. Add SerDe class name for a pattern in customSerdeInputs " +
+                                       "   (supported for java fun only)  \n" +
+                                       "topicToSerdeClassName  \n" +
+                                       "   The map of input topics to SerDe class names (specified as a JSON object)  \n" +
+                                       "topicToSchemaType  \n" +
+                                       "   The map of input topics to Schema types or class names (specified as a JSON object)  \n" +
+                                       "inputSpecs  \n" +
+                                       "   The map of input topics to its consumer configuration, each configuration has schema of " +
+                                       "   {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5}  \n" +
+                                       "configs  \n" +
+                                       "   The map of configs (specified as a JSON object)  \n" +
+                                       "secrets  \n" +
+                                       "   a map of secretName(aka how the secret is going to be \n" +
+                                       "   accessed in the function via context) to an object that \n" +
+                                       "   encapsulates how the secret is fetched by the underlying \n" +
+                                       "   secrets provider. The type of a value here can be found by the \n" +
+                                       "   SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object)  \n" +
+                                       "parallelism  \n" +
+                                       "   The sink's parallelism factor (i.e. the number of sink instances to run)  \n" +
+                                       "processingGuarantees  \n" +
+                                       "   The processing guarantees (aka delivery semantics) applied to the sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\"  \n" +
+                                       "retainOrdering  \n" +
+                                       "   Boolean denotes whether sink consumes and sinks messages in order  \n" +
+                                       "resources  \n" +
+                                       "   {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime)  \n" +
+                                       "autoAck  \n" +
+                                       "   Boolean denotes whether or not the framework will automatically acknowledge messages  \n" +
+                                       "timeoutMs  \n" +
+                                       "   Long denotes the message timeout in milliseconds  \n" +
+                                       "cleanupSubscription  \n" +
+                                       "   Boolean denotes whether the subscriptions the functions created/used should be deleted when the function is deleted  \n" +
+                                       "runtimeFlags  \n" +
+                                       "   Any flags that you want to pass to the runtime as a single string  \n",
+                               examples = @Example(
+                                   value = @ExampleProperty(
+                                       mediaType = MediaType.APPLICATION_JSON,
+                                       value = "{  \n" +
+                                           "\t\"classname\": \"org.example.SinkStressTest\",  \n" +
+                                               "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" +
+                                               "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" +
+                                               "\t\"parallelism\": 5\n" +
+                                               "}"
+                                   )
+                               )
+                           )
                            final @FormDataParam("sinkConfig") String sinkConfigJson,
+                           @ApiParam()
                            final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
 
          sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
@@ -109,15 +234,20 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @DELETE
     @ApiOperation(value = "Deletes a Pulsar Sink currently running in cluster mode")
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function doesn't exist"),
-            @ApiResponse(code = 408, message = "Request timeout"),
-            @ApiResponse(code = 200, message = "The function was successfully deleted")
+            @ApiResponse(code = 400, message = "Invalid deregister request"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 200, message = "The sink was successfully deleted"),
+            @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to deregister, etc.)"),
+            @ApiResponse(code = 408, message = "Got InterruptedException while deregistering the sink"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
-    public void deregisterSink(final @PathParam("tenant") String tenant,
+    public void deregisterSink(@ApiParam(value = "The sink's tenant")
+                               final @PathParam("tenant") String tenant,
+                               @ApiParam(value = "The sink's namespace")
                                final @PathParam("namespace") String namespace,
+                               @ApiParam(value = "The sink's name")
                                final @PathParam("sinkName") String sinkName) {
         sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
@@ -128,14 +258,16 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
             response = SinkConfig.class
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
             @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 408, message = "Request timeout"),
-            @ApiResponse(code = 404, message = "The function doesn't exist")
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}")
-    public SinkConfig getSinkInfo(final @PathParam("tenant") String tenant,
+    public SinkConfig getSinkInfo(@ApiParam(value = "The sink's tenant")
+                                  final @PathParam("tenant") String tenant,
+                                  @ApiParam(value = "The sink's namespace")
                                   final @PathParam("namespace") String namespace,
+                                  @ApiParam(value = "The sink's name")
                                   final @PathParam("sinkName") String sinkName) throws IOException {
         return sink.getSinkInfo(tenant, namespace, sinkName);
     }
@@ -146,16 +278,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
             response = SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The sink doesn't exist")
+            @ApiResponse(code = 400, message = "The sink instance does not exist"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal Server Error (got exception while getting status, etc.)"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
     public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(
+            @ApiParam(value = "The sink's tenant")
             final @PathParam("tenant") String tenant,
+            @ApiParam(value = "The sink's namespace")
             final @PathParam("namespace") String namespace,
+            @ApiParam(value = "The sink's name")
             final @PathParam("sinkName") String sinkName,
+            @ApiParam(value = "The sink instanceId")
             final @PathParam("instanceId") String instanceId) throws IOException {
         return sink.getSinkInstanceStatus(
             tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
@@ -167,14 +304,18 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
             response = SinkStatus.class
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 404, message = "The sink doesn't exist")
+            @ApiResponse(code = 400, message = "Invalid get status request"),
+            @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
     })
     @Produces(MediaType.APPLICATION_JSON)
     @Path("/{tenant}/{namespace}/{sinkName}/status")
-    public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
+    public SinkStatus getSinkStatus(@ApiParam(value = "The sink's tenant")
+                                    final @PathParam("tenant") String tenant,
+                                    @ApiParam(value = "The sink's namespace")
                                     final @PathParam("namespace") String namespace,
+                                    @ApiParam(value = "The sink's name")
                                     final @PathParam("sinkName") String sinkName) throws IOException {
         return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
@@ -186,11 +327,15 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
             responseContainer = "Collection"
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+            @ApiResponse(code = 400, message = "Invalid list request"),
+            @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to authorize, etc.)"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}")
-    public List<String> listSinks(final @PathParam("tenant") String tenant,
+    public List<String> listSinks(@ApiParam(value = "The sink's tenant")
+                                  final @PathParam("tenant") String tenant,
+                                  @ApiParam(value = "The sink's namespace")
                                   final @PathParam("namespace") String namespace) {
         return sink.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
     }
@@ -198,15 +343,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @POST
     @ApiOperation(value = "Restart sink instance", response = Void.class)
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
+            @ApiResponse(code = 400, message = "Invalid restart request"),
+            @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to restart the sink instance, failed to authorize, etc.)"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void restartSink(final @PathParam("tenant") String tenant,
+    public void restartSink(@ApiParam(value = "The sink's tenant")
+                            final @PathParam("tenant") String tenant,
+                            @ApiParam(value = "The sink's namespace")
                             final @PathParam("namespace") String namespace,
+                            @ApiParam(value = "The sink's name")
                             final @PathParam("sinkName") String sinkName,
+                            @ApiParam(value = "The sink instanceId")
                             final @PathParam("instanceId") String instanceId) {
         sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
@@ -214,14 +365,19 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @POST
     @ApiOperation(value = "Restart all sink instances", response = Void.class)
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
+            @ApiResponse(code = 400, message = "Invalid restart request"),
+            @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to restart the sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}/restart")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void restartSink(final @PathParam("tenant") String tenant,
+    public void restartSink(@ApiParam(value = "The sink's tenant")
+                            final @PathParam("tenant") String tenant,
+                            @ApiParam(value = "The sink's namespace")
                             final @PathParam("namespace") String namespace,
+                            @ApiParam(value = "The sink's name")
                             final @PathParam("sinkName") String sinkName) {
         sink.restartFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
@@ -229,15 +385,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @POST
     @ApiOperation(value = "Stop sink instance", response = Void.class)
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
+            @ApiResponse(code = 400, message = "Invalid stop request"),
+            @ApiResponse(code = 404, message = "The sink instance does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to stop the sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void stopSink(final @PathParam("tenant") String tenant,
+    public void stopSink(@ApiParam(value = "The sink's tenant")
+                         final @PathParam("tenant") String tenant,
+                         @ApiParam(value = "The sink's namespace")
                          final @PathParam("namespace") String namespace,
+                         @ApiParam(value = "The sink's name")
                          final @PathParam("sinkName") String sinkName,
+                         @ApiParam(value = "The sink instanceId")
                          final @PathParam("instanceId") String instanceId) {
         sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
@@ -245,14 +407,19 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @POST
     @ApiOperation(value = "Stop all sink instances", response = Void.class)
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
+            @ApiResponse(code = 400, message = "Invalid stop request"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to stop the sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}/stop")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void stopSink(final @PathParam("tenant") String tenant,
+    public void stopSink(@ApiParam(value = "The sink's tenant")
+                         final @PathParam("tenant") String tenant,
+                         @ApiParam(value = "The sink's namespace")
                          final @PathParam("namespace") String namespace,
+                         @ApiParam(value = "The sink's name")
                          final @PathParam("sinkName") String sinkName) {
         sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
@@ -260,15 +427,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @POST
     @ApiOperation(value = "Start sink instance", response = Void.class)
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
+            @ApiResponse(code = 400, message = "Invalid start request"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to start the sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void startSink(final @PathParam("tenant") String tenant,
+    public void startSink(@ApiParam(value = "The sink's tenant")
+                          final @PathParam("tenant") String tenant,
+                          @ApiParam(value = "The sink's namespace")
                           final @PathParam("namespace") String namespace,
+                          @ApiParam(value = "The sink's name")
                           final @PathParam("sinkName") String sinkName,
+                          @ApiParam(value = "The sink instanceId")
                           final @PathParam("instanceId") String instanceId) {
         sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
     }
@@ -276,14 +449,19 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
     @POST
     @ApiOperation(value = "Start all sink instances", response = Void.class)
     @ApiResponses(value = {
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 404, message = "The function does not exist"),
-            @ApiResponse(code = 500, message = "Internal server error")
+            @ApiResponse(code = 400, message = "Invalid start request"),
+            @ApiResponse(code = 404, message = "The sink does not exist"),
+            @ApiResponse(code = 500, message = "Internal server error (failed to start the sink, failed to authorize, etc.)"),
+            @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+            @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
     })
     @Path("/{tenant}/{namespace}/{sinkName}/start")
     @Consumes(MediaType.APPLICATION_JSON)
-    public void startSink(final @PathParam("tenant") String tenant,
+    public void startSink(@ApiParam(value = "The sink's tenant")
+                          final @PathParam("tenant") String tenant,
+                          @ApiParam(value = "The sink's namespace")
                           final @PathParam("namespace") String namespace,
+                          @ApiParam(value = "The sink's name")
                           final @PathParam("sinkName") String sinkName) {
         sink.startFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
     }
@@ -294,9 +472,7 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
             response = List.class
     )
     @ApiResponses(value = {
-            @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
-            @ApiResponse(code = 400, message = "Invalid request"),
-            @ApiResponse(code = 408, message = "Request timeout")
+            @ApiResponse(code = 200, message = "Get builtin sinks successfully.")
     })
     @Path("/builtinsinks")
     public List<ConnectorDefinition> getSinkList() {
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 9b40e00..74ff8aa 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -104,6 +104,11 @@
     </dependency>
 
     <dependency>
+      <groupId>io.swagger</groupId>
+      <artifactId>swagger-annotations</artifactId>
+    </dependency>
+
+    <dependency>
       <groupId>org.apache.bookkeeper</groupId>
       <artifactId>circe-checksum</artifactId>
       <version>${bookkeeper.version}</version>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java
index b5d956d..d1186ca 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java
@@ -18,12 +18,18 @@
  */
 package org.apache.pulsar.common.functions;
 
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
 import lombok.Builder;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
+@ApiModel(value = "UpdateOptions", description = "Options while updating the sink")
 public class UpdateOptions {
+    @ApiModelProperty(
+        value = "Whether or not to update the auth data",
+        name = "updateAuthData") // documented name kept identical to the field name below
     private boolean updateAuthData = false;
 }