You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2018/12/27 01:29:36 UTC
[pulsar] branch master updated: add backwards compatibility to 2.2
for function admin API (#3241)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e63b658 add backwards compatibility to 2.2 for function admin API (#3241)
e63b658 is described below
commit e63b658d8d0e5863d9b77658d3b1eb8b374f0de9
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Dec 26 17:29:31 2018 -0800
add backwards compatibility to 2.2 for function admin API (#3241)
* add backwards compatibility to 2.2 for function admin API
* add license headers
* renaming
* fix tests
* get integration tests to pass
---
.../org/apache/pulsar/broker/PulsarService.java | 1 +
.../apache/pulsar/broker/admin/v1/Functions.java | 9 +-
.../apache/pulsar/broker/admin/v2/Functions.java | 303 ++++++++++-
.../pulsar/broker/admin/{v2 => v3}/Functions.java | 6 +-
.../pulsar/broker/admin/{v2 => v3}/Sink.java | 2 +-
.../pulsar/broker/admin/{v2 => v3}/Source.java | 2 +-
.../client/admin/internal/FunctionsImpl.java | 2 +-
.../pulsar/client/admin/internal/SinkImpl.java | 2 +-
.../pulsar/client/admin/internal/SourceImpl.java | 2 +-
.../src/main/proto/InstanceCommunication.proto | 8 +-
.../pulsar/functions/worker/rest/Resources.java | 19 +-
.../pulsar/functions/worker/rest/WorkerServer.java | 10 +-
.../functions/worker/rest/api/FunctionsImplV2.java | 210 +++++++
.../worker/rest/api/v2/FunctionApiV2Resource.java | 299 +++++-----
.../FunctionApiV3Resource.java} | 6 +-
.../SinkApiV3Resource.java} | 6 +-
.../SourceApiV3Resource.java} | 6 +-
.../rest/api/v2/FunctionApiV2ResourceTest.java | 603 ++++++++++-----------
.../FunctionApiV3ResourceTest.java} | 9 +-
.../SinkApiV3ResourceTest.java} | 6 +-
.../SourceApiV3ResourceTest.java} | 6 +-
21 files changed, 1028 insertions(+), 489 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index f576f95..417ec21 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -381,6 +381,7 @@ public class PulsarService implements AutoCloseable {
this.webService.addRestResources("/", "org.apache.pulsar.broker.web", false, attributeMap);
this.webService.addRestResources("/admin", "org.apache.pulsar.broker.admin.v1", true, attributeMap);
this.webService.addRestResources("/admin/v2", "org.apache.pulsar.broker.admin.v2", true, attributeMap);
+ this.webService.addRestResources("/admin/v3", "org.apache.pulsar.broker.admin.v3", true, attributeMap);
this.webService.addRestResources("/lookup", "org.apache.pulsar.broker.lookup", true, attributeMap);
this.webService.addServlet("/metrics",
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
index 7eca553..0591279 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Functions.java
@@ -19,10 +19,17 @@
package org.apache.pulsar.broker.admin.v1;
import io.swagger.annotations.Api;
+
+import javax.ws.rs.Consumes;
import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
import org.apache.pulsar.broker.admin.impl.FunctionsBase;
@Path("/functions")
@Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
-public class Functions extends FunctionsBase {
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Functions extends org.apache.pulsar.broker.admin.v2.Functions{
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
index 854a365..19c3052 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
@@ -19,16 +19,311 @@
package org.apache.pulsar.broker.admin.v2;
import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
+import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
-
-import org.apache.pulsar.broker.admin.impl.FunctionsBase;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.function.Supplier;
@Path("/functions")
@Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
-public class Functions extends FunctionsBase {
-}
+public class Functions extends AdminResource implements Supplier<WorkerService> {
+
+ private final FunctionsImplV2 functions;
+
+ public Functions() {
+ this.functions = new FunctionsImplV2(this);
+ }
+
+ @Override
+ public WorkerService get() {
+ return pulsar().getWorkerService();
+ }
+
+ @POST
+ @ApiOperation(value = "Creates a new Pulsar Function in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response registerFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("functionDetails") String functionDetailsJson,
+ final @FormDataParam("functionConfig") String functionConfigJson) {
+
+ return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+ }
+
+ @PUT
+ @ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response updateFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("functionDetails") String functionDetailsJson,
+ final @FormDataParam("functionConfig") String functionConfigJson) {
+
+ return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+ functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+ }
+
+
+ @DELETE
+ @ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function doesn't exist"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 200, message = "The function was successfully deleted")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}")
+ public Response deregisterFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName) {
+ return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches information about a Pulsar Function currently running in cluster mode",
+ response = FunctionMetaData.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 404, message = "The function doesn't exist")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}")
+ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName) throws IOException {
+
+ return functions.getFunctionInfo(
+ tenant, namespace, functionName);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Displays the status of a Pulsar Function instance",
+ response = FunctionStatus.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "The function doesn't exist")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
+ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @PathParam("instanceId") String instanceId) throws IOException {
+
+ return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Displays the status of a Pulsar Function running in cluster mode",
+ response = FunctionStatus.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}/status")
+ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName) throws IOException {
+ return functions.getFunctionStatusV2(
+ tenant, namespace, functionName, uri.getRequestUri());
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Lists all Pulsar Functions currently deployed in a given namespace",
+ response = String.class,
+ responseContainer = "Collection"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+ })
+ @Path("/{tenant}/{namespace}")
+ public Response listFunctions(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace) {
+ return functions.listFunctions( tenant, namespace);
+ }
+
+ @POST
+ @ApiOperation(
+ value = "Triggers a Pulsar Function with a user-specified value or file data",
+ response = Message.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}/trigger")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response triggerFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("data") String triggerValue,
+ final @FormDataParam("dataStream") InputStream triggerStream,
+ final @FormDataParam("topic") String topic) {
+ return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetch the current state associated with a Pulsar Function",
+ response = String.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "The key does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error")
+ })
+ @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+ public Response getFunctionState(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @PathParam("key") String key) {
+ return functions.getFunctionState(tenant, namespace, functionName, key);
+ }
+
+ @POST
+ @ApiOperation(value = "Restart function instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Restart all function instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{functionName}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+ return functions.restartFunctionInstances(tenant, namespace, functionName);
+ }
+
+ @POST
+ @ApiOperation(value = "Stop function instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Stop all function instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{functionName}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+ return functions.stopFunctionInstances(tenant, namespace, functionName);
+ }
+
+ @POST
+ @ApiOperation(
+ value = "Uploads Pulsar Function file data",
+ hidden = true
+ )
+ @Path("/upload")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("path") String path) {
+ return functions.uploadFunction(uploadedInputStream, path);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Downloads Pulsar Function file data",
+ hidden = true
+ )
+ @Path("/download")
+ public Response downloadFunction(final @QueryParam("path") String path) {
+ return functions.downloadFunction(path);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
+ response = List.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout")
+ })
+ @Path("/connectors")
+ public List<ConnectorDefinition> getConnectorsList() throws IOException {
+ return functions.getListOfConnectors();
+ }
+}
\ No newline at end of file
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java
similarity index 96%
copy from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
copy to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java
index 854a365..ce634a6 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Functions.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Functions.java
@@ -16,16 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.admin.v2;
+package org.apache.pulsar.broker.admin.v3;
import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.FunctionsBase;
+
import javax.ws.rs.Consumes;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
-import org.apache.pulsar.broker.admin.impl.FunctionsBase;
-
@Path("/functions")
@Api(value = "/functions", description = "Functions admin apis", tags = "functions", hidden = true)
@Produces(MediaType.APPLICATION_JSON)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
similarity index 96%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
index aea0ae7..e137f08 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Sink.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.admin.v2;
+package org.apache.pulsar.broker.admin.v3;
import io.swagger.annotations.Api;
import org.apache.pulsar.broker.admin.impl.SinkBase;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
similarity index 96%
rename from pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
rename to pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
index e5ef56c..24e84eb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v3/Source.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.broker.admin.v2;
+package org.apache.pulsar.broker.admin.v3;
import io.swagger.annotations.Api;
import org.apache.pulsar.broker.admin.impl.SourceBase;
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 1493d1d..130704e 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -56,7 +56,7 @@ public class FunctionsImpl extends BaseResource implements Functions {
public FunctionsImpl(WebTarget web, Authentication auth) {
super(auth);
- this.functions = web.path("/admin/functions");
+ this.functions = web.path("/admin/v3/functions");
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
index 1363d35..a9f99b8 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SinkImpl.java
@@ -46,7 +46,7 @@ public class SinkImpl extends BaseResource implements Sink {
public SinkImpl(WebTarget web, Authentication auth) {
super(auth);
- this.sink = web.path("/admin/v2/sink");
+ this.sink = web.path("/admin/v3/sink");
}
@Override
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
index d0d36a5..2d066e0 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/SourceImpl.java
@@ -46,7 +46,7 @@ public class SourceImpl extends BaseResource implements Source {
public SourceImpl(WebTarget web, Authentication auth) {
super(auth);
- this.source = web.path("/admin/v2/source");
+ this.source = web.path("/admin/v3/source");
}
@Override
diff --git a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
index 1b50ea8..aec1dd3 100644
--- a/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
+++ b/pulsar-functions/proto/src/main/proto/InstanceCommunication.proto
@@ -60,10 +60,10 @@ message FunctionStatus {
}
// Deprecated
-//message FunctionStatusList {
-// string error = 2;
-// repeated FunctionStatus functionStatusList = 1;
-//}
+message FunctionStatusList {
+ string error = 2;
+ repeated FunctionStatus functionStatusList = 1;
+}
message MetricsData {
// message DataDigest {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index ac011db..530f528 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,8 +20,9 @@ package org.apache.pulsar.functions.worker.rest;
import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v2.SinkApiV2Resource;
-import org.apache.pulsar.functions.worker.rest.api.v2.SourceApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.FunctionApiV3Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.SinkApiV3Resource;
+import org.apache.pulsar.functions.worker.rest.api.v3.SourceApiV3Resource;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
@@ -34,17 +35,25 @@ public final class Resources {
private Resources() {
}
- public static Set<Class<?>> getApiResources() {
+ public static Set<Class<?>> getApiV2Resources() {
return new HashSet<>(
Arrays.asList(
FunctionApiV2Resource.class,
- SourceApiV2Resource.class,
- SinkApiV2Resource.class,
WorkerApiV2Resource.class,
MultiPartFeature.class
));
}
+ public static Set<Class<?>> getApiV3Resources() {
+ return new HashSet<>(
+ Arrays.asList(
+ MultiPartFeature.class,
+ SourceApiV3Resource.class,
+ SinkApiV3Resource.class,
+ FunctionApiV3Resource.class
+ ));
+ }
+
public static Set<Class<?>> getRootResources() {
return new HashSet<>(
Arrays.asList(
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
index 5bc2f4c..35a4b7a 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/WorkerServer.java
@@ -76,7 +76,7 @@ public class WorkerServer {
public WorkerServer(WorkerService workerService) {
this.workerConfig = workerService.getWorkerConfig();
this.workerService = workerService;
- this.webServerExecutor = new WebExecutorThreadPool("function-web");
+ this.webServerExecutor = new WebExecutorThreadPool(8, "function-web");
init();
}
@@ -93,11 +93,13 @@ public class WorkerServer {
connector.setPort(this.workerConfig.getWorkerPort());
connectors.add(connector);
- List<Handler> handlers = new ArrayList<>(3);
+ List<Handler> handlers = new ArrayList<>(4);
handlers.add(
- newServletContextHandler("/admin", new ResourceConfig(Resources.getApiResources()), workerService));
+ newServletContextHandler("/admin", new ResourceConfig(Resources.getApiV2Resources()), workerService));
handlers.add(
- newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiResources()), workerService));
+ newServletContextHandler("/admin/v2", new ResourceConfig(Resources.getApiV2Resources()), workerService));
+ handlers.add(
+ newServletContextHandler("/admin/v3", new ResourceConfig(Resources.getApiV3Resources()), workerService));
handlers.add(newServletContextHandler("/", new ResourceConfig(Resources.getRootResources()), workerService));
RequestLogHandler requestLogHandler = new RequestLogHandler();
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
new file mode 100644
index 0000000..c71e1c2
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Functions admin facade kept for Pulsar 2.2 clients: forwards to {@link FunctionsImpl} and wraps results in a Response.
+ */
+@Slf4j
+public class FunctionsImplV2 {
+    // Receives every forwarded call; final because each constructor assigns it exactly once.
+    private final FunctionsImpl delegate;
+
+    public FunctionsImplV2(Supplier<WorkerService> workerServiceSupplier) {
+        this.delegate = new FunctionsImpl(workerServiceSupplier);
+    }
+
+    // For test purposes
+    public FunctionsImplV2(FunctionsImpl delegate) {
+        this.delegate = delegate;
+    }
+
+    /** Returns the function details held in the function's stored metadata, printed as JSON. */
+    public Response getFunctionInfo(final String tenant, final String namespace, final String functionName)
+            throws IOException {
+        // Result is discarded on purpose: the delegate is invoked only for its parameter checks.
+        delegate.getFunctionInfo(tenant, namespace, functionName);
+        FunctionMetaDataManager metaDataManager = delegate.worker().getFunctionMetaDataManager();
+        Function.FunctionMetaData metaData = metaDataManager.getFunctionMetaData(tenant, namespace, functionName);
+        String functionDetailsJson = org.apache.pulsar.functions.utils.Utils.printJson(metaData.getFunctionDetails());
+        return Response.status(Response.Status.OK).entity(functionDetailsJson).build();
+    }
+
+    /** Returns one instance's status, converted to the protobuf FunctionStatus and printed as JSON. */
+    public Response getFunctionInstanceStatus(final String tenant, final String namespace, final String functionName,
+            final String instanceId, URI uri) throws IOException {
+        org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
+                functionInstanceStatus = delegate.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri);
+        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(toProto(functionInstanceStatus, instanceId));
+        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+    }
+
+    /** Returns the status of every instance as a protobuf FunctionStatusList printed as JSON. */
+    public Response getFunctionStatusV2(String tenant, String namespace, String functionName, URI requestUri) throws IOException {
+        FunctionStatus functionStatus = delegate.getFunctionStatus(tenant, namespace, functionName, requestUri);
+        InstanceCommunication.FunctionStatusList.Builder functionStatusList = InstanceCommunication.FunctionStatusList.newBuilder();
+        functionStatus.instances.forEach(instance -> functionStatusList.addFunctionStatusList(
+                toProto(instance.getStatus(), String.valueOf(instance.getInstanceId()))));
+        String jsonResponse = org.apache.pulsar.functions.utils.Utils.printJson(functionStatusList);
+        return Response.status(Response.Status.OK).entity(jsonResponse).build();
+    }
+
+    /** Registers the function through the delegate and answers 200 with no entity. */
+    public Response registerFunction(String tenant, String namespace, String functionName,
+            InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl,
+            String functionDetailsJson, String functionConfigJson, String clientAppId) {
+        delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+        return Response.ok().build();
+    }
+
+    /** Updates the function through the delegate and answers 200 with no entity. */
+    public Response updateFunction(String tenant, String namespace, String functionName,
+            InputStream uploadedInputStream, FormDataContentDisposition fileDetail, String functionPkgUrl,
+            String functionDetailsJson, String functionConfigJson, String clientAppId) {
+        delegate.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+                functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId);
+        return Response.ok().build();
+    }
+
+    /** Deregisters the function through the delegate and answers 200 with no entity. */
+    public Response deregisterFunction(String tenant, String namespace, String functionName, String clientAppId) {
+        delegate.deregisterFunction(tenant, namespace, functionName, clientAppId);
+        return Response.ok().build();
+    }
+
+    /** Returns the delegate's function listing for the namespace as a JSON array serialized with Gson. */
+    public Response listFunctions(String tenant, String namespace) {
+        Collection<String> functionNames = delegate.listFunctions(tenant, namespace);
+        return Response.status(Response.Status.OK).entity(new Gson().toJson(functionNames.toArray())).build();
+    }
+
+    /** Triggers the function through the delegate and returns its String result as the response entity. */
+    public Response triggerFunction(String tenant, String namespace, String functionName, String triggerValue,
+            InputStream triggerStream, String topic) {
+        String result = delegate.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
+        return Response.status(Response.Status.OK).entity(result).build();
+    }
+
+    /** Returns the state stored under {@code key} as the plain text {@code value : V, version : N}. */
+    public Response getFunctionState(String tenant, String namespace, String functionName, String key) {
+        FunctionState state = delegate.getFunctionState(tenant, namespace, functionName, key);
+        // The numeric value takes precedence; the string value is reported only when no number is set.
+        Object stateValue = state.getNumberValue() != null ? state.getNumberValue() : state.getStringValue();
+        String value = "value : " + stateValue + ", version : " + state.getVersion();
+        return Response.status(Response.Status.OK).entity(value).build();
+    }
+
+    /** Restarts a single instance through the delegate and answers 200 with no entity. */
+    public Response restartFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI uri) {
+        delegate.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+        return Response.ok().build();
+    }
+
+    /** Restarts all instances of the function through the delegate and answers 200 with no entity. */
+    public Response restartFunctionInstances(String tenant, String namespace, String functionName) {
+        delegate.restartFunctionInstances(tenant, namespace, functionName);
+        return Response.ok().build();
+    }
+
+    /** Stops a single instance through the delegate and answers 200 with no entity. */
+    public Response stopFunctionInstance(String tenant, String namespace, String functionName, String instanceId, URI uri) {
+        delegate.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri);
+        return Response.ok().build();
+    }
+
+    /** Stops all instances of the function through the delegate and answers 200 with no entity. */
+    public Response stopFunctionInstances(String tenant, String namespace, String functionName) {
+        delegate.stopFunctionInstances(tenant, namespace, functionName);
+        return Response.ok().build();
+    }
+
+    /** Hands the uploaded stream and target path to the delegate and answers 200 with no entity. */
+    public Response uploadFunction(InputStream uploadedInputStream, String path) {
+        delegate.uploadFunction(uploadedInputStream, path);
+        return Response.ok().build();
+    }
+
+    /** Wraps whatever the delegate's downloadFunction returns for {@code path} as the entity of a 200 response. */
+    public Response downloadFunction(String path) {
+        return Response.status(Response.Status.OK).entity(delegate.downloadFunction(path)).build();
+    }
+
+    /** Returns the delegate's connector list unchanged (the only operation not wrapped in a Response). */
+    public List<ConnectorDefinition> getListOfConnectors() {
+        return delegate.getListOfConnectors();
+    }
+
+    /** Converts one instance's admin-API status into the protobuf FunctionStatus, adding instanceId and this worker's id. */
+    private InstanceCommunication.FunctionStatus toProto(
+            org.apache.pulsar.common.policies.data.FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData
+                    functionInstanceStatus, String instanceId) {
+        List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestSysExceptions
+                = functionInstanceStatus.getLatestSystemExceptions().stream()
+                .map(exceptionInformation -> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                        .setExceptionString(exceptionInformation.getExceptionString())
+                        .setMsSinceEpoch(exceptionInformation.getTimestampMs())
+                        .build())
+                .collect(Collectors.toList());
+        List<InstanceCommunication.FunctionStatus.ExceptionInformation> latestUserExceptions
+                = functionInstanceStatus.getLatestUserExceptions().stream()
+                .map(exceptionInformation -> InstanceCommunication.FunctionStatus.ExceptionInformation.newBuilder()
+                        .setExceptionString(exceptionInformation.getExceptionString())
+                        .setMsSinceEpoch(exceptionInformation.getTimestampMs())
+                        .build())
+                .collect(Collectors.toList());
+
+        // Generated protobuf setters throw NullPointerException on null, so an absent error is sent as the
+        // proto default "". NOTE(review): confirm whether getError() can actually be null for a healthy instance.
+        String error = functionInstanceStatus.getError();
+        return InstanceCommunication.FunctionStatus.newBuilder()
+                .setRunning(functionInstanceStatus.isRunning())
+                .setFailureException(error == null ? "" : error)
+                .setNumRestarts(functionInstanceStatus.getNumRestarts())
+                .setNumSuccessfullyProcessed(functionInstanceStatus.getNumSuccessfullyProcessed())
+                .setNumUserExceptions(functionInstanceStatus.getNumUserExceptions())
+                .addAllLatestUserExceptions(latestUserExceptions)
+                .setNumSystemExceptions(functionInstanceStatus.getNumSystemExceptions())
+                .addAllLatestSystemExceptions(latestSysExceptions)
+                .setAverageLatency(functionInstanceStatus.getAverageLatency())
+                .setLastInvocationTime(functionInstanceStatus.getLastInvocationTime())
+                .setInstanceId(instanceId)
+                .setWorkerId(delegate.worker().getWorkerConfig().getWorkerId())
+                .build();
+    }
+}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index b620fe5..2894c05 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -22,13 +22,12 @@ import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import lombok.extern.slf4j.Slf4j;
-import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.common.functions.FunctionState;
+import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.policies.data.FunctionStats;
-import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
-import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
@@ -39,10 +38,9 @@ import javax.ws.rs.POST;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
-import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.MediaType;
-import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
@@ -51,242 +49,271 @@ import java.util.List;
@Path("/functions")
public class FunctionApiV2Resource extends FunctionApiResource {
- protected final FunctionsImpl functions;
+ protected final FunctionsImplV2 functions;
public FunctionApiV2Resource() {
- this.functions = new FunctionsImpl(this);
+ this.functions = new FunctionsImplV2(this);
}
@POST
+ @ApiOperation(value = "Creates a new Pulsar Function in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+ })
@Path("/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
- public void registerFunction(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName,
- final @FormDataParam("data") InputStream uploadedInputStream,
- final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("functionDetails") String functionDetailsJson,
- final @FormDataParam("functionConfig") String functionConfigJson) {
+ public Response registerFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("functionDetails") String functionDetailsJson,
+ final @FormDataParam("functionConfig") String functionConfigJson) {
- functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+ return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
-
}
@PUT
+ @ApiOperation(value = "Updates a Pulsar Function currently running in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+ })
@Path("/{tenant}/{namespace}/{functionName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
- public void updateFunction(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName,
- final @FormDataParam("data") InputStream uploadedInputStream,
- final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("functionDetails") String functionDetailsJson,
- final @FormDataParam("functionConfig") String functionConfigJson) {
+ public Response updateFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("functionDetails") String functionDetailsJson,
+ final @FormDataParam("functionConfig") String functionConfigJson) {
- functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
+ return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
-
}
+
@DELETE
+ @ApiOperation(value = "Deletes a Pulsar Function currently running in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function doesn't exist"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 200, message = "The function was successfully deleted")
+ })
@Path("/{tenant}/{namespace}/{functionName}")
- public void deregisterFunction(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName) {
- functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
+ public Response deregisterFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName) {
+ return functions.deregisterFunction(tenant, namespace, functionName, clientAppId());
}
@GET
+ @ApiOperation(
+ value = "Fetches information about a Pulsar Function currently running in cluster mode",
+ response = Function.FunctionMetaData.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 404, message = "The function doesn't exist")
+ })
@Path("/{tenant}/{namespace}/{functionName}")
- public FunctionConfig getFunctionInfo(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName) {
- return functions.getFunctionInfo(tenant, namespace, functionName);
+ public Response getFunctionInfo(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName) throws IOException {
+
+ return functions.getFunctionInfo(
+ tenant, namespace, functionName);
}
@GET
@ApiOperation(
value = "Displays the status of a Pulsar Function instance",
- response = FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData.class
+ response = InstanceCommunication.FunctionStatus.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
@ApiResponse(code = 404, message = "The function doesn't exist")
})
- @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/status")
- public FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData getFunctionInstanceStatus(
- final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName,
- final @PathParam("instanceId") String instanceId) throws IOException {
- return functions.getFunctionInstanceStatus(
- tenant, namespace, functionName, instanceId, uri.getRequestUri());
+ public Response getFunctionInstanceStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @PathParam("instanceId") String instanceId) throws IOException {
+
+ return functions.getFunctionInstanceStatus(tenant, namespace, functionName, instanceId, uri.getRequestUri());
}
@GET
@ApiOperation(
- value = "Displays the status of a Pulsar Function",
- response = FunctionStatus.class
+ value = "Displays the status of a Pulsar Function running in cluster mode",
+ response = InstanceCommunication.FunctionStatus.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 404, message = "The function doesn't exist")
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
- @Produces(MediaType.APPLICATION_JSON)
@Path("/{tenant}/{namespace}/{functionName}/status")
- public FunctionStatus getFunctionStatus(
- final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName) throws IOException {
- return functions.getFunctionStatus(
+ public Response getFunctionStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName) throws IOException {
+ return functions.getFunctionStatusV2(
tenant, namespace, functionName, uri.getRequestUri());
}
@GET
@ApiOperation(
- value = "Displays the stats of a Pulsar Function",
- response = FunctionStats.class
+ value = "Lists all Pulsar Functions currently deployed in a given namespace",
+ response = String.class,
+ responseContainer = "Collection"
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 404, message = "The function doesn't exist")
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
})
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/{tenant}/{namespace}/{functionName}/stats")
- public FunctionStats getFunctionStats(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName) throws IOException {
- return functions.getFunctionStats(tenant, namespace, functionName, uri.getRequestUri());
+ @Path("/{tenant}/{namespace}")
+ public Response listFunctions(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace) {
+ return functions.listFunctions( tenant, namespace);
}
- @GET
+ @POST
@ApiOperation(
- value = "Displays the stats of a Pulsar Function instance",
- response = FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData.class
+ value = "Triggers a Pulsar Function with a user-specified value or file data",
+ response = Message.class
)
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
- @ApiResponse(code = 404, message = "The function doesn't exist")
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 500, message = "Internal server error")
})
- @Produces(MediaType.APPLICATION_JSON)
- @Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stats")
- public FunctionStats.FunctionInstanceStats.FunctionInstanceStatsData getFunctionInstanceStats(
- final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName,
- final @PathParam("instanceId") String instanceId) throws IOException {
- return functions.getFunctionsInstanceStats(
- tenant, namespace, functionName, instanceId, uri.getRequestUri());
- }
-
- @POST
@Path("/{tenant}/{namespace}/{functionName}/trigger")
@Consumes(MediaType.MULTIPART_FORM_DATA)
- public String triggerFunction(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName,
- final @FormDataParam("data") String input,
- final @FormDataParam("dataStream") InputStream uploadedInputStream,
- final @FormDataParam("topic") String topic) {
- return functions.triggerFunction(tenant, namespace, functionName, input, uploadedInputStream, topic);
+ public Response triggerFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @FormDataParam("data") String triggerValue,
+ final @FormDataParam("dataStream") InputStream triggerStream,
+ final @FormDataParam("topic") String topic) {
+ return functions.triggerFunction(tenant, namespace, functionName, triggerValue, triggerStream, topic);
}
- @POST
- @ApiOperation(value = "Restart function instance", response = Void.class)
+ @GET
+ @ApiOperation(
+ value = "Fetch the current state associated with a Pulsar Function",
+ response = String.class
+ )
@ApiResponses(value = {
@ApiResponse(code = 400, message = "Invalid request"),
- @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "The key does not exist"),
@ApiResponse(code = 500, message = "Internal server error")
})
+ @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
+ public Response getFunctionState(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("functionName") String functionName,
+ final @PathParam("key") String key) {
+ return functions.getFunctionState(tenant, namespace, functionName, key);
+ }
+
+ @POST
+ @ApiOperation(value = "Restart function instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/restart")
@Consumes(MediaType.APPLICATION_JSON)
- public void restartFunction(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName,
- final @PathParam("instanceId") String instanceId) {
- functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+ public Response restartFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.restartFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
}
@POST
@ApiOperation(value = "Restart all function instances", response = Void.class)
- @ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
+ @ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{functionName}/restart")
@Consumes(MediaType.APPLICATION_JSON)
- public void restartFunction(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName) {
- functions.restartFunctionInstances(tenant, namespace, functionName);
+ public Response restartFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+ return functions.restartFunctionInstances(tenant, namespace, functionName);
}
@POST
@ApiOperation(value = "Stop function instance", response = Void.class)
- @ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
+ @ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{functionName}/{instanceId}/stop")
@Consumes(MediaType.APPLICATION_JSON)
- public void stopFunction(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName,
- final @PathParam("instanceId") String instanceId) {
- functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, this.uri.getRequestUri());
+ public Response stopFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.stopFunctionInstance(tenant, namespace, functionName, instanceId, uri.getRequestUri());
}
@POST
@ApiOperation(value = "Stop all function instances", response = Void.class)
- @ApiResponses(value = {
- @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
@ApiResponse(code = 404, message = "The function does not exist"),
- @ApiResponse(code = 500, message = "Internal server error")
- })
+ @ApiResponse(code = 500, message = "Internal server error") })
@Path("/{tenant}/{namespace}/{functionName}/stop")
@Consumes(MediaType.APPLICATION_JSON)
- public void stopFunction(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName) {
- functions.stopFunctionInstances(tenant, namespace, functionName);
+ public Response stopFunction(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("functionName") String functionName) {
+ return functions.stopFunctionInstances(tenant, namespace, functionName);
}
@POST
+ @ApiOperation(
+ value = "Uploads Pulsar Function file data",
+ hidden = true
+ )
@Path("/upload")
@Consumes(MediaType.MULTIPART_FORM_DATA)
- public void uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
- final @FormDataParam("path") String path) {
- functions.uploadFunction(uploadedInputStream, path);
+ public Response uploadFunction(final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("path") String path) {
+ return functions.uploadFunction(uploadedInputStream, path);
}
@GET
+ @ApiOperation(
+ value = "Downloads Pulsar Function file data",
+ hidden = true
+ )
@Path("/download")
- public StreamingOutput downloadFunction(final @QueryParam("path") String path) {
+ public Response downloadFunction(final @QueryParam("path") String path) {
return functions.downloadFunction(path);
}
@GET
+ @ApiOperation(
+ value = "Fetches a list of supported Pulsar IO connectors currently running in cluster mode",
+ response = List.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout")
+ })
@Path("/connectors")
public List<ConnectorDefinition> getConnectorsList() throws IOException {
return functions.getListOfConnectors();
}
-
- @GET
- @Path("/{tenant}/{namespace}/{functionName}/state/{key}")
- public FunctionState getFunctionState(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("functionName") String functionName,
- final @PathParam("key") String key) throws IOException {
- return functions.getFunctionState(tenant, namespace, functionName, key);
- }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
similarity index 98%
copy from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
copy to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
index b620fe5..d25f8f9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3Resource.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -49,11 +49,11 @@ import java.util.List;
@Slf4j
@Path("/functions")
-public class FunctionApiV2Resource extends FunctionApiResource {
+public class FunctionApiV3Resource extends FunctionApiResource {
protected final FunctionsImpl functions;
- public FunctionApiV2Resource() {
+ public FunctionApiV3Resource() {
this.functions = new FunctionsImpl(this);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
similarity index 98%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
index e630db4..6a8f25e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3Resource.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -42,11 +42,11 @@ import java.util.List;
@Slf4j
@Path("/sink")
-public class SinkApiV2Resource extends FunctionApiResource {
+public class SinkApiV3Resource extends FunctionApiResource {
protected final SinkImpl sink;
- public SinkApiV2Resource() {
+ public SinkApiV3Resource() {
this.sink = new SinkImpl(this);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
similarity index 98%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
rename to pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
index bb49fe1..8675cc5 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3Resource.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiResponse;
@@ -41,11 +41,11 @@ import java.util.List;
@Slf4j
@Path("/source")
-public class SourceApiV2Resource extends FunctionApiResource {
+public class SourceApiV3Resource extends FunctionApiResource {
protected final SourceImpl source;
- public SourceApiV2Resource() {
+ public SourceApiV3Resource() {
this.source = new SourceImpl(this);
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
index 25a3a66..aa22650 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
@@ -52,6 +52,7 @@ import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImplV2;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -75,6 +76,7 @@ import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
+import static org.apache.pulsar.functions.utils.Utils.mergeJson;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
@@ -131,7 +133,7 @@ public class FunctionApiV2ResourceTest {
private FunctionRuntimeManager mockedFunctionRunTimeManager;
private RuntimeFactory mockedRuntimeFactory;
private Namespace mockedNamespace;
- private FunctionsImpl resource;
+ private FunctionsImplV2 resource;
private InputStream mockedInputStream;
private FormDataContentDisposition mockedFormData;
private FunctionMetaData mockedFunctionMetadata;
@@ -167,16 +169,20 @@ public class FunctionApiV2ResourceTest {
// worker config
WorkerConfig workerConfig = new WorkerConfig()
- .setWorkerId("test")
- .setWorkerPort(8080)
- .setDownloadDirectory("/tmp/pulsar/functions")
- .setFunctionMetadataTopicName("pulsar/functions")
- .setNumFunctionPackageReplicas(3)
- .setPulsarServiceUrl("pulsar://localhost:6650/");
+ .setWorkerId("test")
+ .setWorkerPort(8080)
+ .setDownloadDirectory("/tmp/pulsar/functions")
+ .setFunctionMetadataTopicName("pulsar/functions")
+ .setNumFunctionPackageReplicas(3)
+ .setPulsarServiceUrl("pulsar://localhost:6650/");
when(mockedWorkerService.getWorkerConfig()).thenReturn(workerConfig);
- this.resource = spy(new FunctionsImpl(() -> mockedWorkerService));
- doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(any());
+ FunctionsImpl functions = spy(new FunctionsImpl(() -> mockedWorkerService));
+
+ doReturn(ComponentImpl.ComponentType.FUNCTION).when(functions).calculateSubjectType(any());
+
+ this.resource = spy(new FunctionsImplV2(functions));
+
}
//
@@ -208,16 +214,16 @@ public class FunctionApiV2ResourceTest {
public void testRegisterFunctionMissingNamespace() {
try {
testRegisterFunctionMissingArguments(
- tenant,
- null,
- function,
- mockedInputStream,
- topicsToSerDeClassName,
- mockedFormData,
- outputTopic,
+ tenant,
+ null,
+ function,
+ mockedInputStream,
+ topicsToSerDeClassName,
+ mockedFormData,
+ outputTopic,
outputSerdeClassName,
- className,
- parallelism,
+ className,
+ parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -228,22 +234,22 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
public void testRegisterFunctionMissingFunctionName() {
try {
- testRegisterFunctionMissingArguments(
- tenant,
- namespace,
- null,
- mockedInputStream,
- topicsToSerDeClassName,
- mockedFormData,
- outputTopic,
- outputSerdeClassName,
- className,
- parallelism,
- null);
- } catch (RestException re){
- assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
- throw re;
- }
+ testRegisterFunctionMissingArguments(
+ tenant,
+ namespace,
+ null,
+ mockedInputStream,
+ topicsToSerDeClassName,
+ mockedFormData,
+ outputTopic,
+ outputSerdeClassName,
+ className,
+ parallelism,
+ null);
+ } catch (RestException re){
+ assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
+ throw re;
+ }
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Package is not provided")
@@ -292,16 +298,16 @@ public class FunctionApiV2ResourceTest {
public void testRegisterFunctionMissingPackageDetails() {
try {
testRegisterFunctionMissingArguments(
- tenant,
- namespace,
- function,
- mockedInputStream,
- topicsToSerDeClassName,
- null,
- outputTopic,
+ tenant,
+ namespace,
+ function,
+ mockedInputStream,
+ topicsToSerDeClassName,
+ null,
+ outputTopic,
outputSerdeClassName,
- className,
- parallelism,
+ className,
+ parallelism,
null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -355,17 +361,17 @@ public class FunctionApiV2ResourceTest {
public void testRegisterFunctionWrongParallelism() {
try {
testRegisterFunctionMissingArguments(
- tenant,
- namespace,
- function,
- mockedInputStream,
- topicsToSerDeClassName,
- mockedFormData,
- outputTopic,
- outputSerdeClassName,
- className,
- -2,
- null);
+ tenant,
+ namespace,
+ function,
+ mockedInputStream,
+ topicsToSerDeClassName,
+ mockedFormData,
+ outputTopic,
+ outputSerdeClassName,
+ className,
+ -2,
+ null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
@@ -398,17 +404,17 @@ public class FunctionApiV2ResourceTest {
public void testRegisterFunctionWrongOutputTopic() {
try {
testRegisterFunctionMissingArguments(
- tenant,
- namespace,
- function,
- mockedInputStream,
- topicsToSerDeClassName,
- mockedFormData,
- function + "-output-topic/test:",
- outputSerdeClassName,
- className,
- parallelism,
- null);
+ tenant,
+ namespace,
+ function,
+ mockedInputStream,
+ topicsToSerDeClassName,
+ mockedFormData,
+ function + "-output-topic/test:",
+ outputSerdeClassName,
+ className,
+ parallelism,
+ null);
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
@@ -419,17 +425,17 @@ public class FunctionApiV2ResourceTest {
public void testRegisterFunctionHttpUrl() {
try {
testRegisterFunctionMissingArguments(
- tenant,
- namespace,
- function,
- null,
- topicsToSerDeClassName,
- null,
- outputTopic,
- outputSerdeClassName,
- className,
- parallelism,
- "http://localhost:1234/test");
+ tenant,
+ namespace,
+ function,
+ null,
+ topicsToSerDeClassName,
+ null,
+ outputTopic,
+ outputSerdeClassName,
+ className,
+ parallelism,
+ "http://localhost:1234/test");
} catch (RestException re){
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
@@ -491,14 +497,14 @@ public class FunctionApiV2ResourceTest {
private void registerDefaultFunction() {
FunctionConfig functionConfig = createDefaultFunctionConfig();
resource.registerFunction(
- tenant,
- namespace,
- function,
- mockedInputStream,
- mockedFormData,
- null,
- null,
- new Gson().toJson(functionConfig),
+ tenant,
+ namespace,
+ function,
+ mockedInputStream,
+ mockedFormData,
+ null,
+ null,
+ new Gson().toJson(functionConfig),
null);
}
@@ -523,8 +529,8 @@ public class FunctionApiV2ResourceTest {
Utils.uploadFileToBookkeeper(
anyString(),
- any(File.class),
- any(Namespace.class));
+ any(File.class),
+ any(Namespace.class));
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
registerDefaultFunction();
@@ -540,15 +546,15 @@ public class FunctionApiV2ResourceTest {
mockStatic(Utils.class);
doNothing().when(Utils.class);
Utils.uploadToBookeeper(
- any(Namespace.class),
- any(InputStream.class),
- anyString());
+ any(Namespace.class),
+ any(InputStream.class),
+ anyString());
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
+ .setSuccess(true)
+ .setMessage("function registered");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
@@ -587,15 +593,15 @@ public class FunctionApiV2ResourceTest {
mockStatic(Utils.class);
doNothing().when(Utils.class);
Utils.uploadToBookeeper(
- any(Namespace.class),
- any(InputStream.class),
- anyString());
+ any(Namespace.class),
+ any(InputStream.class),
+ anyString());
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("function failed to register");
+ .setSuccess(false)
+ .setMessage("function failed to register");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
@@ -612,14 +618,14 @@ public class FunctionApiV2ResourceTest {
mockStatic(Utils.class);
doNothing().when(Utils.class);
Utils.uploadToBookeeper(
- any(Namespace.class),
- any(InputStream.class),
- anyString());
+ any(Namespace.class),
+ any(InputStream.class),
+ anyString());
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registeration interrupted"));
+ new IOException("Function registeration interrupted"));
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
registerDefaultFunction();
@@ -637,16 +643,16 @@ public class FunctionApiV2ResourceTest {
public void testUpdateFunctionMissingTenant() {
try {
testUpdateFunctionMissingArguments(
- null,
- namespace,
- function,
- mockedInputStream,
- topicsToSerDeClassName,
- mockedFormData,
- outputTopic,
+ null,
+ namespace,
+ function,
+ mockedInputStream,
+ topicsToSerDeClassName,
+ mockedFormData,
+ outputTopic,
outputSerdeClassName,
- className,
- parallelism,
+ className,
+ parallelism,
"Tenant is not provided");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -658,16 +664,16 @@ public class FunctionApiV2ResourceTest {
public void testUpdateFunctionMissingNamespace() {
try {
testUpdateFunctionMissingArguments(
- tenant,
- null,
- function,
- mockedInputStream,
- topicsToSerDeClassName,
- mockedFormData,
- outputTopic,
+ tenant,
+ null,
+ function,
+ mockedInputStream,
+ topicsToSerDeClassName,
+ mockedFormData,
+ outputTopic,
outputSerdeClassName,
- className,
- parallelism,
+ className,
+ parallelism,
"Namespace is not provided");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -679,16 +685,16 @@ public class FunctionApiV2ResourceTest {
public void testUpdateFunctionMissingFunctionName() {
try {
testUpdateFunctionMissingArguments(
- tenant,
- namespace,
- null,
- mockedInputStream,
- topicsToSerDeClassName,
- mockedFormData,
- outputTopic,
+ tenant,
+ namespace,
+ null,
+ mockedInputStream,
+ topicsToSerDeClassName,
+ mockedFormData,
+ outputTopic,
outputSerdeClassName,
- className,
- parallelism,
+ className,
+ parallelism,
"Function Name is not provided");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -703,16 +709,16 @@ public class FunctionApiV2ResourceTest {
doNothing().when(Utils.class);
Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
- tenant,
- namespace,
- function,
- null,
- topicsToSerDeClassName,
- mockedFormData,
- outputTopic,
+ tenant,
+ namespace,
+ function,
+ null,
+ topicsToSerDeClassName,
+ mockedFormData,
+ outputTopic,
outputSerdeClassName,
- className,
- parallelism,
+ className,
+ parallelism,
"Update contains no change");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -751,16 +757,16 @@ public class FunctionApiV2ResourceTest {
doNothing().when(Utils.class);
Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
- tenant,
- namespace,
- function,
- null,
- topicsToSerDeClassName,
- mockedFormData,
- outputTopic,
+ tenant,
+ namespace,
+ function,
+ null,
+ topicsToSerDeClassName,
+ mockedFormData,
+ outputTopic,
outputSerdeClassName,
- null,
- parallelism,
+ null,
+ parallelism,
"Update contains no change");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -775,17 +781,17 @@ public class FunctionApiV2ResourceTest {
doNothing().when(Utils.class);
Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
- tenant,
- namespace,
- function,
- null,
- topicsToSerDeClassName,
- mockedFormData,
- outputTopic,
- outputSerdeClassName,
- null,
- parallelism + 1,
- null);
+ tenant,
+ namespace,
+ function,
+ null,
+ topicsToSerDeClassName,
+ mockedFormData,
+ outputTopic,
+ outputSerdeClassName,
+ null,
+ parallelism + 1,
+ null);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
@@ -799,17 +805,17 @@ public class FunctionApiV2ResourceTest {
doNothing().when(Utils.class);
Utils.downloadFromBookkeeper(any(Namespace.class), any(File.class), anyString());
testUpdateFunctionMissingArguments(
- tenant,
- namespace,
- function,
- null,
- topicsToSerDeClassName,
- mockedFormData,
- "DifferentOutput",
- outputSerdeClassName,
- null,
- parallelism,
- "Output topics differ");
+ tenant,
+ namespace,
+ function,
+ null,
+ topicsToSerDeClassName,
+ mockedFormData,
+ "DifferentOutput",
+ outputSerdeClassName,
+ null,
+ parallelism,
+ "Output topics differ");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
@@ -825,17 +831,17 @@ public class FunctionApiV2ResourceTest {
Map<String, String> someOtherInput = new HashMap<>();
someOtherInput.put("DifferentTopic", TopicSchema.DEFAULT_SERDE);
testUpdateFunctionMissingArguments(
- tenant,
- namespace,
- function,
- null,
- someOtherInput,
- mockedFormData,
- outputTopic,
- outputSerdeClassName,
- null,
- parallelism,
- "Input Topics cannot be altered");
+ tenant,
+ namespace,
+ function,
+ null,
+ someOtherInput,
+ mockedFormData,
+ outputTopic,
+ outputSerdeClassName,
+ null,
+ parallelism,
+ "Input Topics cannot be altered");
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
throw re;
@@ -892,14 +898,14 @@ public class FunctionApiV2ResourceTest {
}
resource.updateFunction(
- tenant,
- namespace,
- function,
- inputStream,
- details,
- null,
- null,
- new Gson().toJson(functionConfig),
+ tenant,
+ namespace,
+ function,
+ inputStream,
+ details,
+ null,
+ null,
+ new Gson().toJson(functionConfig),
null);
}
@@ -917,14 +923,14 @@ public class FunctionApiV2ResourceTest {
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
resource.updateFunction(
- tenant,
- namespace,
- function,
- mockedInputStream,
- mockedFormData,
- null,
- null,
- new Gson().toJson(functionConfig),
+ tenant,
+ namespace,
+ function,
+ mockedInputStream,
+ mockedFormData,
+ null,
+ null,
+ new Gson().toJson(functionConfig),
null);
}
@@ -946,8 +952,8 @@ public class FunctionApiV2ResourceTest {
doThrow(new IOException("upload failure")).when(Utils.class);
Utils.uploadFileToBookkeeper(
anyString(),
- any(File.class),
- any(Namespace.class));
+ any(File.class),
+ any(Namespace.class));
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
@@ -963,15 +969,15 @@ public class FunctionApiV2ResourceTest {
mockStatic(Utils.class);
doNothing().when(Utils.class);
Utils.uploadToBookeeper(
- any(Namespace.class),
- any(InputStream.class),
- anyString());
+ any(Namespace.class),
+ any(InputStream.class),
+ anyString());
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function registered");
+ .setSuccess(true)
+ .setMessage("function registered");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
@@ -1000,18 +1006,18 @@ public class FunctionApiV2ResourceTest {
RequestResult rr = new RequestResult()
.setSuccess(true)
.setMessage("function registered");
- CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
- when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
+ CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
+ when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
resource.updateFunction(
- tenant,
- namespace,
- function,
- null,
- null,
- filePackageUrl,
- null,
- new Gson().toJson(functionConfig),
+ tenant,
+ namespace,
+ function,
+ null,
+ null,
+ filePackageUrl,
+ null,
+ new Gson().toJson(functionConfig),
null);
}
@@ -1022,15 +1028,15 @@ public class FunctionApiV2ResourceTest {
mockStatic(Utils.class);
doNothing().when(Utils.class);
Utils.uploadToBookeeper(
- any(Namespace.class),
- any(InputStream.class),
- anyString());
+ any(Namespace.class),
+ any(InputStream.class),
+ anyString());
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("function failed to register");
+ .setSuccess(false)
+ .setMessage("function failed to register");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
@@ -1047,14 +1053,14 @@ public class FunctionApiV2ResourceTest {
mockStatic(Utils.class);
doNothing().when(Utils.class);
Utils.uploadToBookeeper(
- any(Namespace.class),
- any(InputStream.class),
- anyString());
+ any(Namespace.class),
+ any(InputStream.class),
+ anyString());
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
CompletableFuture<RequestResult> requestResult = FutureUtil.failedFuture(
- new IOException("Function registeration interrupted"));
+ new IOException("Function registeration interrupted"));
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
updateDefaultFunction();
@@ -1073,9 +1079,9 @@ public class FunctionApiV2ResourceTest {
try {
testDeregisterFunctionMissingArguments(
- null,
- namespace,
- function
+ null,
+ namespace,
+ function
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1087,9 +1093,9 @@ public class FunctionApiV2ResourceTest {
public void testDeregisterFunctionMissingNamespace() {
try {
testDeregisterFunctionMissingArguments(
- tenant,
- null,
- function
+ tenant,
+ null,
+ function
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1100,10 +1106,10 @@ public class FunctionApiV2ResourceTest {
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
public void testDeregisterFunctionMissingFunctionName() {
try {
- testDeregisterFunctionMissingArguments(
- tenant,
- namespace,
- null
+ testDeregisterFunctionMissingArguments(
+ tenant,
+ namespace,
+ null
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1117,17 +1123,17 @@ public class FunctionApiV2ResourceTest {
String function
) {
resource.deregisterFunction(
- tenant,
- namespace,
- function,
+ tenant,
+ namespace,
+ function,
null);
}
private void deregisterDefaultFunction() {
resource.deregisterFunction(
- tenant,
- namespace,
- function,
+ tenant,
+ namespace,
+ function,
null);
}
@@ -1147,8 +1153,8 @@ public class FunctionApiV2ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
RequestResult rr = new RequestResult()
- .setSuccess(true)
- .setMessage("function deregistered");
+ .setSuccess(true)
+ .setMessage("function deregistered");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
@@ -1161,8 +1167,8 @@ public class FunctionApiV2ResourceTest {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
RequestResult rr = new RequestResult()
- .setSuccess(false)
- .setMessage("function failed to deregister");
+ .setSuccess(false)
+ .setMessage("function failed to deregister");
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.deregisterFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(requestResult);
@@ -1195,12 +1201,12 @@ public class FunctionApiV2ResourceTest {
//
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant is not provided")
- public void testGetFunctionMissingTenant() {
+ public void testGetFunctionMissingTenant() throws IOException {
try {
testGetFunctionMissingArguments(
- null,
- namespace,
- function
+ null,
+ namespace,
+ function
);
}
catch (RestException re) {
@@ -1210,12 +1216,12 @@ public class FunctionApiV2ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace is not provided")
- public void testGetFunctionMissingNamespace() {
+ public void testGetFunctionMissingNamespace() throws IOException {
try {
testGetFunctionMissingArguments(
- tenant,
- null,
- function
+ tenant,
+ null,
+ function
);
}
catch (RestException re) {
@@ -1225,12 +1231,12 @@ public class FunctionApiV2ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function Name is not provided")
- public void testGetFunctionMissingFunctionName() {
+ public void testGetFunctionMissingFunctionName() throws IOException {
try {
testGetFunctionMissingArguments(
- tenant,
- namespace,
- null
+ tenant,
+ namespace,
+ null
);
}
catch (RestException re) {
@@ -1243,25 +1249,28 @@ public class FunctionApiV2ResourceTest {
String tenant,
String namespace,
String function
- ) {
+ ) throws IOException {
resource.getFunctionInfo(
- tenant,
- namespace,
- function
+ tenant,
+ namespace,
+ function
);
}
- private FunctionConfig getDefaultFunctionInfo() {
- return resource.getFunctionInfo(
- tenant,
- namespace,
- function
- );
+ private FunctionDetails getDefaultFunctionInfo() throws IOException {
+ String json = (String) resource.getFunctionInfo(
+ tenant,
+ namespace,
+ function
+ ).getEntity();
+ FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+ mergeJson(json, functionDetailsBuilder);
+ return functionDetailsBuilder.build();
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Function test-function doesn't exist")
- public void testGetNotExistedFunction() {
+ public void testGetNotExistedFunction() throws IOException {
try {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(false);
getDefaultFunctionInfo();
@@ -1272,7 +1281,7 @@ public class FunctionApiV2ResourceTest {
}
@Test
- public void testGetFunctionSuccess() {
+ public void testGetFunctionSuccess() throws IOException {
when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(function))).thenReturn(true);
SinkSpec sinkSpec = SinkSpec.newBuilder()
@@ -1289,17 +1298,17 @@ public class FunctionApiV2ResourceTest {
.setSource(SourceSpec.newBuilder().setSubscriptionType(subscriptionType)
.putAllTopicsToSerDeClassName(topicsToSerDeClassName)).build();
FunctionMetaData metaData = FunctionMetaData.newBuilder()
- .setCreateTime(System.currentTimeMillis())
- .setFunctionDetails(functionDetails)
- .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
- .setVersion(1234)
- .build();
+ .setCreateTime(System.currentTimeMillis())
+ .setFunctionDetails(functionDetails)
+ .setPackageLocation(PackageLocationMetaData.newBuilder().setPackagePath("/path/to/package"))
+ .setVersion(1234)
+ .build();
when(mockedManager.getFunctionMetaData(eq(tenant), eq(namespace), eq(function))).thenReturn(metaData);
- FunctionConfig functionConfig = getDefaultFunctionInfo();
+ FunctionDetails actual = getDefaultFunctionInfo();
assertEquals(
- FunctionConfigUtils.convertFromDetails(functionDetails),
- functionConfig);
+ functionDetails,
+ actual);
}
//
@@ -1310,8 +1319,8 @@ public class FunctionApiV2ResourceTest {
public void testListFunctionsMissingTenant() {
try {
testListFunctionsMissingArguments(
- null,
- namespace
+ null,
+ namespace
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1323,8 +1332,8 @@ public class FunctionApiV2ResourceTest {
public void testListFunctionsMissingNamespace() {
try {
testListFunctionsMissingArguments(
- tenant,
- null
+ tenant,
+ null
);
} catch (RestException re) {
assertEquals(re.getResponse().getStatusInfo(), Response.Status.BAD_REQUEST);
@@ -1337,17 +1346,17 @@ public class FunctionApiV2ResourceTest {
String namespace
) {
resource.listFunctions(
- tenant,
- namespace
+ tenant,
+ namespace
);
}
private List<String> listDefaultFunctions() {
- return resource.listFunctions(
- tenant,
- namespace
- );
+ return new Gson().fromJson((String) resource.listFunctions(
+ tenant,
+ namespace
+ ).getEntity(), List.class);
}
@Test
@@ -1369,33 +1378,11 @@ public class FunctionApiV2ResourceTest {
}
@Test
- public void testOnlyGetSources() {
- List<String> functions = Lists.newArrayList("test-2");
- List<FunctionMetaData> functionMetaDataList = new LinkedList<>();
- FunctionMetaData f1 = FunctionMetaData.newBuilder().setFunctionDetails(
- FunctionDetails.newBuilder().setName("test-1").build()).build();
- functionMetaDataList.add(f1);
- FunctionMetaData f2 = FunctionMetaData.newBuilder().setFunctionDetails(
- FunctionDetails.newBuilder().setName("test-2").build()).build();
- functionMetaDataList.add(f2);
- FunctionMetaData f3 = FunctionMetaData.newBuilder().setFunctionDetails(
- FunctionDetails.newBuilder().setName("test-3").build()).build();
- functionMetaDataList.add(f3);
- when(mockedManager.listFunctions(eq(tenant), eq(namespace))).thenReturn(functionMetaDataList);
- doReturn(ComponentImpl.ComponentType.SOURCE).when(this.resource).calculateSubjectType(f1);
- doReturn(ComponentImpl.ComponentType.FUNCTION).when(this.resource).calculateSubjectType(f2);
- doReturn(ComponentImpl.ComponentType.SINK).when(this.resource).calculateSubjectType(f3);
-
- List<String> functionList = listDefaultFunctions();
- assertEquals(functions, functionList);
- }
-
- @Test
public void testDownloadFunctionHttpUrl() throws Exception {
String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- FunctionsImpl function = new FunctionsImpl(null);
- StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
+ FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
+ StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction(jarHttpUrl).getEntity();
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
@@ -1409,8 +1396,8 @@ public class FunctionApiV2ResourceTest {
public void testDownloadFunctionFile() throws Exception {
String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- FunctionsImpl function = new FunctionsImpl(null);
- StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
+ FunctionsImplV2 function = new FunctionsImplV2(() -> mockedWorkerService);
+ StreamingOutput streamOutput = (StreamingOutput) function.downloadFunction("file://" + fileLocation).getEntity();
File pkgFile = new File(testDir, UUID.randomUUID().toString());
OutputStream output = new FileOutputStream(pkgFile);
streamOutput.write(output);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
similarity index 99%
copy from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
copy to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 25a3a66..580345e 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
@@ -52,6 +52,7 @@ import org.apache.pulsar.functions.worker.request.RequestResult;
import org.apache.pulsar.functions.worker.rest.RestException;
import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -93,7 +94,7 @@ import static org.testng.Assert.assertEquals;
@PrepareForTest(Utils.class)
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*" })
@Slf4j
-public class FunctionApiV2ResourceTest {
+public class FunctionApiV3ResourceTest {
@ObjectFactory
public IObjectFactory getObjectFactory() {
@@ -1393,7 +1394,7 @@ public class FunctionApiV2ResourceTest {
@Test
public void testDownloadFunctionHttpUrl() throws Exception {
String jarHttpUrl = "http://central.maven.org/maven2/org/apache/pulsar/pulsar-common/1.22.0-incubating/pulsar-common-1.22.0-incubating.jar";
- String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+ String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionsImpl function = new FunctionsImpl(null);
StreamingOutput streamOutput = function.downloadFunction(jarHttpUrl);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
@@ -1408,7 +1409,7 @@ public class FunctionApiV2ResourceTest {
@Test
public void testDownloadFunctionFile() throws Exception {
String fileLocation = FutureUtil.class.getProtectionDomain().getCodeSource().getLocation().getPath();
- String testDir = FunctionApiV2ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
+ String testDir = FunctionApiV3ResourceTest.class.getProtectionDomain().getCodeSource().getLocation().getPath();
FunctionsImpl function = new FunctionsImpl(null);
StreamingOutput streamOutput = function.downloadFunction("file://" + fileLocation);
File pkgFile = new File(testDir, UUID.randomUUID().toString());
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
similarity index 99%
rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index ee9a014..bec3ca9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
@@ -87,12 +87,12 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
/**
- * Unit test of {@link SinkApiV2Resource}.
+ * Unit test of {@link SinkApiV3Resource}.
*/
@PrepareForTest({Utils.class, SinkConfigUtils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
@Slf4j
-public class SinkApiV2ResourceTest {
+public class SinkApiV3ResourceTest {
@ObjectFactory
public IObjectFactory getObjectFactory() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
similarity index 99%
rename from pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
rename to pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 194f624..72d23e2 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.worker.rest.api.v2;
+package org.apache.pulsar.functions.worker.rest.api.v3;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
@@ -86,12 +86,12 @@ import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.testng.Assert.assertEquals;
/**
- * Unit test of {@link SourceApiV2Resource}.
+ * Unit test of {@link SourceApiV3Resource}.
*/
@PrepareForTest({Utils.class, ConnectorUtils.class, org.apache.pulsar.functions.utils.Utils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
@Slf4j
-public class SourceApiV2ResourceTest {
+public class SourceApiV3ResourceTest {
@ObjectFactory
public IObjectFactory getObjectFactory() {