You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/06/24 03:01:43 UTC
[pulsar] branch master updated: Refactoring Function Component
implementation (#4541)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 95df092 Refactoring Function Component implementation (#4541)
95df092 is described below
commit 95df092832a3633a81fd719a5c096919141cc4f9
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun Jun 23 20:01:37 2019 -0700
Refactoring Function Component implementation (#4541)
* Refactoring Function Component implementation
* cleaning up
---
.../pulsar/broker/admin/impl/FunctionsBase.java | 15 +-
.../apache/pulsar/broker/admin/impl/SinksBase.java | 21 +-
.../pulsar/broker/admin/impl/SourcesBase.java | 20 +-
.../functions/worker/rest/api/ComponentImpl.java | 485 +--------------------
.../functions/worker/rest/api/FunctionsImpl.java | 394 ++++++++++++++++-
.../functions/worker/rest/api/FunctionsImplV2.java | 19 +-
.../functions/worker/rest/api/SinksImpl.java | 403 ++++++++++++++++-
.../functions/worker/rest/api/SourcesImpl.java | 401 ++++++++++++++++-
.../worker/rest/api/v3/FunctionsApiV3Resource.java | 8 +-
.../worker/rest/api/v3/SinksApiV3Resource.java | 21 +-
.../worker/rest/api/v3/SourcesApiV3Resource.java | 21 +-
.../rest/api/v3/FunctionApiV3ResourceTest.java | 14 +-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 28 +-
.../rest/api/v3/SourceApiV3ResourceTest.java | 24 +-
14 files changed, 1290 insertions(+), 584 deletions(-)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
index 11e20de..c6f38d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/FunctionsBase.java
@@ -18,7 +18,12 @@
*/
package org.apache.pulsar.broker.admin.impl;
-import io.swagger.annotations.*;
+import io.swagger.annotations.ApiOperation;
+import io.swagger.annotations.ApiParam;
+import io.swagger.annotations.ApiResponse;
+import io.swagger.annotations.ApiResponses;
+import io.swagger.annotations.Example;
+import io.swagger.annotations.ExampleProperty;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.functions.FunctionConfig;
@@ -163,10 +168,10 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
)
)
)
- final @FormDataParam("functionConfig") String functionConfigJson) {
+ final @FormDataParam("functionConfig") FunctionConfig functionConfig) {
functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData());
+ functionPkgUrl, functionConfig, clientAppId(), clientAuthData());
}
@PUT
@@ -270,12 +275,12 @@ public class FunctionsBase extends AdminResource implements Supplier<WorkerServi
)
)
)
- final @FormDataParam("functionConfig") String functionConfigJson,
+ final @FormDataParam("functionConfig") FunctionConfig functionConfig,
@ApiParam(value = "The update options is for the Pulsar Function that needs to be updated.")
final @FormDataParam("updateOptions") UpdateOptions updateOptions) throws IOException {
functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData(), updateOptions);
+ functionPkgUrl, functionConfig, clientAppId(), clientAuthData(), updateOptions);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
index c527bd3..e21d861 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SinksBase.java
@@ -77,7 +77,7 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("url") String sinkPkgUrl,
@ApiParam(
value =
"A JSON value presenting a sink config playload. All available configuration options are: \n" +
@@ -136,10 +136,9 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
)
)
)
- final @FormDataParam("sinkConfig") String sinkConfigJson) {
-
- sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData());
+ final @FormDataParam("sinkConfig") SinkConfig sinkConfig) {
+ sink.registerSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+ sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData());
}
@PUT
@@ -162,7 +161,8 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
+ @ApiParam(value = "URL of sink's archive")
+ final @FormDataParam("url") String sinkPkgUrl,
@ApiParam(
value =
"A JSON value presenting a sink config playload. All available configuration options are: \n" +
@@ -221,12 +221,11 @@ public class SinksBase extends AdminResource implements Supplier<WorkerService>
)
)
)
- final @FormDataParam("sinkConfig") String sinkConfigJson,
- @ApiParam()
+ final @FormDataParam("sinkConfig") SinkConfig sinkConfig,
+ @ApiParam(value = "Update options for sink")
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
-
- sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData(), updateOptions);
+ sink.updateSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+ sinkPkgUrl, sinkConfig, clientAppId(), clientAuthData(), updateOptions);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
index 4856800..8dc20c9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/SourcesBase.java
@@ -79,7 +79,7 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
final @PathParam("sourceName") String sourceName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("url") String sourcePkgUrl,
@ApiParam(
value = "A JSON value presenting source configuration payload. An example of the expected functions can be found here. \n" +
"classname \n" +
@@ -126,10 +126,9 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
)
)
)
- final @FormDataParam("sourceConfig") String sourceConfigJson) {
-
- source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData());
+ final @FormDataParam("sourceConfig") SourceConfig sourceConfig) {
+ source.registerSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+ sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData());
}
@PUT
@@ -154,7 +153,8 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
final @PathParam("sourceName") String sourceName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String functionPkgUrl,
+ @ApiParam(value = "URL of sources' archive")
+ final @FormDataParam("url") String sourcePkgUrl,
@ApiParam(
value = "A JSON value presenting source configuration payload. An example of the expected functions can be found here. \n" +
"classname \n" +
@@ -201,11 +201,11 @@ public class SourcesBase extends AdminResource implements Supplier<WorkerService
)
)
)
- final @FormDataParam("sourceConfig") String sourceConfigJson,
+ final @FormDataParam("sourceConfig") SourceConfig sourceConfig,
+ @ApiParam(value = "Update options for source")
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
-
- source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), updateOptions);
+ source.updateSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+ sourcePkgUrl, sourceConfig, clientAppId(), clientAuthData(), updateOptions);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 62ac9bd..345ea56 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -18,9 +18,6 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.google.gson.Gson;
-import com.google.protobuf.ByteString;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
@@ -34,7 +31,6 @@ import org.apache.bookkeeper.clients.exceptions.NamespaceNotFoundException;
import org.apache.bookkeeper.clients.exceptions.StreamNotFoundException;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
@@ -46,19 +42,14 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
-import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
-import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
@@ -68,11 +59,9 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
-import org.apache.pulsar.functions.utils.SinkConfigUtils;
-import org.apache.pulsar.functions.utils.SourceConfigUtils;
-import org.apache.pulsar.functions.utils.ComponentTypeUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.FunctionRuntimeInfo;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
@@ -95,12 +84,10 @@ import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
-import java.nio.file.Path;
import java.util.Base64;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -111,10 +98,8 @@ import static com.google.common.base.Preconditions.checkNotNull;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
-import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
import static org.apache.pulsar.functions.utils.FunctionCommon.getStateNamespace;
import static org.apache.pulsar.functions.utils.FunctionCommon.getUniquePackageName;
import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
@@ -285,170 +270,10 @@ public abstract class ComponentImpl {
return true;
}
- public void registerFunction(final String tenant,
- final String namespace,
- final String componentName,
- final InputStream uploadedInputStream,
- final FormDataContentDisposition fileDetail,
- final String functionPkgUrl,
- final String componentConfigJson,
- final String clientRole,
- AuthenticationDataHttps clientAuthenticationDataHttps) {
-
- if (!isWorkerServiceAvailable()) {
- throwUnavailableException();
- }
-
- if (tenant == null) {
- throw new RestException(Status.BAD_REQUEST, "Tenant is not provided");
- }
- if (namespace == null) {
- throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
- }
- if (componentName == null) {
- throw new RestException(Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
- }
-
- try {
- if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
- componentName, clientRole, ComponentTypeUtils.toString(componentType));
- throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
- }
- } catch (PulsarAdminException e) {
- log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
-
- try {
- // Check tenant exists
- final TenantInfo tenantInfo = worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
-
- String qualifiedNamespace = tenant + "/" + namespace;
- List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
- if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
- String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
- worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
- if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
- log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, componentName, namespace);
- throw new RestException(Status.BAD_REQUEST, "Namespace does not exist");
- }
- }
- } catch (PulsarAdminException.NotAuthorizedException e) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
- componentName, clientRole, ComponentTypeUtils.toString(componentType));
- throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
- } catch (PulsarAdminException.NotFoundException e) {
- log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, componentName, tenant);
- throw new RestException(Status.BAD_REQUEST, "Tenant does not exist");
- } catch (PulsarAdminException e) {
- log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
-
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
- if (functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), componentName));
- }
-
- FunctionDetails functionDetails = null;
- boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
- File componentPackageFile = null;
- try {
-
- // validate parameters
- try {
- if (isPkgUrlProvided) {
-
- if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
- throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
- }
- try {
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
- componentConfigJson, componentType, componentPackageFile);
- } else {
- if (uploadedInputStream != null) {
- componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
- componentConfigJson, componentType, componentPackageFile);
- if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
- throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
- }
- }
- } catch (Exception e) {
- log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
-
- try {
- worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
- } catch (Exception e) {
- log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
- }
-
- // function state
- FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder()
- .setFunctionDetails(functionDetails)
- .setCreateTime(System.currentTimeMillis())
- .setVersion(0);
-
- // cache auth if need
- if (worker().getWorkerConfig().isAuthenticationEnabled()) {
-
- if (clientAuthenticationDataHttps != null) {
- try {
- Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
- .getRuntimeFactory()
- .getAuthProvider()
- .cacheAuthData(tenant, namespace, componentName, clientAuthenticationDataHttps);
-
- if (functionAuthData.isPresent()) {
- functionMetaDataBuilder.setFunctionAuthSpec(
- Function.FunctionAuthenticationSpec.newBuilder()
- .setData(ByteString.copyFrom(functionAuthData.get().getData()))
- .build());
- }
- } catch (Exception e) {
- log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
-
-
- throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
- }
- }
- }
-
- PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
- try {
- packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
- functionPkgUrl, fileDetail, componentPackageFile);
- } catch (Exception e) {
- log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
-
- functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
- updateRequest(functionMetaDataBuilder.build());
- } finally {
-
- if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
- && componentPackageFile != null && componentPackageFile.exists()) {
- componentPackageFile.delete();
- }
- }
- }
-
- private PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData,
- final String functionPkgUrl,
- final FormDataContentDisposition fileDetail,
- final File uploadedInputStreamAsFile) throws Exception {
+ PackageLocationMetaData.Builder getFunctionPackageLocation(final FunctionMetaData functionMetaData,
+ final String functionPkgUrl,
+ final FormDataContentDisposition fileDetail,
+ final File uploadedInputStreamAsFile) throws Exception {
FunctionDetails functionDetails = functionMetaData.getFunctionDetails();
String tenant = functionDetails.getTenant();
String namespace = functionDetails.getNamespace();
@@ -512,230 +337,6 @@ public abstract class ComponentImpl {
return packageLocationMetaDataBuilder;
}
-
- public void updateFunction(final String tenant,
- final String namespace,
- final String componentName,
- final InputStream uploadedInputStream,
- final FormDataContentDisposition fileDetail,
- final String functionPkgUrl,
- final String componentConfigJson,
- final String clientRole,
- AuthenticationDataHttps clientAuthenticationDataHttps,
- UpdateOptions updateOptions) {
-
- if (!isWorkerServiceAvailable()) {
- throwUnavailableException();
- }
-
- if (tenant == null) {
- throw new RestException(Status.BAD_REQUEST, "Tenant is not provided");
- }
- if (namespace == null) {
- throw new RestException(Status.BAD_REQUEST, "Namespace is not provided");
- }
- if (componentName == null) {
- throw new RestException(Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
- }
-
- try {
- if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
- log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
- componentName, clientRole, ComponentTypeUtils.toString(componentType));
- throw new RestException(Status.UNAUTHORIZED, "client is not authorize to perform operation");
-
- }
- } catch (PulsarAdminException e) {
- log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
-
- FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
-
- if (!functionMetaDataManager.containsFunction(tenant, namespace, componentName)) {
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
- }
-
- String mergedComponentConfigJson;
- String existingComponentConfigJson;
-
- FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, componentName);
-
- if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
- log.error("{}/{}/{} is not a {}", tenant, namespace, componentName, ComponentTypeUtils.toString(componentType));
- throw new RestException(Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), componentName));
- }
-
- if (componentType.equals(FunctionDetails.ComponentType.FUNCTION)) {
- FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
- existingComponentConfigJson = new Gson().toJson(existingFunctionConfig);
- FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
- // The rest end points take precedence over whatever is there in functionconfig
- functionConfig.setTenant(tenant);
- functionConfig.setNamespace(namespace);
- functionConfig.setName(componentName);
- try {
- FunctionConfig mergedConfig = FunctionConfigUtils.validateUpdate(existingFunctionConfig,
- functionConfig);
- mergedComponentConfigJson = new Gson().toJson(mergedConfig);
- } catch (Exception e) {
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
- } else if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
- SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
- existingComponentConfigJson = new Gson().toJson(existingSourceConfig);
- SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
- // The rest end points take precedence over whatever is there in functionconfig
- sourceConfig.setTenant(tenant);
- sourceConfig.setNamespace(namespace);
- sourceConfig.setName(componentName);
- try {
- SourceConfig mergedConfig = SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
- mergedComponentConfigJson = new Gson().toJson(mergedConfig);
- } catch (Exception e) {
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
- } else {
- SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
- existingComponentConfigJson = new Gson().toJson(existingSinkConfig);
- SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
- // The rest end points take precedence over whatever is there in functionconfig
- sinkConfig.setTenant(tenant);
- sinkConfig.setNamespace(namespace);
- sinkConfig.setName(componentName);
- try {
- SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
- mergedComponentConfigJson = new Gson().toJson(mergedConfig);
- } catch (Exception e) {
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
- }
-
- if (existingComponentConfigJson.equals(mergedComponentConfigJson) && isBlank(functionPkgUrl) && uploadedInputStream == null) {
- log.error("{}/{}/{} Update contains no changes", tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, "Update contains no change");
- }
-
- FunctionDetails functionDetails = null;
- File componentPackageFile = null;
- try {
-
- // validate parameters
- try {
- if (isNotBlank(functionPkgUrl)) {
- try {
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
- mergedComponentConfigJson, componentType, componentPackageFile);
-
- } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
- || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
- try {
- componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
- }
- functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
- mergedComponentConfigJson, componentType, componentPackageFile);
- } else if (uploadedInputStream != null) {
-
- componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
- functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
- mergedComponentConfigJson, componentType, componentPackageFile);
-
- } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
- functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
- mergedComponentConfigJson, componentType, componentPackageFile);
- if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
- throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
- }
- } else {
-
- componentPackageFile = FunctionCommon.createPkgTempFile();
- componentPackageFile.deleteOnExit();
- log.info("componentPackageFile: {}", componentPackageFile);
- WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
-
- functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
- mergedComponentConfigJson, componentType, componentPackageFile);
- }
- } catch (Exception e) {
- log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
- throw new RestException(Status.BAD_REQUEST, e.getMessage());
- }
-
- try {
- worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
- } catch (Exception e) {
- log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName);
- throw new RestException(Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
- ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
- }
-
- // merge from existing metadata
- FunctionMetaData.Builder functionMetaDataBuilder = FunctionMetaData.newBuilder().mergeFrom(existingComponent)
- .setFunctionDetails(functionDetails);
-
- // update auth data if need
- if (worker().getWorkerConfig().isAuthenticationEnabled()) {
- if (clientAuthenticationDataHttps != null && updateOptions != null && updateOptions.isUpdateAuthData()) {
- // get existing auth data if it exists
- Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
- if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
- existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
- }
-
- try {
- Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
- .getRuntimeFactory()
- .getAuthProvider()
- .updateAuthData(
- tenant, namespace,
- componentName, existingFunctionAuthData,
- clientAuthenticationDataHttps);
-
- if (newFunctionAuthData.isPresent()) {
- functionMetaDataBuilder.setFunctionAuthSpec(
- Function.FunctionAuthenticationSpec.newBuilder()
- .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
- .build());
- } else {
- functionMetaDataBuilder.clearFunctionAuthSpec();
- }
- } catch (Exception e) {
- log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), componentName, e.getMessage()));
- }
- }
- }
-
- PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
- if (isNotBlank(functionPkgUrl) || uploadedInputStream != null) {
- try {
- packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
- functionPkgUrl, fileDetail, componentPackageFile);
- } catch (Exception e) {
- log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, componentName, e);
- throw new RestException(Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
- } else {
- packageLocationMetaDataBuilder = PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
- }
-
- functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
-
- updateRequest(functionMetaDataBuilder.build());
- } finally {
- if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
- && componentPackageFile != null && componentPackageFile.exists()) {
- componentPackageFile.delete();
- }
- }
- }
-
public void deregisterFunction(final String tenant,
final String namespace,
final String componentName,
@@ -1269,7 +870,7 @@ public abstract class ComponentImpl {
return retVals;
}
- private void updateRequest(final FunctionMetaData functionMetaData) {
+ void updateRequest(final FunctionMetaData functionMetaData) {
// Submit to FMT
FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
@@ -1708,80 +1309,6 @@ public abstract class ComponentImpl {
return null;
}
-private FunctionDetails validateUpdateRequestParams(final String tenant,
- final String namespace,
- final String componentName,
- final String componentConfigJson,
- final FunctionDetails.ComponentType componentType,
- final File componentPackageFile) throws IOException {
- if (tenant == null) {
- throw new IllegalArgumentException("Tenant is not provided");
- }
- if (namespace == null) {
- throw new IllegalArgumentException("Namespace is not provided");
- }
- if (componentName == null) {
- throw new IllegalArgumentException(String.format("%s Name is not provided", ComponentTypeUtils.toString(componentType)));
- }
-
- if (componentType.equals(FunctionDetails.ComponentType.FUNCTION) && !isEmpty(componentConfigJson)) {
- FunctionConfig functionConfig = new Gson().fromJson(componentConfigJson, FunctionConfig.class);
- // The rest end points take precedence over whatever is there in functionconfig
- functionConfig.setTenant(tenant);
- functionConfig.setNamespace(namespace);
- functionConfig.setName(componentName);
- FunctionConfigUtils.inferMissingArguments(functionConfig);
- ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
- return FunctionConfigUtils.convert(functionConfig, clsLoader);
- }
- if (componentType.equals(FunctionDetails.ComponentType.SOURCE)) {
- Path archivePath = null;
- SourceConfig sourceConfig = new Gson().fromJson(componentConfigJson, SourceConfig.class);
- // The rest end points take precedence over whatever is there in sourceconfig
- sourceConfig.setTenant(tenant);
- sourceConfig.setNamespace(namespace);
- sourceConfig.setName(componentName);
- org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
- if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
- String builtinArchive = sourceConfig.getArchive();
- if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
- builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
- }
- try {
- archivePath = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("No Source archive %s found", archivePath));
- }
- }
- SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, componentPackageFile);
- return SourceConfigUtils.convert(sourceConfig, sourceDetails);
- }
- if (componentType.equals(FunctionDetails.ComponentType.SINK)) {
- Path archivePath = null;
- SinkConfig sinkConfig = new Gson().fromJson(componentConfigJson, SinkConfig.class);
- // The rest end points take precedence over whatever is there in sinkConfig
- sinkConfig.setTenant(tenant);
- sinkConfig.setNamespace(namespace);
- sinkConfig.setName(componentName);
- org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
- if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
- String builtinArchive = sinkConfig.getArchive();
- if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
- builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
- }
- try {
- archivePath = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
- } catch (Exception e) {
- throw new IllegalArgumentException(String.format("No Sink archive %s found", archivePath));
- }
- }
- SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
- return SinkConfigUtils.convert(sinkConfig, sinkDetails);
- } else {
- throw new IllegalArgumentException("Unrecognized component type: " + ComponentTypeUtils.toString(componentType));
- }
- }
-
private void validateTriggerRequestParams(final String tenant,
final String namespace,
final String functionName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index bf09fbf..da86189 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -18,27 +18,401 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.FunctionConfig;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.FunctionStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
+import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
+import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.utils.FunctionConfigUtils;
+import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.function.Supplier;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
+import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
+
@Slf4j
public class FunctionsImpl extends ComponentImpl {
+ public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
+ super(workerServiceSupplier, Function.FunctionDetails.ComponentType.FUNCTION);
+ }
+
+ public void registerFunction(final String tenant,
+ final String namespace,
+ final String functionName,
+ final InputStream uploadedInputStream,
+ final FormDataContentDisposition fileDetail,
+ final String functionPkgUrl,
+ final FunctionConfig functionConfig,
+ final String clientRole,
+ AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ if (tenant == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+ }
+ if (namespace == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+ }
+ if (functionName == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+ }
+
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+ functionName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ try {
+ // Check tenant exists
+ worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+ String qualifiedNamespace = tenant + "/" + namespace;
+ List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+ if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+ String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+ worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+ if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
+ log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, functionName, namespace);
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+ }
+ }
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+ functionName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ } catch (PulsarAdminException.NotFoundException e) {
+ log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, functionName, tenant);
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, functionName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+ if (functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+ log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName);
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), functionName));
+ }
+
+ Function.FunctionDetails functionDetails = null;
+ boolean isPkgUrlProvided = isNotBlank(functionPkgUrl);
+ File componentPackageFile = null;
+ try {
+
+ // validate parameters
+ try {
+ if (isPkgUrlProvided) {
+
+ if (!Utils.isFunctionPackageUrlSupported(functionPkgUrl)) {
+ throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
+ }
+ try {
+ componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+ functionConfig, componentPackageFile);
+ } else {
+ if (uploadedInputStream != null) {
+ componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+ functionConfig, componentPackageFile);
+ if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+ }
+ }
+ } catch (Exception e) {
+ log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+
+ try {
+ worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+ } catch (Exception e) {
+ log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName);
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+ }
+
+ // function state
+ Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+ .setFunctionDetails(functionDetails)
+ .setCreateTime(System.currentTimeMillis())
+ .setVersion(0);
+
+ // cache auth if need
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+ if (clientAuthenticationDataHttps != null) {
+ try {
+ Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+ .getRuntimeFactory()
+ .getAuthProvider()
+ .cacheAuthData(tenant, namespace, functionName, clientAuthenticationDataHttps);
+
+ if (functionAuthData.isPresent()) {
+ functionMetaDataBuilder.setFunctionAuthSpec(
+ Function.FunctionAuthenticationSpec.newBuilder()
+ .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+ .build());
+ }
+ } catch (Exception e) {
+ log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+
+
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+ }
+ }
+ }
+
+ Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+ try {
+ packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+ functionPkgUrl, fileDetail, componentPackageFile);
+ } catch (Exception e) {
+ log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+ updateRequest(functionMetaDataBuilder.build());
+ } finally {
+
+ if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
+ && componentPackageFile != null && componentPackageFile.exists()) {
+ componentPackageFile.delete();
+ }
+ }
+ }
+
+ public void updateFunction(final String tenant,
+ final String namespace,
+ final String functionName,
+ final InputStream uploadedInputStream,
+ final FormDataContentDisposition fileDetail,
+ final String functionPkgUrl,
+ final FunctionConfig functionConfig,
+ final String clientRole,
+ AuthenticationDataHttps clientAuthenticationDataHttps,
+ UpdateOptions updateOptions) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ if (tenant == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+ }
+ if (namespace == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+ }
+ if (functionName == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+ }
+
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+ functionName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, functionName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, functionName)) {
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), functionName));
+ }
+
+ Function.FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, functionName);
+
+ if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace, functionName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), functionName));
+ }
+
+ FunctionConfig existingFunctionConfig = FunctionConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+ // The rest end points take precedence over whatever is there in function config
+ functionConfig.setTenant(tenant);
+ functionConfig.setNamespace(namespace);
+ functionConfig.setName(functionName);
+ FunctionConfig mergedConfig;
+ try {
+ mergedConfig = FunctionConfigUtils.validateUpdate(existingFunctionConfig, functionConfig);
+ } catch (Exception e) {
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+
+ if (existingFunctionConfig.equals(mergedConfig) && isBlank(functionPkgUrl) && uploadedInputStream == null) {
+ log.error("{}/{}/{} Update contains no changes", tenant, namespace, functionName);
+ throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
+ }
+
+ Function.FunctionDetails functionDetails = null;
+ File componentPackageFile = null;
+ try {
+
+ // validate parameters
+ try {
+ if (isNotBlank(functionPkgUrl)) {
+ try {
+ componentPackageFile = FunctionCommon.extractFileFromPkgURL(functionPkgUrl);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+ mergedConfig, componentPackageFile);
+
+ } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
+ || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
+ try {
+ componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), functionPkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+ mergedConfig, componentPackageFile);
+ } else if (uploadedInputStream != null) {
+
+ componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+ functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+ mergedConfig, componentPackageFile);
+
+ } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
+ functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+ mergedConfig, componentPackageFile);
+ if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+ }
+ } else {
+
+ componentPackageFile = FunctionCommon.createPkgTempFile();
+ componentPackageFile.deleteOnExit();
+ log.info("componentPackageFile: {}", componentPackageFile);
+ WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
+
+ functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
+ mergedConfig, componentPackageFile);
+ }
+ } catch (Exception e) {
+ log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+
+ try {
+ worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+ } catch (Exception e) {
+ log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName);
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+ ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+ }
+
+ // merge from existing metadata
+ Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+ .setFunctionDetails(functionDetails);
+
+ // update auth data if need
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+ if (clientAuthenticationDataHttps != null && updateOptions != null && updateOptions.isUpdateAuthData()) {
+ // get existing auth data if it exists
+ Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
+ if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+ existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+ }
+
+ try {
+ Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
+ .getRuntimeFactory()
+ .getAuthProvider()
+ .updateAuthData(
+ tenant, namespace,
+ functionName, existingFunctionAuthData,
+ clientAuthenticationDataHttps);
+
+ if (newFunctionAuthData.isPresent()) {
+ functionMetaDataBuilder.setFunctionAuthSpec(
+ Function.FunctionAuthenticationSpec.newBuilder()
+ .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+ .build());
+ } else {
+ functionMetaDataBuilder.clearFunctionAuthSpec();
+ }
+ } catch (Exception e) {
+ log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), functionName, e.getMessage()));
+ }
+ }
+ }
+
+ Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+ if (isNotBlank(functionPkgUrl) || uploadedInputStream != null) {
+ try {
+ packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+ functionPkgUrl, fileDetail, componentPackageFile);
+ } catch (Exception e) {
+ log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, functionName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ } else {
+ packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+ }
+
+ functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+ updateRequest(functionMetaDataBuilder.build());
+ } finally {
+ if (!(functionPkgUrl != null && functionPkgUrl.startsWith(Utils.FILE))
+ && componentPackageFile != null && componentPackageFile.exists()) {
+ componentPackageFile.delete();
+ }
+ }
+ }
+
private class GetFunctionStatus extends GetStatus<FunctionStatus, FunctionStatus.FunctionInstanceStatus.FunctionInstanceStatusData> {
@Override
@@ -195,10 +569,6 @@ public class FunctionsImpl extends ComponentImpl {
return exceptionInformation;
}
- public FunctionsImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, Function.FunctionDetails.ComponentType.FUNCTION);
- }
-
/**
* Get status of a function instance. If this worker is not running the function instance,
* @param tenant the tenant the function belongs to
@@ -262,4 +632,20 @@ public class FunctionsImpl extends ComponentImpl {
return functionStatus;
}
+
+ private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
+ final String namespace,
+ final String componentName,
+ final FunctionConfig functionConfig,
+ final File componentPackageFile) throws IOException {
+
+ // The rest end points take precedence over whatever is there in function config
+ functionConfig.setTenant(tenant);
+ functionConfig.setNamespace(namespace);
+ functionConfig.setName(componentName);
+ FunctionConfigUtils.inferMissingArguments(functionConfig);
+ ClassLoader clsLoader = FunctionConfigUtils.validate(functionConfig, componentPackageFile);
+ return FunctionConfigUtils.convert(functionConfig, clsLoader);
+
+ }
}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
index b1da329..d4e6414 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImplV2.java
@@ -18,14 +18,12 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
-import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.gson.Gson;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.FunctionState;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.policies.data.FunctionStatus;
-import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -103,14 +101,9 @@ public class FunctionsImplV2 {
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
FunctionConfig functionConfig = FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
- String functionConfigJson = null;
- try {
- functionConfigJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig);
- } catch (JsonProcessingException e) {
- throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
+
delegate.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientRole, null);
+ functionPkgUrl, functionConfig, clientRole, null);
return Response.ok().build();
}
@@ -125,15 +118,9 @@ public class FunctionsImplV2 {
throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
}
FunctionConfig functionConfig = FunctionConfigUtils.convertFromDetails(functionDetailsBuilder.build());
- String functionConfigJson = null;
- try {
- functionConfigJson = ObjectMapperFactory.getThreadLocal().writeValueAsString(functionConfig);
- } catch (JsonProcessingException e) {
- throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
- }
delegate.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientRole, null, null);
+ functionPkgUrl, functionConfig, clientRole, null, null);
return Response.ok().build();
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index fa07160..504722f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -18,34 +18,406 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SinkStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.function.Supplier;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
@Slf4j
public class SinksImpl extends ComponentImpl {
+ public SinksImpl(Supplier<WorkerService> workerServiceSupplier) {
+ super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SINK);
+ }
+
+ public void registerSink(final String tenant,
+ final String namespace,
+ final String sinkName,
+ final InputStream uploadedInputStream,
+ final FormDataContentDisposition fileDetail,
+ final String sinkPkgUrl,
+ final SinkConfig sinkConfig,
+ final String clientRole,
+ AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ if (tenant == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+ }
+ if (namespace == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+ }
+ if (sinkName == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+ }
+
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+ sinkName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sinkName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ try {
+ // Check tenant exists
+ worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+ String qualifiedNamespace = tenant + "/" + namespace;
+ List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+ if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+ String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+ worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+ if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
+ log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, sinkName, namespace);
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+ }
+ }
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+ sinkName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ } catch (PulsarAdminException.NotFoundException e) {
+ log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, sinkName, tenant);
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, sinkName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+ if (functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
+ log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sinkName));
+ }
+
+ Function.FunctionDetails functionDetails = null;
+ boolean isPkgUrlProvided = isNotBlank(sinkPkgUrl);
+ File componentPackageFile = null;
+ try {
+
+ // validate parameters
+ try {
+ if (isPkgUrlProvided) {
+
+ if (!Utils.isFunctionPackageUrlSupported(sinkPkgUrl)) {
+ throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
+ }
+ try {
+ componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+ sinkConfig, componentPackageFile);
+ } else {
+ if (uploadedInputStream != null) {
+ componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+ sinkConfig, componentPackageFile);
+ if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+ }
+ }
+ } catch (Exception e) {
+ log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+
+ try {
+ worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+ } catch (Exception e) {
+ log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+ }
+
+ // function state
+ Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+ .setFunctionDetails(functionDetails)
+ .setCreateTime(System.currentTimeMillis())
+ .setVersion(0);
+
+ // cache auth if need
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+ if (clientAuthenticationDataHttps != null) {
+ try {
+ Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+ .getRuntimeFactory()
+ .getAuthProvider()
+ .cacheAuthData(tenant, namespace, sinkName, clientAuthenticationDataHttps);
+
+ if (functionAuthData.isPresent()) {
+ functionMetaDataBuilder.setFunctionAuthSpec(
+ Function.FunctionAuthenticationSpec.newBuilder()
+ .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+ .build());
+ }
+ } catch (Exception e) {
+ log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+
+
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+ }
+ }
+ }
+
+ Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+ try {
+ packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+ sinkPkgUrl, fileDetail, componentPackageFile);
+ } catch (Exception e) {
+ log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+ updateRequest(functionMetaDataBuilder.build());
+ } finally {
+
+ if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE))
+ && componentPackageFile != null && componentPackageFile.exists()) {
+ componentPackageFile.delete();
+ }
+ }
+ }
+
+ public void updateSink(final String tenant,
+ final String namespace,
+ final String sinkName,
+ final InputStream uploadedInputStream,
+ final FormDataContentDisposition fileDetail,
+ final String sinkPkgUrl,
+ final SinkConfig sinkConfig,
+ final String clientRole,
+ AuthenticationDataHttps clientAuthenticationDataHttps,
+ UpdateOptions updateOptions) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ if (tenant == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+ }
+ if (namespace == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+ }
+ if (sinkName == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+ }
+
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+ sinkName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sinkName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, sinkName)) {
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
+ }
+
+ Function.FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, sinkName);
+
+ if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace, sinkName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sinkName));
+ }
+
+
+ SinkConfig existingSinkConfig = SinkConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+ // The rest end points take precedence over whatever is there in functionconfig
+ sinkConfig.setTenant(tenant);
+ sinkConfig.setNamespace(namespace);
+ sinkConfig.setName(sinkName);
+
+ SinkConfig mergedConfig;
+ try {
+ mergedConfig = SinkConfigUtils.validateUpdate(existingSinkConfig, sinkConfig);
+ } catch (Exception e) {
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+
+
+ if (existingSinkConfig.equals(mergedConfig) && isBlank(sinkPkgUrl) && uploadedInputStream == null) {
+ log.error("{}/{}/{} Update contains no changes", tenant, namespace, sinkName);
+ throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
+ }
+
+ Function.FunctionDetails functionDetails = null;
+ File componentPackageFile = null;
+ try {
+
+ // validate parameters
+ try {
+ if (isNotBlank(sinkPkgUrl)) {
+ try {
+ componentPackageFile = FunctionCommon.extractFileFromPkgURL(sinkPkgUrl);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+ mergedConfig, componentPackageFile);
+
+ } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
+ || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
+ try {
+ componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sinkPkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+ mergedConfig, componentPackageFile);
+ } else if (uploadedInputStream != null) {
+
+ componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+ mergedConfig, componentPackageFile);
+
+ } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+ mergedConfig, componentPackageFile);
+ if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+ }
+ } else {
+
+ componentPackageFile = FunctionCommon.createPkgTempFile();
+ componentPackageFile.deleteOnExit();
+ log.info("componentPackageFile: {}", componentPackageFile);
+ WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
+
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
+ mergedConfig, componentPackageFile);
+ }
+ } catch (Exception e) {
+ log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+
+ try {
+ worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+ } catch (Exception e) {
+ log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName);
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+ ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+ }
+
+ // merge from existing metadata
+ Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+ .setFunctionDetails(functionDetails);
+
+ // update auth data if need
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+ if (clientAuthenticationDataHttps != null && updateOptions != null && updateOptions.isUpdateAuthData()) {
+ // get existing auth data if it exists
+ Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
+ if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+ existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+ }
+
+ try {
+ Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
+ .getRuntimeFactory()
+ .getAuthProvider()
+ .updateAuthData(
+ tenant, namespace,
+ sinkName, existingFunctionAuthData,
+ clientAuthenticationDataHttps);
+
+ if (newFunctionAuthData.isPresent()) {
+ functionMetaDataBuilder.setFunctionAuthSpec(
+ Function.FunctionAuthenticationSpec.newBuilder()
+ .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+ .build());
+ } else {
+ functionMetaDataBuilder.clearFunctionAuthSpec();
+ }
+ } catch (Exception e) {
+ log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), sinkName, e.getMessage()));
+ }
+ }
+ }
+
+ Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+ if (isNotBlank(sinkPkgUrl) || uploadedInputStream != null) {
+ try {
+ packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+ sinkPkgUrl, fileDetail, componentPackageFile);
+ } catch (Exception e) {
+ log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, sinkName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ } else {
+ packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+ }
+
+ functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+ updateRequest(functionMetaDataBuilder.build());
+ } finally {
+ if (!(sinkPkgUrl != null && sinkPkgUrl.startsWith(Utils.FILE))
+ && componentPackageFile != null && componentPackageFile.exists()) {
+ componentPackageFile.delete();
+ }
+ }
+ }
+
private class GetSinkStatus extends GetStatus<SinkStatus, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
@Override
@@ -206,10 +578,6 @@ public class SinksImpl extends ComponentImpl {
return exceptionInformation;
}
- public SinksImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SINK);
- }
-
public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(final String tenant,
final String namespace,
final String sinkName,
@@ -287,4 +655,31 @@ public class SinksImpl extends ComponentImpl {
SinkConfig config = SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
return config;
}
+
+ private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
+ final String namespace,
+ final String sinkName,
+ final SinkConfig sinkConfig,
+ final File componentPackageFile) throws IOException {
+
+ Path archivePath = null;
+ // The rest end points take precedence over whatever is there in sinkConfig
+ sinkConfig.setTenant(tenant);
+ sinkConfig.setNamespace(namespace);
+ sinkConfig.setName(sinkName);
+ org.apache.pulsar.common.functions.Utils.inferMissingArguments(sinkConfig);
+ if (!StringUtils.isEmpty(sinkConfig.getArchive())) {
+ String builtinArchive = sinkConfig.getArchive();
+ if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+ builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+ }
+ try {
+ archivePath = this.worker().getConnectorsManager().getSinkArchive(builtinArchive);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("No Sink archive %s found", archivePath));
+ }
+ }
+ SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, componentPackageFile);
+ return SinkConfigUtils.convert(sinkConfig, sinkDetails);
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index d35724e..15a26aa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -18,33 +18,403 @@
*/
package org.apache.pulsar.functions.worker.rest.api;
+import com.google.protobuf.ByteString;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.authentication.AuthenticationDataHttps;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.client.admin.PulsarAdminException;
+import org.apache.pulsar.common.functions.UpdateOptions;
+import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.common.policies.data.SourceStatus;
+import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.instance.InstanceUtils;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.ComponentTypeUtils;
+import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.SourceConfigUtils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
+import org.apache.pulsar.functions.worker.WorkerUtils;
import org.apache.pulsar.functions.worker.rest.RestException;
+import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.Response;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
import java.net.URI;
+import java.nio.file.Path;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
+import java.util.Optional;
import java.util.function.Supplier;
+import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
+import static org.apache.pulsar.functions.worker.WorkerUtils.isFunctionCodeBuiltin;
import static org.apache.pulsar.functions.worker.rest.RestUtils.throwUnavailableException;
@Slf4j
public class SourcesImpl extends ComponentImpl {
+
+ public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
+ super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE);
+ }
+
+ public void registerSource(final String tenant,
+ final String namespace,
+ final String sourceName,
+ final InputStream uploadedInputStream,
+ final FormDataContentDisposition fileDetail,
+ final String sourcePkgUrl,
+ final SourceConfig sourceConfig,
+ final String clientRole,
+ AuthenticationDataHttps clientAuthenticationDataHttps) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ if (tenant == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+ }
+ if (namespace == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+ }
+ if (sourceName == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+ }
+
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to register {}", tenant, namespace,
+ sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sourceName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ try {
+ // Check tenant exists
+ worker().getBrokerAdmin().tenants().getTenantInfo(tenant);
+
+ String qualifiedNamespace = tenant + "/" + namespace;
+ List<String> namespaces = worker().getBrokerAdmin().namespaces().getNamespaces(tenant);
+ if (namespaces != null && !namespaces.contains(qualifiedNamespace)) {
+ String qualifiedNamespaceWithCluster = String.format("%s/%s/%s", tenant,
+ worker().getWorkerConfig().getPulsarFunctionsCluster(), namespace);
+ if (namespaces != null && !namespaces.contains(qualifiedNamespaceWithCluster)) {
+ log.error("{}/{}/{} Namespace {} does not exist", tenant, namespace, sourceName, namespace);
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace does not exist");
+ }
+ }
+ } catch (PulsarAdminException.NotAuthorizedException e) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to operate {} on tenant", tenant, namespace,
+ sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+ } catch (PulsarAdminException.NotFoundException e) {
+ log.error("{}/{}/{} Tenant {} does not exist", tenant, namespace, sourceName, tenant);
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant does not exist");
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Issues getting tenant data", tenant, namespace, sourceName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+ if (functionMetaDataManager.containsFunction(tenant, namespace, sourceName)) {
+ log.error("{} {}/{}/{} already exists", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName);
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s already exists", ComponentTypeUtils.toString(componentType), sourceName));
+ }
+
+ Function.FunctionDetails functionDetails = null;
+ boolean isPkgUrlProvided = isNotBlank(sourcePkgUrl);
+ File componentPackageFile = null;
+ try {
+
+ // validate parameters
+ try {
+ if (isPkgUrlProvided) {
+
+ if (!Utils.isFunctionPackageUrlSupported(sourcePkgUrl)) {
+ throw new IllegalArgumentException("Function Package url is not valid. supported url (http/https/file)");
+ }
+ try {
+ componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+ sourceConfig, componentPackageFile);
+ } else {
+ if (uploadedInputStream != null) {
+ componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+ sourceConfig, componentPackageFile);
+ if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+ }
+ }
+ } catch (Exception e) {
+ log.error("Invalid register {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+
+ try {
+ worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+ } catch (Exception e) {
+ log.error("{} {}/{}/{} cannot be admitted by the runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName);
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s", ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+ }
+
+ // function state
+ Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder()
+ .setFunctionDetails(functionDetails)
+ .setCreateTime(System.currentTimeMillis())
+ .setVersion(0);
+
+ // cache auth if need
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+
+ if (clientAuthenticationDataHttps != null) {
+ try {
+ Optional<FunctionAuthData> functionAuthData = worker().getFunctionRuntimeManager()
+ .getRuntimeFactory()
+ .getAuthProvider()
+ .cacheAuthData(tenant, namespace, sourceName, clientAuthenticationDataHttps);
+
+ if (functionAuthData.isPresent()) {
+ functionMetaDataBuilder.setFunctionAuthSpec(
+ Function.FunctionAuthenticationSpec.newBuilder()
+ .setData(ByteString.copyFrom(functionAuthData.get().getData()))
+ .build());
+ }
+ } catch (Exception e) {
+ log.error("Error caching authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+
+
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+ }
+ }
+ }
+
+ Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+ try {
+ packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+ sourcePkgUrl, fileDetail, componentPackageFile);
+ } catch (Exception e) {
+ log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+ updateRequest(functionMetaDataBuilder.build());
+ } finally {
+
+ if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE))
+ && componentPackageFile != null && componentPackageFile.exists()) {
+ componentPackageFile.delete();
+ }
+ }
+ }
+
+ public void updateSource(final String tenant,
+ final String namespace,
+ final String sourceName,
+ final InputStream uploadedInputStream,
+ final FormDataContentDisposition fileDetail,
+ final String sourcePkgUrl,
+ final SourceConfig sourceConfig,
+ final String clientRole,
+ AuthenticationDataHttps clientAuthenticationDataHttps,
+ UpdateOptions updateOptions) {
+
+ if (!isWorkerServiceAvailable()) {
+ throwUnavailableException();
+ }
+
+ if (tenant == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Tenant is not provided");
+ }
+ if (namespace == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, "Namespace is not provided");
+ }
+ if (sourceName == null) {
+ throw new RestException(Response.Status.BAD_REQUEST, ComponentTypeUtils.toString(componentType) + " Name is not provided");
+ }
+
+ try {
+ if (!isAuthorizedRole(tenant, namespace, clientRole, clientAuthenticationDataHttps)) {
+ log.error("{}/{}/{} Client [{}] is not admin and authorized to update {}", tenant, namespace,
+ sourceName, clientRole, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.UNAUTHORIZED, "client is not authorize to perform operation");
+
+ }
+ } catch (PulsarAdminException e) {
+ log.error("{}/{}/{} Failed to authorize [{}]", tenant, namespace, sourceName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+
+ FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
+
+ if (!functionMetaDataManager.containsFunction(tenant, namespace, sourceName)) {
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sourceName));
+ }
+
+ Function.FunctionMetaData existingComponent = functionMetaDataManager.getFunctionMetaData(tenant, namespace, sourceName);
+
+ if (!InstanceUtils.calculateSubjectType(existingComponent.getFunctionDetails()).equals(componentType)) {
+ log.error("{}/{}/{} is not a {}", tenant, namespace, sourceName, ComponentTypeUtils.toString(componentType));
+ throw new RestException(Response.Status.NOT_FOUND, String.format("%s %s doesn't exist", ComponentTypeUtils.toString(componentType), sourceName));
+ }
+
+ SourceConfig existingSourceConfig = SourceConfigUtils.convertFromDetails(existingComponent.getFunctionDetails());
+ // The rest end points take precedence over whatever is there in functionconfig
+ sourceConfig.setTenant(tenant);
+ sourceConfig.setNamespace(namespace);
+ sourceConfig.setName(sourceName);
+ SourceConfig mergedConfig;
+ try {
+ mergedConfig = SourceConfigUtils.validateUpdate(existingSourceConfig, sourceConfig);
+ } catch (Exception e) {
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+
+ if (existingSourceConfig.equals(mergedConfig) && isBlank(sourcePkgUrl) && uploadedInputStream == null) {
+ log.error("{}/{}/{} Update contains no changes", tenant, namespace, sourceName);
+ throw new RestException(Response.Status.BAD_REQUEST, "Update contains no change");
+ }
+
+ Function.FunctionDetails functionDetails = null;
+ File componentPackageFile = null;
+ try {
+
+ // validate parameters
+ try {
+ if (isNotBlank(sourcePkgUrl)) {
+ try {
+ componentPackageFile = FunctionCommon.extractFileFromPkgURL(sourcePkgUrl);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+ mergedConfig, componentPackageFile);
+
+ } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.FILE)
+ || existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.HTTP)) {
+ try {
+ componentPackageFile = FunctionCommon.extractFileFromPkgURL(existingComponent.getPackageLocation().getPackagePath());
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("Encountered error \"%s\" when getting %s package from %s", e.getMessage(), ComponentTypeUtils.toString(componentType), sourcePkgUrl));
+ }
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+ mergedConfig, componentPackageFile);
+ } else if (uploadedInputStream != null) {
+
+ componentPackageFile = WorkerUtils.dumpToTmpFile(uploadedInputStream);
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+ mergedConfig, componentPackageFile);
+
+ } else if (existingComponent.getPackageLocation().getPackagePath().startsWith(Utils.BUILTIN)) {
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+ mergedConfig, componentPackageFile);
+ if (!isFunctionCodeBuiltin(functionDetails) && (componentPackageFile == null || fileDetail == null)) {
+ throw new IllegalArgumentException(ComponentTypeUtils.toString(componentType) + " Package is not provided");
+ }
+ } else {
+
+ componentPackageFile = FunctionCommon.createPkgTempFile();
+ componentPackageFile.deleteOnExit();
+ log.info("componentPackageFile: {}", componentPackageFile);
+ WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
+
+ functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
+ mergedConfig, componentPackageFile);
+ }
+ } catch (Exception e) {
+ log.error("Invalid update {} request @ /{}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+ throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
+ }
+
+ try {
+ worker().getFunctionRuntimeManager().getRuntimeFactory().doAdmissionChecks(functionDetails);
+ } catch (Exception e) {
+ log.error("Updated {} {}/{}/{} cannot be submitted to runtime factory", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName);
+ throw new RestException(Response.Status.BAD_REQUEST, String.format("%s %s cannot be admitted:- %s",
+ ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+ }
+
+ // merge from existing metadata
+ Function.FunctionMetaData.Builder functionMetaDataBuilder = Function.FunctionMetaData.newBuilder().mergeFrom(existingComponent)
+ .setFunctionDetails(functionDetails);
+
+ // update auth data if need
+ if (worker().getWorkerConfig().isAuthenticationEnabled()) {
+ if (clientAuthenticationDataHttps != null && updateOptions != null && updateOptions.isUpdateAuthData()) {
+ // get existing auth data if it exists
+ Optional<FunctionAuthData> existingFunctionAuthData = Optional.empty();
+ if (functionMetaDataBuilder.hasFunctionAuthSpec()) {
+ existingFunctionAuthData = Optional.ofNullable(getFunctionAuthData(Optional.ofNullable(functionMetaDataBuilder.getFunctionAuthSpec())));
+ }
+
+ try {
+ Optional<FunctionAuthData> newFunctionAuthData = worker().getFunctionRuntimeManager()
+ .getRuntimeFactory()
+ .getAuthProvider()
+ .updateAuthData(
+ tenant, namespace,
+ sourceName, existingFunctionAuthData,
+ clientAuthenticationDataHttps);
+
+ if (newFunctionAuthData.isPresent()) {
+ functionMetaDataBuilder.setFunctionAuthSpec(
+ Function.FunctionAuthenticationSpec.newBuilder()
+ .setData(ByteString.copyFrom(newFunctionAuthData.get().getData()))
+ .build());
+ } else {
+ functionMetaDataBuilder.clearFunctionAuthSpec();
+ }
+ } catch (Exception e) {
+ log.error("Error updating authentication data for {} {}/{}/{}", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, String.format("Error caching authentication data for %s %s:- %s", ComponentTypeUtils.toString(componentType), sourceName, e.getMessage()));
+ }
+ }
+ }
+
+ Function.PackageLocationMetaData.Builder packageLocationMetaDataBuilder;
+ if (isNotBlank(sourcePkgUrl) || uploadedInputStream != null) {
+ try {
+ packageLocationMetaDataBuilder = getFunctionPackageLocation(functionMetaDataBuilder.build(),
+ sourcePkgUrl, fileDetail, componentPackageFile);
+ } catch (Exception e) {
+ log.error("Failed process {} {}/{}/{} package: ", ComponentTypeUtils.toString(componentType), tenant, namespace, sourceName, e);
+ throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e.getMessage());
+ }
+ } else {
+ packageLocationMetaDataBuilder = Function.PackageLocationMetaData.newBuilder().mergeFrom(existingComponent.getPackageLocation());
+ }
+
+ functionMetaDataBuilder.setPackageLocation(packageLocationMetaDataBuilder);
+
+ updateRequest(functionMetaDataBuilder.build());
+ } finally {
+ if (!(sourcePkgUrl != null && sourcePkgUrl.startsWith(Utils.FILE))
+ && componentPackageFile != null && componentPackageFile.exists()) {
+ componentPackageFile.delete();
+ }
+ }
+ }
+
private class GetSourceStatus extends GetStatus<SourceStatus, SourceStatus.SourceInstanceStatus.SourceInstanceStatusData> {
@Override
@@ -208,10 +578,6 @@ public class SourcesImpl extends ComponentImpl {
}
}
- public SourcesImpl(Supplier<WorkerService> workerServiceSupplier) {
- super(workerServiceSupplier, Function.FunctionDetails.ComponentType.SOURCE);
- }
-
public SourceStatus getSourceStatus(final String tenant,
final String namespace,
final String componentName,
@@ -285,4 +651,31 @@ public class SourcesImpl extends ComponentImpl {
SourceConfig config = SourceConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
return config;
}
+
+ private Function.FunctionDetails validateUpdateRequestParams(final String tenant,
+ final String namespace,
+ final String sourceName,
+ final SourceConfig sourceConfig,
+ final File sourcePackageFile) throws IOException {
+
+ Path archivePath = null;
+ // The rest end points take precedence over whatever is there in sourceconfig
+ sourceConfig.setTenant(tenant);
+ sourceConfig.setNamespace(namespace);
+ sourceConfig.setName(sourceName);
+ org.apache.pulsar.common.functions.Utils.inferMissingArguments(sourceConfig);
+ if (!StringUtils.isEmpty(sourceConfig.getArchive())) {
+ String builtinArchive = sourceConfig.getArchive();
+ if (builtinArchive.startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN)) {
+ builtinArchive = builtinArchive.replaceFirst("^builtin://", "");
+ }
+ try {
+ archivePath = this.worker().getConnectorsManager().getSourceArchive(builtinArchive);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("No Source archive %s found", archivePath));
+ }
+ }
+ SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, sourcePackageFile);
+ return SourceConfigUtils.convert(sourceConfig, sourceDetails);
+ }
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
index 2f1a87f..0bbacaa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionsApiV3Resource.java
@@ -67,10 +67,10 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("functionConfig") String functionConfigJson) {
+ final @FormDataParam("functionConfig") FunctionConfig functionConfig) {
functions.registerFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData());
+ functionPkgUrl, functionConfig, clientAppId(), clientAuthData());
}
@@ -83,11 +83,11 @@ public class FunctionsApiV3Resource extends FunctionApiResource {
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("functionConfig") String functionConfigJson,
+ final @FormDataParam("functionConfig") FunctionConfig functionConfig,
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
functions.updateFunction(tenant, namespace, functionName, uploadedInputStream, fileDetail,
- functionPkgUrl, functionConfigJson, clientAppId(), clientAuthData(), updateOptions);
+ functionPkgUrl, functionConfig, clientAppId(), clientAuthData(), updateOptions);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
index e699544..d9f788b 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SinksApiV3Resource.java
@@ -33,7 +33,14 @@ import org.apache.pulsar.functions.worker.rest.api.SinksImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.io.InputStream;
@@ -62,10 +69,10 @@ public class SinksApiV3Resource extends FunctionApiResource {
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("sinkConfig") String sinkConfigJson) {
+ final @FormDataParam("sinkConfig") SinkConfig sinkConfig) {
- sink.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData());
+ sink.registerSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+ functionPkgUrl, sinkConfig, clientAppId(), clientAuthData());
}
@PUT
@@ -77,11 +84,11 @@ public class SinksApiV3Resource extends FunctionApiResource {
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("sinkConfig") String sinkConfigJson,
+ final @FormDataParam("sinkConfig") SinkConfig sinkConfig,
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
- sink.updateFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- functionPkgUrl, sinkConfigJson, clientAppId(), clientAuthData(), updateOptions);
+ sink.updateSink(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
+ functionPkgUrl, sinkConfig, clientAppId(), clientAuthData(), updateOptions);
}
@DELETE
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
index de6df1a..9be1581 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v3/SourcesApiV3Resource.java
@@ -33,7 +33,14 @@ import org.apache.pulsar.functions.worker.rest.api.SourcesImpl;
import org.glassfish.jersey.media.multipart.FormDataContentDisposition;
import org.glassfish.jersey.media.multipart.FormDataParam;
-import javax.ws.rs.*;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import java.io.IOException;
import java.io.InputStream;
@@ -62,10 +69,10 @@ public class SourcesApiV3Resource extends FunctionApiResource {
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("sourceConfig") String sourceConfigJson) {
+ final @FormDataParam("sourceConfig") SourceConfig sourceConfig) {
- source.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData());
+ source.registerSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+ functionPkgUrl, sourceConfig, clientAppId(), clientAuthData());
}
@@ -78,11 +85,11 @@ public class SourcesApiV3Resource extends FunctionApiResource {
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
final @FormDataParam("url") String functionPkgUrl,
- final @FormDataParam("sourceConfig") String sourceConfigJson,
+ final @FormDataParam("sourceConfig") SourceConfig sourceConfig,
final @FormDataParam("updateOptions") UpdateOptions updateOptions) {
- source.updateFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- functionPkgUrl, sourceConfigJson, clientAppId(), clientAuthData(), updateOptions);
+ source.updateSource(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
+ functionPkgUrl, sourceConfig, clientAppId(), clientAuthData(), updateOptions);
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
index 434cfc2..b3b673b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/FunctionApiV3ResourceTest.java
@@ -484,7 +484,7 @@ public class FunctionApiV3ResourceTest {
inputStream,
details,
functionPkgUrl,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null);
}
@@ -498,7 +498,7 @@ public class FunctionApiV3ResourceTest {
mockedInputStream,
mockedFormData,
null,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null);
}
@@ -912,7 +912,7 @@ public class FunctionApiV3ResourceTest {
inputStream,
details,
null,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null, null);
}
@@ -936,7 +936,7 @@ public class FunctionApiV3ResourceTest {
mockedInputStream,
mockedFormData,
null,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null, null);
}
@@ -1024,7 +1024,7 @@ public class FunctionApiV3ResourceTest {
null,
null,
filePackageUrl,
- new Gson().toJson(functionConfig),
+ functionConfig,
null, null, null);
}
@@ -1469,7 +1469,7 @@ public class FunctionApiV3ResourceTest {
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
- resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, new Gson().toJson(functionConfig), null, null);
+ resource.registerFunction(tenant, namespace, function, null, null, filePackageUrl, functionConfig, null, null);
}
@@ -1500,7 +1500,7 @@ public class FunctionApiV3ResourceTest {
functionConfig.setCustomSerdeInputs(topicsToSerDeClassName);
functionConfig.setOutput(outputTopic);
functionConfig.setOutputSerdeClassName(outputSerdeClassName);
- resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl, new Gson().toJson(functionConfig), null, null);
+ resource.registerFunction(actualTenant, actualNamespace, actualName, null, null, filePackageUrl, functionConfig, null, null);
}
public static FunctionConfig createDefaultFunctionConfig() {
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 3b3548a..c884835 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -434,28 +434,28 @@ public class SinkApiV3ResourceTest {
sinkConfig.setParallelism(parallelism);
}
- resource.registerFunction(
+ resource.registerSink(
tenant,
namespace,
sink,
inputStream,
details,
pkgUrl,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null);
}
private void registerDefaultSink() throws IOException {
SinkConfig sinkConfig = createDefaultSinkConfig();
- resource.registerFunction(
+ resource.registerSink(
tenant,
namespace,
sink,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null);
}
@@ -548,14 +548,14 @@ public class SinkApiV3ResourceTest {
sinkConfig.setClassName(className);
sinkConfig.setParallelism(parallelism);
sinkConfig.setTopicToSerdeClassName(topicsToSerDeClassName);
- resource.registerFunction(
+ resource.registerSink(
actualTenant,
actualNamespace,
actualName,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null);
}
@@ -829,14 +829,14 @@ public class SinkApiV3ResourceTest {
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
}
- resource.updateFunction(
+ resource.updateSink(
tenant,
namespace,
sink,
inputStream,
details,
null,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null, null);
}
@@ -870,14 +870,14 @@ public class SinkApiV3ResourceTest {
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
- resource.updateFunction(
+ resource.updateSink(
tenant,
namespace,
sink,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null, null);
}
@@ -972,14 +972,14 @@ public class SinkApiV3ResourceTest {
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
- resource.updateFunction(
+ resource.updateSink(
tenant,
namespace,
sink,
null,
null,
filePackageUrl,
- new Gson().toJson(sinkConfig),
+ sinkConfig,
null, null, null);
}
@@ -1364,7 +1364,7 @@ public class SinkApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Namespace does not exist")
- public void testRegisterFunctionNonExistingNamespace() throws Exception {
+ public void testregisterSinkNonExistingNamespace() throws Exception {
try {
this.namespaceList.clear();
registerDefaultSink();
@@ -1375,7 +1375,7 @@ public class SinkApiV3ResourceTest {
}
@Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Tenant does not exist")
- public void testRegisterFunctionNonExistingTenant() throws Exception {
+ public void testregisterSinkNonExistingTenant() throws Exception {
try {
when(mockedTenants.getTenantInfo(any())).thenThrow(PulsarAdminException.NotFoundException.class);
registerDefaultSink();
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index cd55369..875e2ef 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -404,28 +404,28 @@ public class SourceApiV3ResourceTest {
sourceConfig.setParallelism(parallelism);
}
- resource.registerFunction(
+ resource.registerSource(
tenant,
namespace,
function,
inputStream,
details,
pkgUrl,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null);
}
private void registerDefaultSource() throws IOException {
SourceConfig sourceConfig = createDefaultSourceConfig();
- resource.registerFunction(
+ resource.registerSource(
tenant,
namespace,
source,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null);
}
@@ -518,14 +518,14 @@ public class SourceApiV3ResourceTest {
sourceConfig.setParallelism(parallelism);
sourceConfig.setTopicName(outputTopic);
sourceConfig.setSerdeClassName(outputSerdeClassName);
- resource.registerFunction(
+ resource.registerSource(
actualTenant,
actualNamespace,
actualName,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null);
}
@@ -850,14 +850,14 @@ public class SourceApiV3ResourceTest {
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
}
- resource.updateFunction(
+ resource.updateSource(
tenant,
namespace,
function,
inputStream,
details,
null,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null, null);
}
@@ -887,14 +887,14 @@ public class SourceApiV3ResourceTest {
this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
- resource.updateFunction(
+ resource.updateSource(
tenant,
namespace,
source,
new FileInputStream(JAR_FILE_PATH),
mockedFormData,
null,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null, null);
}
@@ -988,14 +988,14 @@ public class SourceApiV3ResourceTest {
CompletableFuture<RequestResult> requestResult = CompletableFuture.completedFuture(rr);
when(mockedManager.updateFunction(any(FunctionMetaData.class))).thenReturn(requestResult);
- resource.updateFunction(
+ resource.updateSource(
tenant,
namespace,
source,
null,
null,
filePackageUrl,
- new Gson().toJson(sourceConfig),
+ sourceConfig,
null, null, null);
}