You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/12/13 01:19:05 UTC

[GitHub] srkukarni closed pull request #3166: Sources/Sinks can be launched using fat jars as well

srkukarni closed pull request #3166: Sources/Sinks can be launched using fat jars as well
URL: https://github.com/apache/pulsar/pull/3166
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
index c198cc3857..c3bc756d5f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/LocalRunner.java
@@ -120,19 +120,17 @@ void start() throws Exception {
             if (builtInSource != null) {
                 sourceConfig.setArchive(builtInSource);
             }
-            NarClassLoader classLoader;
             parallelism = sourceConfig.getParallelism();
             userCodeFile = sourceConfig.getArchive();
             if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
-                classLoader = extractNarClassLoader(null, userCodeFile, null);
+                functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, userCodeFile, null));
             } else {
                 File file = new File(userCodeFile);
                 if (!file.exists()) {
                     throw new RuntimeException("Source archive does not exist");
                 }
-                classLoader = extractNarClassLoader(null, null, file);
+                functionDetails = SourceConfigUtils.convert(sourceConfig, SourceConfigUtils.validate(sourceConfig, null, null, file));
             }
-            functionDetails = SourceConfigUtils.convert(sourceConfig, classLoader);
         } else {
             SinkConfig sinkConfig = new Gson().fromJson(sinkConfigString, SinkConfig.class);
             inferMissingArguments(sinkConfig);
@@ -140,19 +138,17 @@ void start() throws Exception {
             if (builtInSink != null) {
                 sinkConfig.setArchive(builtInSink);
             }
-            NarClassLoader classLoader;
             parallelism = sinkConfig.getParallelism();
             userCodeFile = sinkConfig.getArchive();
             if (org.apache.pulsar.common.functions.Utils.isFunctionPackageUrlSupported(userCodeFile)) {
-                classLoader = extractNarClassLoader(null, userCodeFile, null);
+                functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, userCodeFile, null));
             } else {
                 File file = new File(userCodeFile);
                 if (!file.exists()) {
                     throw new RuntimeException("Sink archive does not exist");
                 }
-                classLoader = extractNarClassLoader(null, null, file);
+                functionDetails = SinkConfigUtils.convert(sinkConfig, SinkConfigUtils.validate(sinkConfig, null, null, file));
             }
-            functionDetails = SinkConfigUtils.convert(sinkConfig, classLoader);
         }
         startLocalRun(functionDetails, parallelism,
                 instanceIdOffset, brokerServiceUrl, stateStorageServiceUrl,
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index d65b87fd4d..413070a954 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -21,6 +21,9 @@
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
@@ -39,7 +42,6 @@
 import java.nio.file.Path;
 import java.util.*;
 
-import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
@@ -47,27 +49,19 @@
 
 public class SinkConfigUtils {
 
-    public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader classLoader) throws IOException {
-
-        String sinkClassName = null;
-        String typeArg = null;
+    @Getter
+    @Setter
+    @AllArgsConstructor
+    public static class ExtractedSinkDetails {
+        private String sinkClassName;
+        private String typeArg;
+    }
 
+    public static FunctionDetails convert(SinkConfig sinkConfig, ExtractedSinkDetails sinkDetails) throws IOException {
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
 
         boolean isBuiltin = !org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
 
-        if (!isBuiltin) {
-            if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
-                if (isBlank(sinkConfig.getClassName())) {
-                    throw new IllegalArgumentException("Class-name must be present for archive with file-url");
-                }
-                sinkClassName = sinkConfig.getClassName(); // server derives the arg-type by loading a class
-            } else {
-                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
-                typeArg = getSinkType(sinkClassName, classLoader).getName();
-            }
-        }
-
         if (sinkConfig.getTenant() != null) {
             functionDetailsBuilder.setTenant(sinkConfig.getTenant());
         }
@@ -135,8 +129,8 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas
             });
         }
 
-        if (typeArg != null) {
-            sourceSpecBuilder.setTypeClassName(typeArg);
+        if (sinkDetails.getTypeArg() != null) {
+            sourceSpecBuilder.setTypeClassName(sinkDetails.getTypeArg());
         }
         if (isNotBlank(sinkConfig.getSourceSubscriptionName())) {
             sourceSpecBuilder.setSubscriptionName(sinkConfig.getSourceSubscriptionName());
@@ -161,8 +155,8 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas
 
         // set up sink spec
         Function.SinkSpec.Builder sinkSpecBuilder = Function.SinkSpec.newBuilder();
-        if (sinkClassName != null) {
-            sinkSpecBuilder.setClassName(sinkClassName);
+        if (sinkDetails.getSinkClassName() != null) {
+            sinkSpecBuilder.setClassName(sinkDetails.getSinkClassName());
         }
 
         if (isBuiltin) {
@@ -176,8 +170,8 @@ public static FunctionDetails convert(SinkConfig sinkConfig, NarClassLoader clas
         if (sinkConfig.getSecrets() != null && !sinkConfig.getSecrets().isEmpty()) {
             functionDetailsBuilder.setSecretsMap(new Gson().toJson(sinkConfig.getSecrets()));
         }
-        if (typeArg != null) {
-            sinkSpecBuilder.setTypeClassName(typeArg);
+        if (sinkDetails.getTypeArg() != null) {
+            sinkSpecBuilder.setTypeClassName(sinkDetails.getTypeArg());
         }
         functionDetailsBuilder.setSink(sinkSpecBuilder);
 
@@ -256,7 +250,7 @@ public static SinkConfig convertFromDetails(FunctionDetails functionDetails) {
         return sinkConfig;
     }
 
-    public static NarClassLoader validate(SinkConfig sinkConfig, Path archivePath, String functionPkgUrl,
+    public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath, String functionPkgUrl,
                                           File uploadedInputStreamAsFile) {
         if (isEmpty(sinkConfig.getTenant())) {
             throw new IllegalArgumentException("Sink tenant cannot be null");
@@ -292,17 +286,29 @@ public static NarClassLoader validate(SinkConfig sinkConfig, Path archivePath, S
             throw new IllegalArgumentException("Sink timeout must be a positive number");
         }
 
-        NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
-        if (classLoader == null) {
-            throw new IllegalArgumentException("Sink Package is not provided");
-        }
-
         String sinkClassName;
-        try {
-            sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
-        } catch (IOException e1) {
-            throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
+        ClassLoader classLoader;
+        if (!isEmpty(sinkConfig.getClassName())) {
+            sinkClassName = sinkConfig.getClassName();
+            try {
+                classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Invalid Sink Jar");
+            }
+        } else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
+            throw new IllegalArgumentException("Class-name must be present for archive with file-url");
+        } else {
+            classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+            if (classLoader == null) {
+                throw new IllegalArgumentException("Sink Package is not provided");
+            }
+            try {
+                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
+            } catch (IOException e1) {
+                throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
+            }
         }
+
         Class<?> typeArg = getSinkType(sinkClassName, classLoader);
 
         if (sinkConfig.getTopicToSerdeClassName() != null) {
@@ -333,7 +339,7 @@ public static NarClassLoader validate(SinkConfig sinkConfig, Path archivePath, S
                 }
             });
         }
-        return classLoader;
+        return new ExtractedSinkDetails(sinkClassName, typeArg.getName());
     }
 
     private static Collection<String> collectAllInputTopics(SinkConfig sinkConfig) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index a7baea219b..94274cf860 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -21,6 +21,9 @@
 
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.SourceConfig;
@@ -38,35 +41,25 @@
 import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
-import static org.apache.pulsar.common.naming.TopicName.DEFAULT_NAMESPACE;
-import static org.apache.pulsar.common.naming.TopicName.PUBLIC_TENANT;
 import static org.apache.pulsar.functions.utils.Utils.convertProcessingGuarantee;
 import static org.apache.pulsar.functions.utils.Utils.getSourceType;
 
 public class SourceConfigUtils {
 
-    public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader classLoader)
-            throws IllegalArgumentException, IOException {
-
-        String sourceClassName = null;
-        String typeArg = null;
+    @Getter
+    @Setter
+    @AllArgsConstructor
+    public static class ExtractedSourceDetails {
+        private String sourceClassName;
+        private String typeArg;
+    }
 
+    public static FunctionDetails convert(SourceConfig sourceConfig, ExtractedSourceDetails sourceDetails)
+            throws IllegalArgumentException {
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
 
         boolean isBuiltin = !StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.BUILTIN);
 
-        if (!isBuiltin) {
-            if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
-                if (org.apache.commons.lang3.StringUtils.isBlank(sourceConfig.getClassName())) {
-                    throw new IllegalArgumentException("Class-name must be present for archive with file-url");
-                }
-                sourceClassName = sourceConfig.getClassName(); // server derives the arg-type by loading a class
-            } else {
-                sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
-                typeArg = getSourceType(sourceClassName, classLoader).getName();
-            }
-        }
-
         if (sourceConfig.getTenant() != null) {
             functionDetailsBuilder.setTenant(sourceConfig.getTenant());
         }
@@ -91,8 +84,8 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader
 
         // set source spec
         Function.SourceSpec.Builder sourceSpecBuilder = Function.SourceSpec.newBuilder();
-        if (sourceClassName != null) {
-            sourceSpecBuilder.setClassName(sourceClassName);
+        if (sourceDetails.getSourceClassName() != null) {
+            sourceSpecBuilder.setClassName(sourceDetails.getSourceClassName());
         }
 
         if (isBuiltin) {
@@ -108,8 +101,8 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader
             functionDetailsBuilder.setSecretsMap(new Gson().toJson(sourceConfig.getSecrets()));
         }
 
-        if (typeArg != null) {
-            sourceSpecBuilder.setTypeClassName(typeArg);
+        if (sourceDetails.getTypeArg() != null) {
+            sourceSpecBuilder.setTypeClassName(sourceDetails.getTypeArg());
         }
         functionDetailsBuilder.setSource(sourceSpecBuilder);
 
@@ -125,8 +118,8 @@ public static FunctionDetails convert(SourceConfig sourceConfig, NarClassLoader
 
         sinkSpecBuilder.setTopic(sourceConfig.getTopicName());
 
-        if (typeArg != null) {
-            sinkSpecBuilder.setTypeClassName(typeArg);
+        if (sourceDetails.getTypeArg() != null) {
+            sinkSpecBuilder.setTypeClassName(sourceDetails.getTypeArg());
         }
 
         functionDetailsBuilder.setSink(sinkSpecBuilder);
@@ -189,7 +182,7 @@ public static SourceConfig convertFromDetails(FunctionDetails functionDetails) {
         return sourceConfig;
     }
 
-    public static NarClassLoader validate(SourceConfig sourceConfig, Path archivePath, String functionPkgUrl, File uploadedInputStreamAsFile) {
+    public static ExtractedSourceDetails validate(SourceConfig sourceConfig, Path archivePath, String functionPkgUrl, File uploadedInputStreamAsFile) {
         if (isEmpty(sourceConfig.getTenant())) {
             throw new IllegalArgumentException("Source tenant cannot be null");
         }
@@ -212,16 +205,27 @@ public static NarClassLoader validate(SourceConfig sourceConfig, Path archivePat
             ResourceConfigUtils.validate(sourceConfig.getResources());
         }
 
-        NarClassLoader classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
-        if (classLoader == null) {
-            throw new IllegalArgumentException("Source Package is not provided");
-        }
-
         String sourceClassName;
-        try {
-            sourceClassName = ConnectorUtils.getIOSourceClass(classLoader);
-        } catch (IOException e1) {
-            throw new IllegalArgumentException("Failed to extract source class from archive", e1);
+        ClassLoader classLoader;
+        if (!isEmpty(sourceConfig.getClassName())) {
+            sourceClassName = sourceConfig.getClassName();
+            try {
+                classLoader = Utils.extractClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+            } catch (Exception e) {
+                throw new IllegalArgumentException("Invalid Source Jar");
+            }
+        } else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
+            throw new IllegalArgumentException("Class-name must be present for archive with file-url");
+        } else {
+            classLoader = Utils.extractNarClassLoader(archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+            if (classLoader == null) {
+                throw new IllegalArgumentException("Source Package is not provided");
+            }
+            try {
+                sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) classLoader);
+            } catch (IOException e1) {
+                throw new IllegalArgumentException("Failed to extract source class from archive", e1);
+            }
         }
 
         Class<?> typeArg = getSourceType(sourceClassName, classLoader);
@@ -237,7 +241,8 @@ public static NarClassLoader validate(SourceConfig sourceConfig, Path archivePat
         if (!StringUtils.isEmpty(sourceConfig.getSchemaType())) {
             ValidatorUtils.validateSchema(sourceConfig.getSchemaType(), typeArg, classLoader, false);
         }
-        return classLoader;
+
+        return new ExtractedSourceDetails(sourceClassName, typeArg.getName());
     }
 
     public static SourceConfig validateUpdate(SourceConfig existingConfig, SourceConfig newConfig) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
index 7414446670..1c5d64d6c7 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Utils.java
@@ -49,6 +49,8 @@
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 
+import static org.apache.commons.lang3.StringUtils.isEmpty;
+
 /**
  * Utils used for runtime.
  */
@@ -211,6 +213,19 @@ public static Runtime convertRuntime(FunctionConfig.Runtime runtime) {
         return typeArg;
     }
 
+    public static ClassLoader extractClassLoader(Path archivePath, String functionPkgUrl, File uploadedInputStreamAsFile) throws Exception {
+        if (!isEmpty(functionPkgUrl)) {
+            return extractClassLoader(functionPkgUrl);
+        }
+        if (archivePath != null) {
+            return loadJar(archivePath.toFile());
+        }
+        if (uploadedInputStreamAsFile != null) {
+            return loadJar(uploadedInputStreamAsFile);
+        }
+        return null;
+    }
+
     /**
      * Load a jar
      * @param jar file of jar
@@ -293,7 +308,7 @@ public static NarClassLoader extractNarClassLoader(Path archivePath, String pkgU
                 throw new IllegalArgumentException(String.format("The archive %s is corrupted", archivePath));
             }
         }
-        if (!StringUtils.isEmpty(pkgUrl)) {
+        if (!isEmpty(pkgUrl)) {
             if (pkgUrl.startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
                 try {
                     URL url = new URL(pkgUrl);
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 858cd72966..dcc415b935 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -57,7 +57,7 @@ public void testConvertBackFidelity() throws IOException  {
         sinkConfig.setRetainOrdering(false);
         sinkConfig.setAutoAck(true);
         sinkConfig.setTimeoutMs(2000l);
-        Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, null);
+        Function.FunctionDetails functionDetails = SinkConfigUtils.convert(sinkConfig, new SinkConfigUtils.ExtractedSinkDetails(null, null));
         SinkConfig convertedConfig = SinkConfigUtils.convertFromDetails(functionDetails);
         assertEquals(
                 new Gson().toJson(sinkConfig),
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 7cb7d3c5b5..a3aad9158d 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -51,7 +51,7 @@ public void testConvertBackFidelity() throws IOException  {
         sourceConfig.setParallelism(1);
         sourceConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
         sourceConfig.setConfigs(new HashMap<>());
-        Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, null);
+        Function.FunctionDetails functionDetails = SourceConfigUtils.convert(sourceConfig, new SourceConfigUtils.ExtractedSourceDetails(null, null));
         SourceConfig convertedConfig = SourceConfigUtils.convertFromDetails(functionDetails);
         assertEquals(
                 new Gson().toJson(sourceConfig),
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
index 3bd5a01e66..dd2dce2d83 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/ComponentImpl.java
@@ -1270,12 +1270,12 @@ private FunctionDetails validateUpdateRequestParamsWithPkgUrl(String tenant, Str
     private FunctionDetails validateUpdateRequestParams(String tenant, String namespace, String componentName,
                                                         File uploadedInputStreamAsFile, FormDataContentDisposition fileDetail, String functionDetailsJson,
                                                         String componentConfigJson, ComponentType componentType)
-            throws IllegalArgumentException, IOException, URISyntaxException {
+            throws IllegalArgumentException, IOException {
 
         FunctionDetails functionDetails = validateUpdateRequestParams(tenant, namespace, componentName,
                 functionDetailsJson, componentConfigJson, componentType,null, uploadedInputStreamAsFile);
         if (!isFunctionCodeBuiltin(functionDetails) && (uploadedInputStreamAsFile == null || fileDetail == null)) {
-            throw new IllegalArgumentException("Function Package is not provided");
+            throw new IllegalArgumentException(componentType + " Package is not provided");
         }
 
         return functionDetails;
@@ -1397,8 +1397,8 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
                     throw new IllegalArgumentException(String.format("No Source archive %s found", archivePath));
                 }
             }
-            NarClassLoader clsLoader = SourceConfigUtils.validate(sourceConfig, archivePath, functionPkgUrl, uploadedInputStreamAsFile);
-            return SourceConfigUtils.convert(sourceConfig, clsLoader);
+            SourceConfigUtils.ExtractedSourceDetails sourceDetails = SourceConfigUtils.validate(sourceConfig, archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+            return SourceConfigUtils.convert(sourceConfig, sourceDetails);
         }
         if (componentType.equals(SINK)) {
             Path archivePath = null;
@@ -1419,8 +1419,8 @@ private FunctionDetails validateUpdateRequestParams(String tenant, String namesp
                     throw new IllegalArgumentException(String.format("No Sink archive %s found", archivePath));
                 }
             }
-            NarClassLoader clsLoader = SinkConfigUtils.validate(sinkConfig, archivePath, functionPkgUrl, uploadedInputStreamAsFile);
-            return SinkConfigUtils.convert(sinkConfig, clsLoader);
+            SinkConfigUtils.ExtractedSinkDetails sinkDetails = SinkConfigUtils.validate(sinkConfig, archivePath, functionPkgUrl, uploadedInputStreamAsFile);
+            return SinkConfigUtils.convert(sinkConfig, sinkDetails);
         }
         FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
         org.apache.pulsar.functions.utils.Utils.mergeJson(functionDetailsJson, functionDetailsBuilder);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
index a0e6843769..bb497ae2de 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SinkApiV2ResourceTest.java
@@ -234,7 +234,7 @@ public void testRegisterSinkMissingPackage() throws IOException {
             null,
             mockedFormData,
             topicsToSerDeClassName,
-            className,
+            null,
             parallelism,
                 null,
                 "Sink Package is not provided");
@@ -249,7 +249,7 @@ public void testRegisterSinkMissingPackageDetails() throws IOException {
             mockedInputStream,
             null,
             topicsToSerDeClassName,
-            className,
+            null,
             parallelism,
                 null,
                 "zip file is empty");
@@ -263,9 +263,9 @@ public void testRegisterSinkInvalidJarNoSink() throws IOException {
                 namespace,
                 sink,
                 inputStream,
-                null,
+                mockedFormData,
                 topicsToSerDeClassName,
-                className,
+                null,
                 parallelism,
                 null,
                 "Failed to extract sink class from archive");
@@ -328,7 +328,7 @@ public void testRegisterSinkHttpUrl() throws IOException {
                 className,
                 parallelism,
                 "http://localhost:1234/test",
-                "Corrupt User PackageFile " + "http://localhost:1234/test with error Connection refused (Connection refused)");
+                "Invalid Sink Jar");
     }
 
     private void testRegisterSinkMissingArguments(
@@ -582,7 +582,7 @@ public void testUpdateSinkMissingPackage() throws IOException {
             null,
             mockedFormData,
             topicsToSerDeClassName,
-            className,
+            null,
             parallelism,
                 "Update contains no change");
     }
@@ -600,7 +600,7 @@ public void testUpdateSinkMissingInputs() throws IOException {
                 null,
                 mockedFormData,
                 null,
-                className,
+                null,
                 parallelism,
                 "Update contains no change");
     }
@@ -1219,6 +1219,6 @@ private SinkConfig createDefaultSinkConfig() {
     }
 
     private FunctionDetails createDefaultFunctionDetails() throws IOException {
-        return SinkConfigUtils.convert(createDefaultSinkConfig(), null);
+        return SinkConfigUtils.convert(createDefaultSinkConfig(), new SinkConfigUtils.ExtractedSinkDetails(null, null));
     }
 }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
index fcba85c61a..f7af3cd981 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v2/SourceApiV2ResourceTest.java
@@ -223,7 +223,7 @@ public void testRegisterSourceMissingPackage() throws IOException {
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
-            className,
+            null,
             parallelism,
                 null,
                 "Source Package is not provided");
@@ -242,7 +242,7 @@ public void testRegisterSourceMissingPackageDetails() throws IOException {
             className,
             parallelism,
                 null,
-                "zip file is empty");
+                "Source Package is not provided");
     }
 
     @Test
@@ -256,7 +256,7 @@ public void testRegisterSourceInvalidJarWithNoSource() throws IOException {
                 null,
                 outputTopic,
                 outputSerdeClassName,
-                className,
+                null,
                 parallelism,
                 null,
                 "Failed to extract source class from archive");
@@ -292,7 +292,7 @@ public void testRegisterSourceHttpUrl() throws IOException {
                 className,
                 parallelism,
                 "http://localhost:1234/test",
-                "Corrupt User PackageFile " + "http://localhost:1234/test with error Connection refused (Connection refused)");
+                "Invalid Source Jar");
     }
 
     private void testRegisterSourceMissingArguments(
@@ -555,7 +555,7 @@ public void testUpdateSourceMissingPackage() throws IOException {
             mockedFormData,
             outputTopic,
                 outputSerdeClassName,
-            className,
+            null,
             parallelism,
                 "Update contains no change");
     }
@@ -574,7 +574,7 @@ public void testUpdateSourceMissingTopicName() throws IOException {
                 mockedFormData,
                 null,
                 outputSerdeClassName,
-                className,
+                null,
                 parallelism,
                 "Update contains no change");
     }
@@ -1222,7 +1222,7 @@ private SourceConfig createDefaultSourceConfig() {
         return sourceConfig;
     }
 
-    private FunctionDetails createDefaultFunctionDetails() throws IOException {
-        return SourceConfigUtils.convert(createDefaultSourceConfig(), null);
+    private FunctionDetails createDefaultFunctionDetails() {
+        return SourceConfigUtils.convert(createDefaultSourceConfig(), new SourceConfigUtils.ExtractedSourceDetails(null, null));
     }
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services