You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/08 22:03:07 UTC
[pulsar] 01/13: First cut of adding new endpoints for source/sink
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ee81c7e7ad6a7b4f466ba4abf478545e6d3bccf2
Author: Sanjeev Kulkarni <sa...@streaml.io>
AuthorDate: Thu Oct 4 16:56:26 2018 -0700
First cut of adding new endpoints for source/sink
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 4 +-
.../apache/pulsar/broker/admin/impl/SinkBase.java | 258 ++++++++++++++++++++
.../pulsar/broker/admin/impl/SourceBase.java | 261 +++++++++++++++++++++
.../org/apache/pulsar/broker/admin/v1/Sink.java | 29 +++
.../org/apache/pulsar/broker/admin/v1/Source.java | 29 +++
.../org/apache/pulsar/broker/admin/v2/Sink.java | 34 +++
.../org/apache/pulsar/broker/admin/v2/Source.java | 34 +++
.../runtime/KubernetesRuntimeFactory.java | 3 +-
.../src/main/resources/java_instance_log4j2.yml | 4 +-
.../pulsar/functions/worker/rest/Resources.java | 4 +
.../functions/worker/rest/api/FunctionsImpl.java | 64 +++--
.../worker/rest/api/v2/FunctionApiV2Resource.java | 4 +-
.../worker/rest/api/v2/SinkApiV2Resource.java | 180 ++++++++++++++
.../worker/rest/api/v2/SourceApiV2Resource.java | 180 ++++++++++++++
14 files changed, 1060 insertions(+), 28 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index cff3c34..d20bc62 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -84,7 +84,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("functionConfig") String functionConfigJson) {
return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+ functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
}
@PUT
@@ -106,7 +106,7 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
final @FormDataParam("functionConfig") String functionConfigJson) {
return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+ functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
new file mode 100644
index 0000000..43ca8bf
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinkBase.java
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class SinkBase extends AdminResource implements Supplier<WorkerService> {
+
+ private final FunctionsImpl functions;
+
+ public SinkBase() {
+ this.functions = new FunctionsImpl(this);
+ }
+
+ @Override
+ public WorkerService get() {
+ return pulsar().getWorkerService();
+ }
+
+ @POST
+ @ApiOperation(value = "Creates a new Pulsar Sink in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+ })
+ @Path("/{tenant}/{namespace}/{sinkName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response registerSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+ return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+ functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+ }
+
+ @PUT
+ @ApiOperation(value = "Updates a Pulsar Sink currently running in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+ })
+ @Path("/{tenant}/{namespace}/{sinkName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response updateSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+ return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+ functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+
+ }
+
+
+ @DELETE
+ @ApiOperation(value = "Deletes a Pulsar Sink currently running in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function doesn't exist"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 200, message = "The function was successfully deleted")
+ })
+ @Path("/{tenant}/{namespace}/{sinkName}")
+ public Response deregisterSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName) {
+ return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches information about a Pulsar Sink currently running in cluster mode",
+ response = FunctionMetaData.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 404, message = "The function doesn't exist")
+ })
+ @Path("/{tenant}/{namespace}/{sinkName}")
+ public Response getSinkInfo(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName) throws IOException {
+ return functions.getFunctionInfo(tenant, namespace, sinkName);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Displays the status of a Pulsar Sink instance",
+ response = FunctionStatus.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "The function doesn't exist")
+ })
+ @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
+ public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName,
+ final @PathParam("instanceId") String instanceId) throws IOException {
+ return functions.getFunctionInstanceStatus(
+ tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Displays the status of a Pulsar Sink running in cluster mode",
+ response = FunctionStatus.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+ })
+ @Path("/{tenant}/{namespace}/{sinkName}/status")
+ public Response getSinkStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName) throws IOException {
+ return functions.getFunctionStatus(tenant, namespace, sinkName);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Lists all Pulsar Sinks currently deployed in a given namespace",
+ response = String.class,
+ responseContainer = "Collection"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+ })
+ @Path("/{tenant}/{namespace}")
+ public Response listSinks(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace) {
+ return functions.listFunctions(tenant, namespace);
+
+ }
+
+ @POST
+ @ApiOperation(value = "Restart sink instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Restart all sink instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+ return functions.restartFunctionInstances(tenant, namespace, sinkName);
+ }
+
+ @POST
+ @ApiOperation(value = "Stop sink instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Stop all sink instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+ return functions.stopFunctionInstances(tenant, namespace, sinkName);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches a list of supported Pulsar IO sink connectors currently running in cluster mode",
+ response = List.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout")
+ })
+ @Path("/builtinsinks")
+ public List<ConnectorDefinition> getSinkList() {
+ List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors();
+ List<ConnectorDefinition> retval = new ArrayList<>();
+ for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+ if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
+ retval.add(connectorDefinition);
+ }
+ }
+ return retval;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
new file mode 100644
index 0000000..b4428ab
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourceBase.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.impl;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.broker.admin.AdminResource;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
+import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.rest.api.FunctionsImpl;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+public class SourceBase extends AdminResource implements Supplier<WorkerService> {
+
+ private final FunctionsImpl functions;
+
+ public SourceBase() {
+ this.functions = new FunctionsImpl(this);
+ }
+
+ @Override
+ public WorkerService get() {
+ return pulsar().getWorkerService();
+ }
+
+ @POST
+ @ApiOperation(value = "Creates a new Pulsar Source in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request (function already exists, etc.)"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully created")
+ })
+ @Path("/{tenant}/{namespace}/{sourceName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response registerSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+ return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+ functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+ }
+
+ @PUT
+ @ApiOperation(value = "Updates a Pulsar Source currently running in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request (function doesn't exist, etc.)"),
+ @ApiResponse(code = 200, message = "Pulsar Function successfully updated")
+ })
+ @Path("/{tenant}/{namespace}/{sourceName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response updateSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+ return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+ functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+
+ }
+
+
+ @DELETE
+ @ApiOperation(value = "Deletes a Pulsar Source currently running in cluster mode")
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function doesn't exist"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 200, message = "The function was successfully deleted")
+ })
+ @Path("/{tenant}/{namespace}/{sourceName}")
+ public Response deregisterSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName) {
+ return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches information about a Pulsar Source currently running in cluster mode",
+ response = FunctionMetaData.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout"),
+ @ApiResponse(code = 404, message = "The function doesn't exist")
+ })
+ @Path("/{tenant}/{namespace}/{sourceName}")
+ public Response getSourceInfo(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName) throws IOException {
+ return functions.getFunctionInfo(
+ tenant, namespace, sourceName);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Displays the status of a Pulsar Source instance",
+ response = FunctionStatus.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 404, message = "The function doesn't exist")
+ })
+ @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
+ public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName,
+ final @PathParam("instanceId") String instanceId) throws IOException {
+ return functions.getFunctionInstanceStatus(
+ tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Displays the status of a Pulsar Source running in cluster mode",
+ response = FunctionStatus.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+ })
+ @Path("/{tenant}/{namespace}/{sourceName}/status")
+ public Response getSourceStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName) throws IOException {
+ return functions.getFunctionStatus(tenant, namespace, sourceName);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Lists all Pulsar Sources currently deployed in a given namespace",
+ response = String.class,
+ responseContainer = "Collection"
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions")
+ })
+ @Path("/{tenant}/{namespace}")
+ public Response listSources(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace) {
+ return functions.listFunctions(
+ tenant, namespace);
+
+ }
+
+ @POST
+ @ApiOperation(value = "Restart source instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Restart all source instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+ return functions.restartFunctionInstances(tenant, namespace, sourceName);
+ }
+
+ @POST
+ @ApiOperation(value = "Stop source instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Stop all source instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+ return functions.stopFunctionInstances(tenant, namespace, sourceName);
+ }
+
+ @GET
+ @ApiOperation(
+ value = "Fetches a list of supported Pulsar IO source connectors currently running in cluster mode",
+ response = List.class
+ )
+ @ApiResponses(value = {
+ @ApiResponse(code = 403, message = "The requester doesn't have admin permissions"),
+ @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 408, message = "Request timeout")
+ })
+ @Path("/builtinsources")
+ public List<ConnectorDefinition> getSourceList() {
+ List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors();
+ List<ConnectorDefinition> retval = new ArrayList<>();
+ for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+ if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
+ retval.add(connectorDefinition);
+ }
+ }
+ return retval;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Sink.java
new file mode 100644
index 0000000..3d73d4d
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Sink.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v1;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SinkBase;
+
+import javax.ws.rs.Path;
+
+@Path("/sink")
+@Api(value = "/sink", description = "Sink admin apis", tags = "sink", hidden = true)
+public class Sink extends SinkBase {
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Source.java
new file mode 100644
index 0000000..99b41dc
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Source.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v1;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SourceBase;
+
+import javax.ws.rs.Path;
+
+@Path("/source")
+@Api(value = "/source", description = "Source admin apis", tags = "source", hidden = true)
+public class Source extends SourceBase {
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
new file mode 100644
index 0000000..aea0ae7
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Sink.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SinkBase;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/sink")
+@Api(value = "/sink", description = "Sink admin apis", tags = "sink", hidden = true)
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Sink extends SinkBase {
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
new file mode 100644
index 0000000..e5ef56c
--- /dev/null
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Source.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.admin.v2;
+
+import io.swagger.annotations.Api;
+import org.apache.pulsar.broker.admin.impl.SourceBase;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+
+@Path("/source")
+@Api(value = "/source", description = "Source admin apis", tags = "source", hidden = true)
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class Source extends SourceBase {
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index 8ba7505..cba9ebf 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -141,7 +141,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
KubernetesRuntime.doChecks(functionDetails);
}
- private void setupClient() throws Exception {
+ @VisibleForTesting
+ void setupClient() throws Exception {
if (appsClient == null) {
if (k8Uri == null) {
log.info("k8Uri is null thus going by defaults");
diff --git a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
index df25367..9846f05 100644
--- a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
+++ b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
@@ -44,7 +44,7 @@ Configuration:
name: RollingFile
fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log"
filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
- immediateFlush: false
+ immediateFlush: true
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Policies:
@@ -72,7 +72,7 @@ Configuration:
name: BkRollingFile
fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk"
filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
- immediateFlush: false
+ immediateFlush: true
PatternLayout:
Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
Policies:
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
index 26c7127..ac011db 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/Resources.java
@@ -20,6 +20,8 @@ package org.apache.pulsar.functions.worker.rest;
import org.apache.pulsar.functions.worker.rest.api.FunctionsMetricsResource;
import org.apache.pulsar.functions.worker.rest.api.v2.FunctionApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.SinkApiV2Resource;
+import org.apache.pulsar.functions.worker.rest.api.v2.SourceApiV2Resource;
import org.apache.pulsar.functions.worker.rest.api.v2.WorkerApiV2Resource;
import org.glassfish.jersey.media.multipart.MultiPartFeature;
@@ -36,6 +38,8 @@ public final class Resources {
return new HashSet<>(
Arrays.asList(
FunctionApiV2Resource.class,
+ SourceApiV2Resource.class,
+ SinkApiV2Resource.class,
WorkerApiV2Resource.class,
MultiPartFeature.class
));
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 3124731..0d30e37 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -27,7 +27,6 @@ import static org.apache.pulsar.functions.utils.Reflections.loadJar;
import static org.apache.pulsar.functions.utils.Utils.FILE;
import static org.apache.pulsar.functions.utils.Utils.HTTP;
import static org.apache.pulsar.functions.utils.Utils.isFunctionPackageUrlSupported;
-import static org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders.create;
import com.google.gson.Gson;
@@ -39,10 +38,7 @@ import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
-import java.net.URLClassLoader;
-import java.nio.file.CopyOption;
import java.nio.file.Files;
-import java.nio.file.NoSuchFileException;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
@@ -89,9 +85,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
-import org.apache.pulsar.functions.utils.FunctionConfig;
-import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.utils.functioncache.FunctionClassLoaders;
+import org.apache.pulsar.functions.utils.*;
import org.apache.pulsar.functions.utils.validation.ConfigValidation;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -136,6 +130,7 @@ public class FunctionsImpl {
public Response registerFunction(final String tenant, final String namespace, final String functionName,
final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
+ final String sourceConfigJson, final String sinkConfigJson,
final String clientRole) {
if (!isWorkerServiceAvailable()) {
@@ -173,10 +168,10 @@ public class FunctionsImpl {
try {
if (isPkgUrlProvided) {
functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
- functionDetailsJson, functionConfigJson);
+ functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
} else {
functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
- fileDetail, functionDetailsJson, functionConfigJson);
+ fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
}
} catch (Exception e) {
log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
@@ -216,7 +211,7 @@ public class FunctionsImpl {
public Response updateFunction(final String tenant, final String namespace, final String functionName,
final InputStream uploadedInputStream, final FormDataContentDisposition fileDetail,
final String functionPkgUrl, final String functionDetailsJson, final String functionConfigJson,
- final String clientRole) {
+ final String sourceConfigJson, final String sinkConfigJson, final String clientRole) {
if (!isWorkerServiceAvailable()) {
return getUnavailableResponse();
@@ -252,10 +247,10 @@ public class FunctionsImpl {
try {
if (isPkgUrlProvided) {
functionDetails = validateUpdateRequestParamsWithPkgUrl(tenant, namespace, functionName, functionPkgUrl,
- functionDetailsJson, functionConfigJson);
+ functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
} else {
functionDetails = validateUpdateRequestParams(tenant, namespace, functionName, uploadedInputStreamAsFile,
- fileDetail, functionDetailsJson, functionConfigJson);
+ fileDetail, functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson);
}
} catch (Exception e) {
log.error("Invalid register function request @ /{}/{}/{}", tenant, namespace, functionName, e);
@@ -877,23 +872,24 @@ public class FunctionsImpl {
}
private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, String namespace, String functionName,
- String functionPkgUrl, String functionDetailsJson, String functionConfigJson)
+ String functionPkgUrl, String functionDetailsJson, String functionConfigJson,
+ String sourceConfigJson, String sinkConfigJson)
throws IllegalArgumentException, IOException, URISyntaxException {
if (!isFunctionPackageUrlSupported(functionPkgUrl)) {
throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
}
FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
- functionDetailsJson, functionConfigJson, functionPkgUrl, null);
+ functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, functionPkgUrl, null);
return functionDetails;
}
private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
- String functionConfigJson)
+ String functionConfigJson, String sourceConfigJson, String sinkConfigJson)
throws IllegalArgumentException, IOException, URISyntaxException {
FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
- functionDetailsJson, functionConfigJson, null, uploadedInputStreamAsFile);
+ functionDetailsJson, functionConfigJson, sourceConfigJson, sinkConfigJson, null, uploadedInputStreamAsFile);
if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) {
throw new IllegalArgumentException("Function Package is not provided");
}
@@ -966,7 +962,8 @@ public class FunctionsImpl {
}
private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String functionName,
- String functionDetailsJson, String functionConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
+ String functionDetailsJson, String functionConfigJson, String sourceConfigJson,
+ String sinkConfigJson, String functionPkgUrl, File uploadedInputStreamAsFile) throws IOException, URISyntaxException {
if (tenant == null) {
throw new IllegalArgumentException("Tenant is not provided");
}
@@ -977,11 +974,24 @@ public class FunctionsImpl {
throw new IllegalArgumentException("Function Name is not provided");
}
- if (StringUtils.isEmpty(functionDetailsJson) && StringUtils.isEmpty(functionConfigJson)) {
- throw new IllegalArgumentException("FunctionConfig is not provided");
+ int numDefinitions = 0;
+ if (StringUtils.isEmpty(functionDetailsJson)) {
+ numDefinitions++;
+ }
+ if (StringUtils.isEmpty(functionConfigJson)) {
+ numDefinitions++;
+ }
+ if (StringUtils.isEmpty(sourceConfigJson)) {
+ numDefinitions++;
}
- if (!StringUtils.isEmpty(functionDetailsJson) && !StringUtils.isEmpty(functionConfigJson)) {
- throw new IllegalArgumentException("Only one of FunctionDetails or FunctionConfig should be provided");
+ if (StringUtils.isEmpty(sinkConfigJson)) {
+ numDefinitions++;
+ }
+ if (numDefinitions == 0) {
+ throw new IllegalArgumentException("Function Info is not provided");
+ }
+ if (numDefinitions > 1) {
+ throw new IllegalArgumentException("Conflicting Info provided");
}
if (!StringUtils.isEmpty(functionConfigJson)) {
FunctionConfig functionConfig = new Gson().fromJson(functionConfigJson, FunctionConfig.class);
@@ -995,6 +1005,18 @@ public class FunctionsImpl {
ConfigValidation.validateConfig(functionConfig, functionConfig.getRuntime().name(), clsLoader);
return FunctionConfigUtils.convert(functionConfig, clsLoader);
}
+ if (!StringUtils.isEmpty(sourceConfigJson)) {
+ SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class);
+ ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
+ ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
+ return SourceConfigUtils.convert(sourceConfig, clsLoader);
+ }
+ if (!StringUtils.isEmpty(sinkConfigJson)) {
+ SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class);
+ ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
+ ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
+ return SinkConfigUtils.convert(sinkConfig, clsLoader);
+ }
FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder);
if (isNotBlank(functionPkgUrl)) {
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
index 1e44a60..a842b9e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/FunctionApiV2Resource.java
@@ -60,7 +60,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @FormDataParam("functionConfig") String functionConfigJson) {
return functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+ functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
}
@@ -77,7 +77,7 @@ public class FunctionApiV2Resource extends FunctionApiResource {
final @FormDataParam("functionConfig") String functionConfigJson) {
return functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionDetailsJson, functionConfigJson, clientAppId());
+ functionPkgUrl, functionDetailsJson, functionConfigJson, null, null, clientAppId());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
new file mode 100644
index 0000000..175e2eb
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Path("/source")
+public class SinkApiV2Resource extends FunctionApiResource {
+
+ @POST
+ @Path("/{tenant}/{namespace}/{sinkName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response registerSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String sourcePkgUrl,
+ final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+ return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+ sourcePkgUrl, null, null, null, sinkConfigJson, clientAppId());
+
+ }
+
+ @PUT
+ @Path("/{tenant}/{namespace}/{sinkName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response updateSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("sinkConfig") String sinkConfigJson) {
+
+ return functions.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+ functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
+
+ }
+
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{sinkName}")
+ public Response deregisterSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+ return functions.deregisterFunction(tenant, namespace, sinkName, clientAppId());
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{sinkName}")
+ public Response getSinkInfo(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName)
+ throws IOException {
+ return functions.getFunctionInfo(tenant, namespace, sinkName);
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/status")
+ public Response getSinkInstanceStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName,
+ final @PathParam("instanceId") String instanceId) throws IOException {
+ return functions.getFunctionInstanceStatus(
+ tenant, namespace, sinkName, instanceId, uri.getRequestUri());
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{sinkName}/status")
+ public Response getSinkStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sinkName") String sinkName) throws IOException {
+ return functions.getFunctionStatus(tenant, namespace, sinkName);
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}")
+ public Response listSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace) {
+ return functions.listFunctions(tenant, namespace);
+
+ }
+
+ @POST
+ @ApiOperation(value = "Restart sink instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.restartFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Restart all sink instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+ return functions.restartFunctionInstances(tenant, namespace, sinkName);
+ }
+
+ @POST
+ @ApiOperation(value = "Stop sink instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/{instanceId}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.stopFunctionInstance(tenant, namespace, sinkName, instanceId, this.uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Stop all sink instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sinkName}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopSink(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sinkName") String sinkName) {
+ return functions.stopFunctionInstances(tenant, namespace, sinkName);
+ }
+
+ @GET
+ @Path("/builtinsinks")
+ public List<ConnectorDefinition> getSinkList() {
+ List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors();
+ List<ConnectorDefinition> retval = new ArrayList<>();
+ for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+ if (!StringUtils.isEmpty(connectorDefinition.getSinkClass())) {
+ retval.add(connectorDefinition);
+ }
+ }
+ return retval;
+ }
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
new file mode 100644
index 0000000..9d25bf9
--- /dev/null
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.worker.rest.api.v2;
+
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.functions.worker.rest.FunctionApiResource;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
+import org.glassfish.jersey.media.multipart.FormDataParam;
+
+import javax.ws.rs.*;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+@Slf4j
+@Path("/source")
+public class SourceApiV2Resource extends FunctionApiResource {
+
+ @POST
+ @Path("/{tenant}/{namespace}/{sourceName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response registerSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String sourcePkgUrl,
+ final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+ return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+ sourcePkgUrl, null, null, sourceConfigJson, null, clientAppId());
+
+ }
+
+ @PUT
+ @Path("/{tenant}/{namespace}/{sourceName}")
+ @Consumes(MediaType.MULTIPART_FORM_DATA)
+ public Response updateSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("sourceConfig") String sourceConfigJson) {
+
+ return functions.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+ functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
+
+ }
+
+
+ @DELETE
+ @Path("/{tenant}/{namespace}/{sourceName}")
+ public Response deregisterSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+ return functions.deregisterFunction(tenant, namespace, sourceName, clientAppId());
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{sourceName}")
+ public Response getSourceInfo(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName)
+ throws IOException {
+ return functions.getFunctionInfo(tenant, namespace, sourceName);
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/status")
+ public Response getSourceInstanceStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName,
+ final @PathParam("instanceId") String instanceId) throws IOException {
+ return functions.getFunctionInstanceStatus(
+ tenant, namespace, sourceName, instanceId, uri.getRequestUri());
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}/{sourceName}/status")
+ public Response getSourceStatus(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName) throws IOException {
+ return functions.getFunctionStatus(tenant, namespace, sourceName);
+ }
+
+ @GET
+ @Path("/{tenant}/{namespace}")
+ public Response listSources(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace) {
+ return functions.listFunctions(tenant, namespace);
+
+ }
+
+ @POST
+ @ApiOperation(value = "Restart source instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.restartFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Restart all source instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/restart")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response restartSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+ return functions.restartFunctionInstances(tenant, namespace, sourceName);
+ }
+
+ @POST
+ @ApiOperation(value = "Stop source instance", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/{instanceId}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName,
+ final @PathParam("instanceId") String instanceId) {
+ return functions.stopFunctionInstance(tenant, namespace, sourceName, instanceId, this.uri.getRequestUri());
+ }
+
+ @POST
+ @ApiOperation(value = "Stop all source instances", response = Void.class)
+ @ApiResponses(value = { @ApiResponse(code = 400, message = "Invalid request"),
+ @ApiResponse(code = 404, message = "The function does not exist"),
+ @ApiResponse(code = 500, message = "Internal server error") })
+ @Path("/{tenant}/{namespace}/{sourceName}/stop")
+ @Consumes(MediaType.APPLICATION_JSON)
+ public Response stopSource(final @PathParam("tenant") String tenant,
+ final @PathParam("namespace") String namespace, final @PathParam("sourceName") String sourceName) {
+ return functions.stopFunctionInstances(tenant, namespace, sourceName);
+ }
+
+ @GET
+ @Path("/builtinsources")
+ public List<ConnectorDefinition> getSourceList() {
+ List<ConnectorDefinition> connectorDefinitions = functions.getListOfConnectors();
+ List<ConnectorDefinition> retval = new ArrayList<>();
+ for (ConnectorDefinition connectorDefinition : connectorDefinitions) {
+ if (!StringUtils.isEmpty(connectorDefinition.getSourceClass())) {
+ retval.add(connectorDefinition);
+ }
+ }
+ return retval;
+ }
+}