You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/08 22:03:11 UTC
[pulsar] 05/13: Plugged in source/sink
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit f8577658f940a582ee59d4e2467f1465a2655de1
Author: Sanjeev Kulkarni <sa...@streaml.io>
AuthorDate: Fri Oct 5 16:23:50 2018 -0700
Plugged in source/sink
---
.../pulsar/functions/utils/SinkConfigUtils.java | 9 +--
.../pulsar/functions/utils/SourceConfigUtils.java | 12 +--
.../pulsar/functions/utils/io/ConnectorUtils.java | 84 ++++++++++-----------
.../functions/utils/validation/ValidatorImpls.java | 85 +++++++++++-----------
.../org/apache/pulsar/functions/worker/Utils.java | 28 +++++++
.../functions/worker/rest/api/FunctionsImpl.java | 60 +++++++++++++--
.../worker/rest/api/v2/SinkApiV2Resource.java | 4 +-
.../worker/rest/api/v2/SourceApiV2Resource.java | 14 ++--
8 files changed, 179 insertions(+), 117 deletions(-)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index d44ea30..95803ab 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -37,7 +37,7 @@ import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee
public class SinkConfigUtils {
- public static FunctionDetails convert(SinkConfig sinkConfig) throws IOException {
+ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader classLoader) throws IOException {
String sinkClassName = null;
String typeArg = null;
@@ -53,11 +53,8 @@ public class SinkConfigUtils {
}
sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class
} else {
- sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()),
- Collections.emptySet())) {
- typeArg = Utils.getSinkType(sinkClassName, ncl).getName();
- }
+ sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+ typeArg = Utils.getSinkType(sinkClassName, classLoader).getName();
}
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 73b331a..a132c8a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -26,16 +26,14 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.utils.io.ConnectorUtils;
-import java.io.File;
import java.io.IOException;
-import java.util.Collections;
import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
import static org.apache.pulsar.functions.utils.Utils.getSourceType;
public class SourceConfigUtils {
- public static FunctionDetails convert(SourceConfig sourceConfig)
+ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader classLoader)
throws IllegalArgumentException, IOException {
String sourceClassName = null;
@@ -52,12 +50,8 @@ public class SourceConfigUtils {
}
sourceClassName = sourceConfig.getClassName(); // server derives the arg-type by loading a class
} else {
- sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
-
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()),
- Collections.emptySet())) {
- typeArg = getSourceType(sourceClassName, ncl).getName();
- }
+ sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
+ typeArg = getSourceType(sourceClassName, classLoader).getName();
}
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 915df05..c6feb5a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -46,59 +46,57 @@ public class ConnectorUtils {
/**
* Extract the Pulsar IO Source class from a connector archive.
*/
- public static String getIOSourceClass(String narPath) throws IOException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
- String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
- ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
- ConnectorDefinition.class);
- if (StringUtils.isEmpty(conf.getSourceClass())) {
- throw new IOException(
- String.format("The '%s' connector does not provide a source implementation", conf.getName()));
- }
+ public static String getIOSourceClass(ClassLoader classLoader) throws IOException {
+ NarClassLoader ncl = (NarClassLoader) classLoader;
+ String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+ ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+ ConnectorDefinition.class);
+ if (StringUtils.isEmpty(conf.getSourceClass())) {
+ throw new IOException(
+ String.format("The '%s' connector does not provide a source implementation", conf.getName()));
+ }
- try {
- // Try to load source class and check it implements Source interface
- Object instance = ncl.loadClass(conf.getSourceClass()).newInstance();
- if (!(instance instanceof Source)) {
- throw new IOException("Class " + conf.getSourceClass() + " does not implement interface "
- + Source.class.getName());
- }
- } catch (Throwable t) {
- Exceptions.rethrowIOException(t);
+ try {
+ // Try to load source class and check it implements Source interface
+ Object instance = ncl.loadClass(conf.getSourceClass()).newInstance();
+ if (!(instance instanceof Source)) {
+ throw new IOException("Class " + conf.getSourceClass() + " does not implement interface "
+ + Source.class.getName());
}
-
- return conf.getSourceClass();
+ } catch (Throwable t) {
+ Exceptions.rethrowIOException(t);
}
+
+ return conf.getSourceClass();
}
/**
* Extract the Pulsar IO Sink class from a connector archive.
*/
- public static String getIOSinkClass(String narPath) throws IOException {
- try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
- String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+ public static String getIOSinkClass(ClassLoader classLoader) throws IOException {
+ NarClassLoader ncl = (NarClassLoader) classLoader;
+ String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+ ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+ ConnectorDefinition.class);
+ if (StringUtils.isEmpty(conf.getSinkClass())) {
+ throw new IOException(
+ String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
+ }
- ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
- ConnectorDefinition.class);
- if (StringUtils.isEmpty(conf.getSinkClass())) {
+ try {
+ // Try to load source class and check it implements Sink interface
+ Object instance = ncl.loadClass(conf.getSinkClass()).newInstance();
+ if (!(instance instanceof Sink)) {
throw new IOException(
- String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
- }
-
- try {
- // Try to load source class and check it implements Sink interface
- Object instance = ncl.loadClass(conf.getSinkClass()).newInstance();
- if (!(instance instanceof Sink)) {
- throw new IOException(
- "Class " + conf.getSinkClass() + " does not implement interface " + Sink.class.getName());
- }
- } catch (Throwable t) {
- Exceptions.rethrowIOException(t);
+ "Class " + conf.getSinkClass() + " does not implement interface " + Sink.class.getName());
}
-
- return conf.getSinkClass();
+ } catch (Throwable t) {
+ Exceptions.rethrowIOException(t);
}
+
+ return conf.getSinkClass();
}
public static ConnectorDefinition getConnectorDefinition(String narPath) throws IOException {
@@ -127,14 +125,10 @@ public class ConnectorUtils {
log.info("Found connector {} from {}", cntDef, archive);
if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
- // Validate source class to be present and of the right type
- ConnectorUtils.getIOSourceClass(archive.toString());
connectors.sources.put(cntDef.getName(), archive);
}
if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
- // Validate sinkclass to be present and of the right type
- ConnectorUtils.getIOSinkClass(archive.toString());
connectors.sinks.put(cntDef.getName(), archive);
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index dba6bd9..e600afe 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -727,14 +727,15 @@ public class ValidatorImpls {
@Override
public void validateField(String name, Object o, ClassLoader classLoader) {
SourceConfig sourceConfig = (SourceConfig) o;
- if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) {
- // We don't have to check the archive, since it's provided on the worker itself
+ if (classLoader == null) {
+ // This happens at the cli for builtin. There is no need to check this since
+ // the actual check will be done at serverside
return;
}
String sourceClassName;
try {
- sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
+ sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
} catch (IOException e1) {
throw new IllegalArgumentException("Failed to extract source class from archive", e1);
}
@@ -743,15 +744,14 @@ public class ValidatorImpls {
Class<?> typeArg = getSourceType(sourceClassName, classLoader);
// Only one of serdeClassName or schemaType should be set
- if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()
- && sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
+ if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
}
- if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
+ if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, classLoader, false);
}
- if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
+ if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, classLoader, false);
}
}
@@ -761,8 +761,9 @@ public class ValidatorImpls {
@Override
public void validateField(String name, Object o, ClassLoader classLoader) {
SinkConfig sinkConfig = (SinkConfig) o;
- if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) {
- // We don't have to check the archive, since it's provided on the worker itself
+ if (classLoader == null) {
+ // This happens at the cli for builtin. There is no need to check this since
+ // the actual check will be done at serverside
return;
}
@@ -779,42 +780,42 @@ public class ValidatorImpls {
}
- try (NarClassLoader clsLoader = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()),
- Collections.emptySet())) {
- String sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
- Class<?> typeArg = getSinkType(sinkClassName, clsLoader);
+ String sinkClassName;
+ try {
+ sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+ } catch (IOException e1) {
+ throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
+ }
+ Class<?> typeArg = getSinkType(sinkClassName, classLoader);
- if (sinkConfig.getTopicToSerdeClassName() != null) {
- sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
- FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader, true);
- });
- }
+ if (sinkConfig.getTopicToSerdeClassName() != null) {
+ sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
+ FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, classLoader, true);
+ });
+ }
- if (sinkConfig.getTopicToSchemaType() != null) {
- sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
- FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader, true);
- });
- }
+ if (sinkConfig.getTopicToSchemaType() != null) {
+ sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
+ FunctionConfigValidator.validateSchema(schemaType, typeArg, name, classLoader, true);
+ });
+ }
- // topicsPattern does not need checks
-
- if (sinkConfig.getInputSpecs() != null) {
- sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
- // Only one is set
- if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()
- && consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
- throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
- }
- if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) {
- FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader, true);
- }
- if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
- FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader, true);
- }
- });
- }
- } catch (IOException e) {
- throw new IllegalArgumentException(e.getMessage());
+ // topicsPattern does not need checks
+
+ if (sinkConfig.getInputSpecs() != null) {
+ sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
+ // Only one is set
+ if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()
+ && consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
+ throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
+ }
+ if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) {
+ FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, classLoader, true);
+ }
+ if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
+ FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, classLoader, true);
+ }
+ });
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index aa80403..f935991 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -33,6 +33,7 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
import java.util.UUID;
import lombok.extern.slf4j.Slf4j;
@@ -47,6 +48,7 @@ import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.worker.dlog.DLInputStream;
import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
@@ -163,6 +165,32 @@ public final class Utils {
throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
}
}
+
+ public static NarClassLoader extractNarClassloader(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException {
+ if (destPkgUrl.startsWith(FILE)) {
+ URL url = new URL(destPkgUrl);
+ File file = new File(url.toURI());
+ if (!file.exists()) {
+ throw new IllegalArgumentException(destPkgUrl + " does not exists locally");
+ }
+ try {
+ return NarClassLoader.getFromArchive(file, Collections.emptySet());
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(
+ "Corrupt User PackageFile " + file + " with error " + e.getMessage());
+ }
+ } else if (destPkgUrl.startsWith("http")) {
+ URL website = new URL(destPkgUrl);
+ File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString());
+ if (!tempFile.exists()) {
+ throw new IllegalArgumentException("Could not create local file " + tempFile);
+ }
+ tempFile.deleteOnExit();
+ return NarClassLoader.getFromArchive(tempFile, Collections.emptySet());
+ } else {
+ throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
+ }
+ }
public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outputStream) throws IOException {
URL website = new URL(destPkgUrl);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 0d30e37..08cd7de 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -39,10 +39,8 @@ import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Files;
-import java.util.Base64;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
+import java.nio.file.Path;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
@@ -75,6 +73,7 @@ import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.Codec;
@@ -1007,13 +1006,13 @@ public class FunctionsImpl {
}
if (!StringUtils.isEmpty(sourceConfigJson)) {
SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class);
- ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
+ NarClassLoader clsLoader = extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, true);
ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
return SourceConfigUtils.convert(sourceConfig, clsLoader);
}
if (!StringUtils.isEmpty(sinkConfigJson)) {
SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class);
- ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
+ NarClassLoader clsLoader = extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, false);
ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
return SinkConfigUtils.convert(sinkConfig, clsLoader);
}
@@ -1076,6 +1075,55 @@ public class FunctionsImpl {
}
}
+ private NarClassLoader extractNarClassLoader(String archive, String pkgUrl, File uploadedInputStreamFileName,
+ boolean isSource) {
+ if (!StringUtils.isEmpty(archive)) {
+ if (isSource) {
+ Path path;
+ try {
+ path = this.worker().getConnectorsManager().getSourceArchive(archive);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("No Source archive %s found", archive));
+ }
+ try {
+ return NarClassLoader.getFromArchive(path.toFile(),
+ Collections.emptySet());
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("The source %s is corrupted", archive));
+ }
+ } else {
+ Path path;
+ try {
+ path = this.worker().getConnectorsManager().getSinkArchive(archive);
+ } catch (Exception e) {
+ throw new IllegalArgumentException(String.format("No Sink archive %s found", archive));
+ }
+ try {
+ return NarClassLoader.getFromArchive(path.toFile(),
+ Collections.emptySet());
+ } catch (IOException e) {
+ throw new IllegalArgumentException(String.format("The sink %s is corrupted", archive));
+ }
+ }
+ }
+ if (!StringUtils.isEmpty(pkgUrl)) {
+ try {
+ return Utils.extractNarClassloader(pkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+ if (uploadedInputStreamFileName != null) {
+ try {
+ return NarClassLoader.getFromArchive(uploadedInputStreamFileName,
+ Collections.emptySet());
+ } catch (IOException e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+ return null;
+ }
+
private void validateFunctionClassTypes(ClassLoader classLoader, FunctionDetails.Builder functionDetailsBuilder) {
// validate only if classLoader is provided
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 175e2eb..e36b069 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -48,11 +48,11 @@ public class SinkApiV2Resource extends FunctionApiResource {
final @PathParam("sinkName") String sinkName,
final @FormDataParam("data") InputStream uploadedInputStream,
final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String sourcePkgUrl,
+ final @FormDataParam("url") String functionPkgUrl,
final @FormDataParam("sinkConfig") String sinkConfigJson) {
return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
- sourcePkgUrl, null, null, null, sinkConfigJson, clientAppId());
+ functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 9d25bf9..cfb6baa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -44,15 +44,15 @@ public class SourceApiV2Resource extends FunctionApiResource {
@Path("/{tenant}/{namespace}/{sourceName}")
@Consumes(MediaType.MULTIPART_FORM_DATA)
public Response registerSource(final @PathParam("tenant") String tenant,
- final @PathParam("namespace") String namespace,
- final @PathParam("sourceName") String sourceName,
- final @FormDataParam("data") InputStream uploadedInputStream,
- final @FormDataParam("data") FormDataContentDisposition fileDetail,
- final @FormDataParam("url") String sourcePkgUrl,
- final @FormDataParam("sourceConfig") String sourceConfigJson) {
+ final @PathParam("namespace") String namespace,
+ final @PathParam("sourceName") String sourceName,
+ final @FormDataParam("data") InputStream uploadedInputStream,
+ final @FormDataParam("data") FormDataContentDisposition fileDetail,
+ final @FormDataParam("url") String functionPkgUrl,
+ final @FormDataParam("sourceConfig") String sourceConfigJson) {
return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
- sourcePkgUrl, null, null, sourceConfigJson, null, clientAppId());
+ functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
}