You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/06/17 19:35:10 UTC
[pulsar] branch master updated: [REST API Doc]Sink API refinement
(#4520)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d2546f0 [REST API Doc]Sink API refinement (#4520)
d2546f0 is described below
commit d2546f0c38e378ddf668a27330d7f9c5412a04b7
Author: Yijie Shen <he...@gmail.com>
AuthorDate: Tue Jun 18 03:35:02 2019 +0800
[REST API Doc]Sink API refinement (#4520)
* revisit documentation for pulsar admin api
* format
* update json body
* sinkConfig as doc base
---
.../apache/pulsar/broker/admin/impl/SinksBase.java | 294 ++++++++++++++++-----
pulsar-common/pom.xml | 5 +
.../pulsar/common/functions/UpdateOptions.java | 6 +
3 files changed, 246 insertions(+), 59 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index 50839d8..c527bd3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -18,9 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import io.swagger.annotations.ApiOperation;
-import io.swagger.annotations.ApiResponse;
-import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.*;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.common.functions.UpdateOptions;
@@ -63,19 +61,81 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@POST
@ApiOperation(value = "Creates a new Pulsar Sink in cluster mode")
@ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
- @ApiResponse(code = 408, message = "Request timeout"),
- @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+ @ApiResponse(code = 400, message = "Invalid request (sink already exists, etc.)"),
+ @ApiResponse(code = 200, message = "Pulsar Sink successfully created"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to get tenant data, failed to process package, etc.)"),
+ @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
- public void registerSink(final @PathParam("tenant") String tenant,
+ public void registerSink(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
+ @ApiParam(
+ value =
+ "A JSON value presenting a sink config payload. All available configuration options are: \n" +
+ "classname \n" +
+ " The sink's class name if archive is file-url-path (file://) \n" +
+ "sourceSubscriptionName \n" +
+ " Pulsar source subscription name if user wants a specific \n" +
+ " subscription-name for input-topic consumer \n" +
+ "inputs \n" +
+ " The sink's input topic or topics (specified as a JSON array) \n" +
+ "topicsPattern \n" +
+ " TopicsPattern to consume from list of topics under a namespace that " +
+ " match the pattern. [input] and [topicsPattern] are mutually " +
+ " exclusive. Add SerDe class name for a pattern in customSerdeInputs " +
+ " (supported for java fun only) \n" +
+ "topicToSerdeClassName \n" +
+ " The map of input topics to SerDe class names (specified as a JSON object) \n" +
+ "topicToSchemaType \n" +
+ " The map of input topics to Schema types or class names (specified as a JSON object) \n" +
+ "inputSpecs \n" +
+ " The map of input topics to its consumer configuration, each configuration has schema of " +
+ " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5} \n" +
+ "configs \n" +
+ " The map of configs (specified as a JSON object) \n" +
+ "secrets \n" +
+ " a map of secretName(aka how the secret is going to be \n" +
+ " accessed in the function via context) to an object that \n" +
+ " encapsulates how the secret is fetched by the underlying \n" +
+ " secrets provider. The type of a value here can be found by the \n" +
+ " SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object) \n" +
+ "parallelism \n" +
+ " The sink's parallelism factor (i.e. the number of sink instances to run) \n" +
+ "processingGuarantees \n" +
+ " The processing guarantees (aka delivery semantics) applied to the sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\" \n" +
+ "retainOrdering \n" +
+ " Boolean denotes whether sink consumes and sinks messages in order \n" +
+ "resources \n" +
+ " {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime) \n" +
+ "autoAck \n" +
+ " Boolean denotes whether or not the framework will automatically acknowledge messages \n" +
+ "timeoutMs \n" +
+ " Long denotes the message timeout in milliseconds \n" +
+ "cleanupSubscription \n" +
+ " Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions are deleted \n" +
+ "runtimeFlags \n" +
+ " Any flags that you want to pass to the runtime as a single string \n",
+ examples = @Example(
+ value = @ExampleProperty(
+ mediaType = MediaType.APPLICATION_JSON,
+ value = "{ \n" +
+ "\t\"classname\": \"org.example.MySinkTest\",\n" +
+ "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" +
+ "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" +
+ "\t\"parallelism\": 10\n" +
+ "}"
+ )
+ )
+ )
final @FormDataParam("sinkConfig") String sinkConfigJson) {
sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
@@ -85,19 +145,84 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@PUT
@ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode")
@ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
- @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+ @ApiResponse(code = 400, message = "Invalid request (sink doesn't exist, update contains no change, etc.)"),
+ @ApiResponse(code = 200, message = "Pulsar Sink successfully updated"),
+ @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to process package, etc.)"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
- public void updateSink(final @PathParam("tenant") String tenant,
+ public void updateSink(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
+ @ApiParam(
+ value =
+ "A JSON value presenting a sink config payload. All available configuration options are: \n" +
+ "classname \n" +
+ " The sink's class name if archive is file-url-path (file://) \n" +
+ "sourceSubscriptionName \n" +
+ " Pulsar source subscription name if user wants a specific \n" +
+ " subscription-name for input-topic consumer \n" +
+ "inputs \n" +
+ " The sink's input topic or topics (specified as a JSON array) \n" +
+ "topicsPattern \n" +
+ " TopicsPattern to consume from list of topics under a namespace that " +
+ " match the pattern. [input] and [topicsPattern] are mutually " +
+ " exclusive. Add SerDe class name for a pattern in customSerdeInputs " +
+ " (supported for java fun only) \n" +
+ "topicToSerdeClassName \n" +
+ " The map of input topics to SerDe class names (specified as a JSON object) \n" +
+ "topicToSchemaType \n" +
+ " The map of input topics to Schema types or class names (specified as a JSON object) \n" +
+ "inputSpecs \n" +
+ " The map of input topics to its consumer configuration, each configuration has schema of " +
+ " {\"schemaType\": \"type-x\", \"serdeClassName\": \"name-x\", \"isRegexPattern\": true, \"receiverQueueSize\": 5} \n" +
+ "configs \n" +
+ " The map of configs (specified as a JSON object) \n" +
+ "secrets \n" +
+ " a map of secretName(aka how the secret is going to be \n" +
+ " accessed in the function via context) to an object that \n" +
+ " encapsulates how the secret is fetched by the underlying \n" +
+ " secrets provider. The type of a value here can be found by the \n" +
+ " SecretProviderConfigurator.getSecretObjectType() method. (specified as a JSON object) \n" +
+ "parallelism \n" +
+ " The sink's parallelism factor (i.e. the number of sink instances to run) \n" +
+ "processingGuarantees \n" +
+ " The processing guarantees (aka delivery semantics) applied to the sink. Possible Values: \"ATLEAST_ONCE\", \"ATMOST_ONCE\", \"EFFECTIVELY_ONCE\" \n" +
+ "retainOrdering \n" +
+ " Boolean denotes whether sink consumes and sinks messages in order \n" +
+ "resources \n" +
+ " {\"cpu\": 1, \"ram\": 2, \"disk\": 3} The CPU (in cores), RAM (in bytes) and disk (in bytes) that needs to be allocated per sink instance (applicable only to Docker runtime) \n" +
+ "autoAck \n" +
+ " Boolean denotes whether or not the framework will automatically acknowledge messages \n" +
+ "timeoutMs \n" +
+ " Long denotes the message timeout in milliseconds \n" +
+ "cleanupSubscription \n" +
+ " Boolean denotes whether the subscriptions the functions created/used should be deleted when the functions are deleted \n" +
+ "runtimeFlags \n" +
+ " Any flags that you want to pass to the runtime as a single string \n",
+ examples = @Example(
+ value = @ExampleProperty(
+ mediaType = MediaType.APPLICATION_JSON,
+ value = "{ \n" +
+ "\t\"classname\": \"org.example.SinkStressTest\", \n" +
+ "\t\"inputs\": [\"persistent://public/default/sink-input\"],\n" +
+ "\t\"processingGuarantees\": \"EFFECTIVELY_ONCE\",\n" +
+ "\t\"parallelism\": 5\n" +
+ "}"
+ )
+ )
+ )
final @FormDataParam("sinkConfig") String sinkConfigJson,
+ @ApiParam()
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
@@ -109,15 +234,20 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@DELETE
@ApiOperation(value = "Deletes a Pulsar Sink currently running in cluster mode")
@ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 404, message = "The function doesn't exist"),
- @ApiResponse(code = 408, message = "Request timeout"),
- @ApiResponse(code = 200, message = "The function was successfully deleted")
+ @ApiResponse(code = 400, message = "Invalid deregister request"),
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 200, message = "The sink was successfully deleted"),
+ @ApiResponse(code = 401, message = "Client is not authorized to perform operation"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to authorize, failed to deregister, etc.)"),
+ @ApiResponse(code = 408, message = "Got InterruptedException while deregistering the sink"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}")
- public void deregisterSink(final @PathParam("tenant") String tenant,
+ public void deregisterSink(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName) {
sink.deregisterFunction(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@@ -128,14 +258,16 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
response = SinkConfig.class
)
@ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 408, message = "Request timeout"),
- @ApiResponse(code = 404, message = "The function doesn't exist")
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}")
- public SinkConfig getSinkInfo(final @PathParam("tenant") String tenant,
+ public SinkConfig getSinkInfo(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName) throws IOException {
return sink.getSinkInfo(tenant, namespace, sinkName);
}
@@ -146,16 +278,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
response = SinkStatus.SinkInstanceStatus.SinkInstanceStatusData.class
)
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 404, message = "The sink doesn't exist")
+ @ApiResponse(code = 400, message = "The sink instance does not exist"),
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 500, message = "Internal Server Error (got exception while getting status, etc.)"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(
+ @ApiParam(value = "The sink's tenant")
final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName,
+ @ApiParam(value = "The sink instanceId")
final @PathParam("instanceId") String instanceId) throws IOException {
return sink.getSinkInstanceStatus(
tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
@@ -167,14 +304,18 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
response = SinkStatus.class
)
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 404, message = "The sink doesn't exist")
+ @ApiResponse(code = 400, message = "Invalid get status request"),
+ @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later."),
})
@Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{sinkName}/status")
- public SinkStatus getSinkStatus(final @PathParam("tenant") String tenant,
+ public SinkStatus getSinkStatus(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName) throws IOException {
return sink.getSinkStatus(tenant, namespace, sinkName, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@@ -186,11 +327,15 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
responseContainer = "Collection"
)
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+ @ApiResponse(code = 400, message = "Invalid list request"),
+ @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to authorize, etc.)"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}")
- public List<String> listSinks(final @PathParam("tenant") String tenant,
+ public List<String> listSinks(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace) {
return sink.listFunctions(tenant, namespace, clientAppId(), clientAuthData());
}
@@ -198,15 +343,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@POST
@ApiOperation(value = "Restart sink instance", response = Void.class)
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 404, message = "The function does not exist"),
- @ApiResponse(code = 500, message = "Internal server error")
+ @ApiResponse(code = 400, message = "Invalid restart request"),
+ @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to restart the sink instance, failed to authorize, etc.)"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
@Consumes(MediaType.APPLICATION_JSON)
- public void restartSink(final @PathParam("tenant") String tenant,
+ public void restartSink(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName,
+ @ApiParam(value = "The sink instanceId")
final @PathParam("instanceId") String instanceId) {
sink.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@@ -214,14 +365,19 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@POST
@ApiOperation(value = "Restart all sink instances", response = Void.class)
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 404, message = "The function does not exist"),
- @ApiResponse(code = 500, message = "Internal server error")
+ @ApiResponse(code = 400, message = "Invalid restart request"),
+ @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to restart the sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}/restart")
@Consumes(MediaType.APPLICATION_JSON)
- public void restartSink(final @PathParam("tenant") String tenant,
+ public void restartSink(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName) {
sink.restartFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@@ -229,15 +385,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@POST
@ApiOperation(value = "Stop sink instance", response = Void.class)
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 404, message = "The function does not exist"),
- @ApiResponse(code = 500, message = "Internal server error")
+ @ApiResponse(code = 400, message = "Invalid stop request"),
+ @ApiResponse(code = 404, message = "The sink instance does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to stop the sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
@Consumes(MediaType.APPLICATION_JSON)
- public void stopSink(final @PathParam("tenant") String tenant,
+ public void stopSink(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName,
+ @ApiParam(value = "The sink instanceId")
final @PathParam("instanceId") String instanceId) {
sink.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@@ -245,14 +407,19 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@POST
@ApiOperation(value = "Stop all sink instances", response = Void.class)
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 404, message = "The function does not exist"),
- @ApiResponse(code = 500, message = "Internal server error")
+ @ApiResponse(code = 400, message = "Invalid stop request"),
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to stop the sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}/stop")
@Consumes(MediaType.APPLICATION_JSON)
- public void stopSink(final @PathParam("tenant") String tenant,
+ public void stopSink(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName) {
sink.stopFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@@ -260,15 +427,21 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@POST
@ApiOperation(value = "Start sink instance", response = Void.class)
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 404, message = "The function does not exist"),
- @ApiResponse(code = 500, message = "Internal server error")
+ @ApiResponse(code = 400, message = "Invalid start request"),
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to start the sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/start")
@Consumes(MediaType.APPLICATION_JSON)
- public void startSink(final @PathParam("tenant") String tenant,
+ public void startSink(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName,
+ @ApiParam(value = "The sink instanceId")
final @PathParam("instanceId") String instanceId) {
sink.startFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri(), clientAppId(), clientAuthData());
}
@@ -276,14 +449,19 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
@POST
@ApiOperation(value = "Start all sink instances", response = Void.class)
@ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 404, message = "The function does not exist"),
- @ApiResponse(code = 500, message = "Internal server error")
+ @ApiResponse(code = 400, message = "Invalid start request"),
+ @ApiResponse(code = 404, message = "The sink does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error (failed to start the sink, failed to authorize, etc.)"),
+ @ApiResponse(code = 401, message = "The client is not authorized to perform this operation"),
+ @ApiResponse(code = 503, message = "Function worker service is now initializing. Please try again later.")
})
@Path("/{tenant}/{namespace}/{sinkName}/start")
@Consumes(MediaType.APPLICATION_JSON)
- public void startSink(final @PathParam("tenant") String tenant,
+ public void startSink(@ApiParam(value = "The sink's tenant")
+ final @PathParam("tenant") String tenant,
+ @ApiParam(value = "The sink's namespace")
final @PathParam("namespace") String namespace,
+ @ApiParam(value = "The sink's name")
final @PathParam("sinkName") String sinkName) {
sink.startFunctionInstances(tenant, namespace, sinkName, clientAppId(), clientAuthData());
}
@@ -294,9 +472,7 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
response = List.class
)
@ApiResponses(value = {
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 408, message = "Request timeout")
+ @ApiResponse(code = 200, message = "Get builtin sinks successfully.")
})
@Path("/builtinsinks")
public List<ConnectorDefinition> getSinkList() {
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 9b40e00..74ff8aa 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -104,6 +104,11 @@
</dependency>
<dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-annotations</artifactId>
+ </dependency>
+
+ <dependency>
<groupId>org.apache.bookkeeper</groupId>
<artifactId>circe-checksum</artifactId>
<version>${bookkeeper.version}</version>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java
index b5d956d..d1186ca 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/functions/UpdateOptions.java
@@ -18,12 +18,18 @@
*/
package org.apache.pulsar.common.functions;
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
@Data
@NoArgsConstructor
+@ApiModel(value = "UpdateOptions", description = "Options while updating the sink")
public class UpdateOptions {
+ @ApiModelProperty(
+ value = "Whether or not to update the auth data",
+ name = "update-auth-data")
private boolean updateAuthData = false;
}