You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/08 22:03:11 UTC

[pulsar] 05/13: Plugged in source/sink

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch srkukarni/serverside_validation_endpoints
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f8577658f940a582ee59d4e2467f1465a2655de1
Author: Sanjeev Kulkarni <sa...@streaml.io>
AuthorDate: Fri Oct 5 16:23:50 2018 -0700

    Plugged in source/sink
---
 .../pulsar/functions/utils/SinkConfigUtils.java    |  9 +--
 .../pulsar/functions/utils/SourceConfigUtils.java  | 12 +--
 .../pulsar/functions/utils/io/ConnectorUtils.java  | 84 ++++++++++-----------
 .../functions/utils/validation/ValidatorImpls.java | 85 +++++++++++-----------
 .../org/apache/pulsar/functions/worker/Utils.java  | 28 +++++++
 .../functions/worker/rest/api/FunctionsImpl.java   | 60 +++++++++++++--
 .../worker/rest/api/v2/SinkApiV2Resource.java      |  4 +-
 .../worker/rest/api/v2/SourceApiV2Resource.java    | 14 ++--
 8 files changed, 179 insertions(+), 117 deletions(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index d44ea30..95803ab 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -37,7 +37,7 @@ import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee
 
 public class SinkConfigUtils {
 
-    public static FunctionDetails convert(SinkConfig sinkConfig) throws IOException {
+    public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader classLoader) throws IOException {
 
         String sinkClassName = null;
         String typeArg = null;
@@ -53,11 +53,8 @@ public class SinkConfigUtils {
                 }
                 sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class
             } else {
-                sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
-                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()),
-                        Collections.emptySet())) {
-                    typeArg = Utils.getSinkType(sinkClassName, ncl).getName();
-                }
+                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+                typeArg = Utils.getSinkType(sinkClassName, classLoader).getName();
             }
         }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 73b331a..a132c8a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -26,16 +26,14 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.utils.io.ConnectorUtils;
 
-import java.io.File;
 import java.io.IOException;
-import java.util.Collections;
 
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 
 public class SourceConfigUtils {
 
-    public static FunctionDetails convert(SourceConfig sourceConfig)
+    public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader classLoader)
             throws IllegalArgumentException, IOException {
 
         String sourceClassName = null;
@@ -52,12 +50,8 @@ public class SourceConfigUtils {
                 }
                 sourceClassName = sourceConfig.getClassName(); // server derives the arg-type by loading a class
             } else {
-                sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
-
-                try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(sourceConfig.getArchive()),
-                        Collections.emptySet())) {
-                    typeArg = getSourceType(sourceClassName, ncl).getName();
-                }
+                sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
+                typeArg = getSourceType(sourceClassName, classLoader).getName();
             }
         }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
index 915df05..c6feb5a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/io/ConnectorUtils.java
@@ -46,59 +46,57 @@ public class ConnectorUtils {
     /**
      * Extract the Pulsar IO Source class from a connector archive.
      */
-    public static String getIOSourceClass(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
-            String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
-
-            ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
-                    ConnectorDefinition.class);
-            if (StringUtils.isEmpty(conf.getSourceClass())) {
-                throw new IOException(
-                        String.format("The '%s' connector does not provide a source implementation", conf.getName()));
-            }
+    public static String getIOSourceClass(ClassLoader classLoader) throws IOException {
+        NarClassLoader ncl = (NarClassLoader) classLoader;
+        String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+        ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+                ConnectorDefinition.class);
+        if (StringUtils.isEmpty(conf.getSourceClass())) {
+            throw new IOException(
+                    String.format("The '%s' connector does not provide a source implementation", conf.getName()));
+        }
 
-            try {
-                // Try to load source class and check it implements Source interface
-                Object instance = ncl.loadClass(conf.getSourceClass()).newInstance();
-                if (!(instance instanceof Source)) {
-                    throw new IOException("Class " + conf.getSourceClass() + " does not implement interface "
-                            + Source.class.getName());
-                }
-            } catch (Throwable t) {
-                Exceptions.rethrowIOException(t);
+        try {
+            // Try to load source class and check it implements Source interface
+            Object instance = ncl.loadClass(conf.getSourceClass()).newInstance();
+            if (!(instance instanceof Source)) {
+                throw new IOException("Class " + conf.getSourceClass() + " does not implement interface "
+                        + Source.class.getName());
             }
-
-            return conf.getSourceClass();
+        } catch (Throwable t) {
+            Exceptions.rethrowIOException(t);
         }
+
+        return conf.getSourceClass();
     }
 
     /**
      * Extract the Pulsar IO Sink class from a connector archive.
      */
-    public static String getIOSinkClass(String narPath) throws IOException {
-        try (NarClassLoader ncl = NarClassLoader.getFromArchive(new File(narPath), Collections.emptySet())) {
-            String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+    public static String getIOSinkClass(ClassLoader classLoader) throws IOException {
+        NarClassLoader ncl = (NarClassLoader) classLoader;
+        String configStr = ncl.getServiceDefinition(PULSAR_IO_SERVICE_NAME);
+
+        ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
+                ConnectorDefinition.class);
+        if (StringUtils.isEmpty(conf.getSinkClass())) {
+            throw new IOException(
+                    String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
+        }
 
-            ConnectorDefinition conf = ObjectMapperFactory.getThreadLocalYaml().readValue(configStr,
-                    ConnectorDefinition.class);
-            if (StringUtils.isEmpty(conf.getSinkClass())) {
+        try {
+            // Try to load source class and check it implements Sink interface
+            Object instance = ncl.loadClass(conf.getSinkClass()).newInstance();
+            if (!(instance instanceof Sink)) {
                 throw new IOException(
-                        String.format("The '%s' connector does not provide a sink implementation", conf.getName()));
-            }
-
-            try {
-                // Try to load source class and check it implements Sink interface
-                Object instance = ncl.loadClass(conf.getSinkClass()).newInstance();
-                if (!(instance instanceof Sink)) {
-                    throw new IOException(
-                            "Class " + conf.getSinkClass() + " does not implement interface " + Sink.class.getName());
-                }
-            } catch (Throwable t) {
-                Exceptions.rethrowIOException(t);
+                        "Class " + conf.getSinkClass() + " does not implement interface " + Sink.class.getName());
             }
-
-            return conf.getSinkClass();
+        } catch (Throwable t) {
+            Exceptions.rethrowIOException(t);
         }
+
+        return conf.getSinkClass();
     }
 
     public static ConnectorDefinition getConnectorDefinition(String narPath) throws IOException {
@@ -127,14 +125,10 @@ public class ConnectorUtils {
                     log.info("Found connector {} from {}", cntDef, archive);
 
                     if (!StringUtils.isEmpty(cntDef.getSourceClass())) {
-                        // Validate source class to be present and of the right type
-                        ConnectorUtils.getIOSourceClass(archive.toString());
                         connectors.sources.put(cntDef.getName(), archive);
                     }
 
                     if (!StringUtils.isEmpty(cntDef.getSinkClass())) {
-                        // Validate sinkclass to be present and of the right type
-                        ConnectorUtils.getIOSinkClass(archive.toString());
                         connectors.sinks.put(cntDef.getName(), archive);
                     }
 
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
index dba6bd9..e600afe 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/validation/ValidatorImpls.java
@@ -727,14 +727,15 @@ public class ValidatorImpls {
         @Override
         public void validateField(String name, Object o, ClassLoader classLoader) {
             SourceConfig sourceConfig = (SourceConfig) o;
-            if (sourceConfig.getArchive().startsWith(Utils.BUILTIN)) {
-                // We don't have to check the archive, since it's provided on the worker itself
+            if (classLoader == null) {
+                // This happens at the cli for builtin. There is no need to check this since
+                // the actual check will be done at serverside
                 return;
             }
 
             String sourceClassName;
             try {
-                sourceClassName = ConnectorUtils.getIOSourceClass(sourceConfig.getArchive());
+                sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
             } catch (IOException e1) {
                 throw new IllegalArgumentException("Failed to extract source class from archive", e1);
             }
@@ -743,15 +744,14 @@ public class ValidatorImpls {
             Class<?> typeArg = getSourceType(sourceClassName, classLoader);
 
             // Only one of serdeClassName or schemaType should be set
-            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()
-                    && sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
+            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName()) && !StringUtils.isEmpty(sourceConfig.getSchemaType())) {
                 throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
             }
 
-            if (sourceConfig.getSerdeClassName() != null && !sourceConfig.getSerdeClassName().isEmpty()) {
+            if (!StringUtils.isEmpty(sourceConfig.getSerdeClassName())) {
                 FunctionConfigValidator.validateSerde(sourceConfig.getSerdeClassName(),typeArg, name, classLoader, false);
             }
-            if (sourceConfig.getSchemaType() != null && !sourceConfig.getSchemaType().isEmpty()) {
+            if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
                 FunctionConfigValidator.validateSchema(sourceConfig.getSchemaType(), typeArg, name, classLoader, false);
             }
         }
@@ -761,8 +761,9 @@ public class ValidatorImpls {
         @Override
         public void validateField(String name, Object o, ClassLoader classLoader) {
             SinkConfig sinkConfig = (SinkConfig) o;
-            if (sinkConfig.getArchive().startsWith(Utils.BUILTIN)) {
-                // We don't have to check the archive, since it's provided on the worker itself
+            if (classLoader == null) {
+                // This happens at the cli for builtin. There is no need to check this since
+                // the actual check will be done at serverside
                 return;
             }
 
@@ -779,42 +780,42 @@ public class ValidatorImpls {
             }
 
 
-            try (NarClassLoader clsLoader = NarClassLoader.getFromArchive(new File(sinkConfig.getArchive()),
-                    Collections.emptySet())) {
-                String sinkClassName = ConnectorUtils.getIOSinkClass(sinkConfig.getArchive());
-                Class<?> typeArg = getSinkType(sinkClassName, clsLoader);
+            String sinkClassName;
+            try {
+                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+            } catch (IOException e1) {
+                throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
+            }
+            Class<?> typeArg = getSinkType(sinkClassName, classLoader);
 
-                if (sinkConfig.getTopicToSerdeClassName() != null) {
-                    sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
-                        FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, clsLoader, true);
-                    });
-                }
+            if (sinkConfig.getTopicToSerdeClassName() != null) {
+                sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
+                    FunctionConfigValidator.validateSerde(serdeClassName, typeArg, name, classLoader, true);
+                });
+            }
 
-                if (sinkConfig.getTopicToSchemaType() != null) {
-                    sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
-                        FunctionConfigValidator.validateSchema(schemaType, typeArg, name, clsLoader, true);
-                    });
-                }
+            if (sinkConfig.getTopicToSchemaType() != null) {
+                sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
+                    FunctionConfigValidator.validateSchema(schemaType, typeArg, name, classLoader, true);
+                });
+            }
 
-                // topicsPattern does not need checks
-
-                if (sinkConfig.getInputSpecs() != null) {
-                    sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
-                        // Only one is set
-                        if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()
-                                && consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
-                            throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
-                        }
-                        if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) {
-                            FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, clsLoader, true);
-                        }
-                        if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
-                            FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, clsLoader, true);
-                        }
-                    });
-                }
-            } catch (IOException e) {
-                throw new IllegalArgumentException(e.getMessage());
+            // topicsPattern does not need checks
+
+            if (sinkConfig.getInputSpecs() != null) {
+                sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
+                    // Only one is set
+                    if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()
+                            && consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
+                        throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
+                    }
+                    if (consumerSpec.getSerdeClassName() != null && !consumerSpec.getSerdeClassName().isEmpty()) {
+                        FunctionConfigValidator.validateSerde(consumerSpec.getSerdeClassName(), typeArg, name, classLoader, true);
+                    }
+                    if (consumerSpec.getSchemaType() != null && !consumerSpec.getSchemaType().isEmpty()) {
+                        FunctionConfigValidator.validateSchema(consumerSpec.getSchemaType(), typeArg, name, classLoader, true);
+                    }
+                });
             }
         }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
index aa80403..f935991 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/Utils.java
@@ -33,6 +33,7 @@ import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
+import java.util.Collections;
 import java.util.UUID;
 import lombok.extern.slf4j.Slf4j;
 
@@ -47,6 +48,7 @@ import org.apache.distributedlog.metadata.DLMetadata;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminBuilder;
 import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.worker.dlog.DLInputStream;
 import org.apache.pulsar.functions.worker.dlog.DLOutputStream;
@@ -163,6 +165,32 @@ public final class Utils {
             throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
         }
     }
+
+    public static NarClassLoader extractNarClassloader(String destPkgUrl, String downloadPkgDir) throws IOException, URISyntaxException {
+        if (destPkgUrl.startsWith(FILE)) {
+            URL url = new URL(destPkgUrl);
+            File file = new File(url.toURI());
+            if (!file.exists()) {
+                throw new IllegalArgumentException(destPkgUrl + " does not exists locally");
+            }
+            try {
+                return NarClassLoader.getFromArchive(file, Collections.emptySet());
+            } catch (MalformedURLException e) {
+                throw new IllegalArgumentException(
+                        "Corrupt User PackageFile " + file + " with error " + e.getMessage());
+            }
+        } else if (destPkgUrl.startsWith("http")) {
+            URL website = new URL(destPkgUrl);
+            File tempFile = new File(downloadPkgDir, website.getHost() + UUID.randomUUID().toString());
+            if (!tempFile.exists()) {
+                throw new IllegalArgumentException("Could not create local file " + tempFile);
+            }
+            tempFile.deleteOnExit();
+            return NarClassLoader.getFromArchive(tempFile, Collections.emptySet());
+        } else {
+            throw new IllegalArgumentException("Unsupported url protocol "+ destPkgUrl +", supported url protocols: [file/http/https]");
+        }
+    }
     
     public static void downloadFromHttpUrl(String destPkgUrl, FileOutputStream outputStream) throws IOException {
         URL website = new URL(destPkgUrl);
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index 0d30e37..08cd7de 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -39,10 +39,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
 import java.nio.file.Files;
-import java.util.Base64;
-import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
+import java.nio.file.Path;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -75,6 +73,7 @@ import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.Reader;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.io.ConnectorDefinition;
+import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.ErrorData;
 import org.apache.pulsar.common.policies.data.TenantInfo;
 import org.apache.pulsar.common.util.Codec;
@@ -1007,13 +1006,13 @@ public class FunctionsImpl {
         }
         if (!StringUtils.isEmpty(sourceConfigJson)) {
             SourceConfig sourceConfig = new Gson().fromJson(sourceConfigJson, SourceConfig.class);
-            ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
+            NarClassLoader clsLoader = extractNarClassLoader(sourceConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, true);
             ConfigValidation.validateConfig(sourceConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
             return SourceConfigUtils.convert(sourceConfig, clsLoader);
         }
         if (!StringUtils.isEmpty(sinkConfigJson)) {
             SinkConfig sinkConfig = new Gson().fromJson(sinkConfigJson, SinkConfig.class);
-            ClassLoader clsLoader = extractClassLoader(functionPkgUrl, uploadedInputStreamAsFile);
+            NarClassLoader clsLoader = extractNarClassLoader(sinkConfig.getArchive(), functionPkgUrl, uploadedInputStreamAsFile, false);
             ConfigValidation.validateConfig(sinkConfig, FunctionConfig.Runtime.JAVA.name(), clsLoader);
             return SinkConfigUtils.convert(sinkConfig, clsLoader);
         }
@@ -1076,6 +1075,55 @@ public class FunctionsImpl {
         }
     }
 
+    private NarClassLoader extractNarClassLoader(String archive, String pkgUrl, File uploadedInputStreamFileName,
+                                                 boolean isSource) {
+        if (!StringUtils.isEmpty(archive)) {
+            if (isSource) {
+                Path path;
+                try {
+                    path = this.worker().getConnectorsManager().getSourceArchive(archive);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(String.format("No Source archive %s found", archive));
+                }
+                try {
+                    return NarClassLoader.getFromArchive(path.toFile(),
+                            Collections.emptySet());
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(String.format("The source %s is corrupted", archive));
+                }
+            } else {
+                Path path;
+                try {
+                    path = this.worker().getConnectorsManager().getSinkArchive(archive);
+                } catch (Exception e) {
+                    throw new IllegalArgumentException(String.format("No Sink archive %s found", archive));
+                }
+                try {
+                    return NarClassLoader.getFromArchive(path.toFile(),
+                            Collections.emptySet());
+                } catch (IOException e) {
+                    throw new IllegalArgumentException(String.format("The sink %s is corrupted", archive));
+                }
+            }
+        }
+        if (!StringUtils.isEmpty(pkgUrl)) {
+            try {
+                return Utils.extractNarClassloader(pkgUrl, workerServiceSupplier.get().getWorkerConfig().getDownloadDirectory());
+            } catch (Exception e) {
+                throw new IllegalArgumentException(e.getMessage());
+            }
+        }
+        if (uploadedInputStreamFileName != null) {
+            try {
+                return NarClassLoader.getFromArchive(uploadedInputStreamFileName,
+                        Collections.emptySet());
+            } catch (IOException e) {
+                throw new IllegalArgumentException(e.getMessage());
+            }
+        }
+        return null;
+    }
+
     private void validateFunctionClassTypes(ClassLoader classLoader, FunctionDetails.Builder functionDetailsBuilder) {
 
         // validate only if classLoader is provided
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
index 175e2eb..e36b069 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2Resource.java
@@ -48,11 +48,11 @@ public class SinkApiV2Resource extends FunctionApiResource {
                                  final @PathParam("sinkName") String sinkName,
                                  final @FormDataParam("data") InputStream uploadedInputStream,
                                  final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                 final @FormDataParam("url") String sourcePkgUrl,
+                                 final @FormDataParam("url") String functionPkgUrl,
                                  final @FormDataParam("sinkConfig") String sinkConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sinkName, uploadedInputStream, fileDetail,
-                sourcePkgUrl, null, null, null, sinkConfigJson, clientAppId());
+                functionPkgUrl, null, null, null, sinkConfigJson, clientAppId());
 
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
index 9d25bf9..cfb6baa 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2Resource.java
@@ -44,15 +44,15 @@ public class SourceApiV2Resource extends FunctionApiResource {
     @Path("/{tenant}/{namespace}/{sourceName}")
     @Consumes(MediaType.MULTIPART_FORM_DATA)
     public Response registerSource(final @PathParam("tenant") String tenant,
-                                     final @PathParam("namespace") String namespace,
-                                     final @PathParam("sourceName") String sourceName,
-                                     final @FormDataParam("data") InputStream uploadedInputStream,
-                                     final @FormDataParam("data") FormDataContentDisposition fileDetail,
-                                     final @FormDataParam("url") String sourcePkgUrl,
-                                     final @FormDataParam("sourceConfig") String sourceConfigJson) {
+                                   final @PathParam("namespace") String namespace,
+                                   final @PathParam("sourceName") String sourceName,
+                                   final @FormDataParam("data") InputStream uploadedInputStream,
+                                   final @FormDataParam("data") FormDataContentDisposition fileDetail,
+                                   final @FormDataParam("url") String functionPkgUrl,
+                                   final @FormDataParam("sourceConfig") String sourceConfigJson) {
 
         return functions.registerFunction(tenant, namespace, sourceName, uploadedInputStream, fileDetail,
-                sourcePkgUrl, null, null, sourceConfigJson, null, clientAppId());
+                functionPkgUrl, null, null, sourceConfigJson, null, clientAppId());
 
     }