You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/07/03 21:31:28 UTC

[pulsar] branch master updated: fix issue when submitting NAR via file url (#4577)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e8025d5  fix issue when submitting NAR via file url (#4577)
e8025d5 is described below

commit e8025d50c5d2cf0a632ad1573308b676d0607923
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Wed Jul 3 14:31:23 2019 -0700

    fix issue when submitting NAR via file url (#4577)
    
    * fix issue when submitting NAR via file url
    
    * fix unit tests
    
    * add more specific errors
    
    * fix test
---
 .../worker/PulsarFunctionLocalRunTest.java         |  21 ++-
 .../org/apache/pulsar/functions/LocalRunner.java   |   1 -
 .../pulsar/functions/utils/SinkConfigUtils.java    | 145 +++++++++++++--------
 .../pulsar/functions/utils/SourceConfigUtils.java  | 122 ++++++++++-------
 .../functions/worker/rest/api/FunctionsImpl.java   |   1 -
 .../functions/worker/rest/api/SinksImpl.java       |   1 -
 .../functions/worker/rest/api/SourcesImpl.java     |   1 -
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |  17 ++-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |   5 +-
 9 files changed, 192 insertions(+), 122 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index bdcd27d..db7dbf2 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -599,12 +599,11 @@ public class PulsarFunctionLocalRunTest {
         testPulsarSourceLocalRun(null);
     }
 
-    // TODO bug to fix involving submitting a NAR via URI file:///tmp/pulsar-io-twitter-0.0.1.nar
-//    @Test(timeOut = 20000)
-//    public void testPulsarSourceLocalRunWithFile() throws Exception {
-//        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
-//        testPulsarSourceStats(jarFilePathUrl);
-//    }
+    @Test(timeOut = 20000)
+    public void testPulsarSourceLocalRunWithFile() throws Exception {
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+        testPulsarSourceLocalRun(jarFilePathUrl);
+    }
 
     @Test(timeOut = 40000)
     public void testPulsarSourceLocalRunWithUrl() throws Exception {
@@ -705,11 +704,11 @@ public class PulsarFunctionLocalRunTest {
         testPulsarSinkStats(null);
     }
 
-//    @Test(timeOut = 20000)
-//    public void testPulsarSinkStatsWithFile() throws Exception {
-//        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
-//        testPulsarSinkStats(jarFilePathUrl);
-//    }
+    @Test(timeOut = 20000)
+    public void testPulsarSinkStatsWithFile() throws Exception {
+        String jarFilePathUrl = Utils.FILE + ":" + getClass().getClassLoader().getResource("pulsar-io-data-generator.nar").getFile();
+        testPulsarSinkStats(jarFilePathUrl);
+    }
 
     @Test(timeOut = 40000)
     public void testPulsarSinkStatsWithUrl() throws Exception {
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index 1540744..f4c6eca 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -248,7 +248,6 @@ public class LocalRunner {
                             .loadClass(LocalRunner.class.getName())
                             .getProtectionDomain().getCodeSource().getLocation().getFile();
                 }
-                log.info("userCodeFile: {}", userCodeFile);
 
                 String builtInSource = isBuiltInSource(userCodeFile);
                 if (builtInSource != null) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 42eedc3..f0a39b9 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -42,7 +42,11 @@ import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Type;
 import java.nio.file.Path;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -283,7 +287,7 @@ public class SinkConfigUtils {
     }
 
     public static ExtractedSinkDetails validate(SinkConfig sinkConfig, Path archivePath,
-                                          File uploadedInputStreamAsFile) {
+                                          File sinkPackageFile) {
         if (isEmpty(sinkConfig.getTenant())) {
             throw new IllegalArgumentException("Sink tenant cannot be null");
         }
@@ -318,79 +322,112 @@ public class SinkConfigUtils {
             throw new IllegalArgumentException("Sink timeout must be a positive number");
         }
 
-        String sinkClassName;
-        final Class<?> typeArg;
-        final ClassLoader classLoader;
-        if (!isEmpty(sinkConfig.getClassName())) {
-            sinkClassName = sinkConfig.getClassName();
-            // We really don't know if we should use nar class loader or regular classloader
-            ClassLoader jarClassLoader = null;
-            ClassLoader narClassLoader = null;
-            try {
-                jarClassLoader = FunctionCommon.extractClassLoader(archivePath, uploadedInputStreamAsFile);
-            } catch (Exception e) {
+        if (archivePath == null && sinkPackageFile == null) {
+            throw new IllegalArgumentException("Sink package is not provided");
+        }
+
+        Class<?> typeArg;
+        ClassLoader classLoader;
+        String sinkClassName = sinkConfig.getClassName();
+        ClassLoader jarClassLoader = null;
+        ClassLoader narClassLoader = null;
+
+        Exception jarClassLoaderException = null;
+        Exception narClassLoaderException = null;
+
+        try {
+            jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sinkPackageFile);
+        } catch (Exception e) {
+            jarClassLoaderException = e;
+        }
+        try {
+            narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sinkPackageFile);
+        } catch (Exception e) {
+            narClassLoaderException = e;
+        }
+
+        // if sink class name is not provided, we can only try to load archive as a NAR
+        if (isEmpty(sinkClassName)) {
+            if (narClassLoader == null) {
+                throw new IllegalArgumentException("Sink package does not have the correct format. " +
+                        "Pulsar cannot determine if the package is a NAR package or JAR package." +
+                        "Sink classname is not provided and attempts to load it as a NAR package produced error: "
+                        + narClassLoaderException.getMessage());
             }
             try {
-                narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, uploadedInputStreamAsFile);
-            } catch (Exception e) {
-            }
-            if (jarClassLoader == null && narClassLoader == null) {
-                throw new IllegalArgumentException("Invalid Sink Package");
+                sinkClassName = ConnectorUtils.getIOSinkClass(narClassLoader);
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Failed to extract Sink class from archive", e);
             }
-            // We use typeArg and classLoader as arguments for lambda functions that require them to be final
-            // Thus we use these tmp vars
-            Class<?> tmptypeArg;
-            ClassLoader tmpclassLoader;
             try {
-                tmptypeArg = getSinkType(sinkClassName, narClassLoader);
-                tmpclassLoader = narClassLoader;
-            } catch (Exception e) {
+                typeArg = getSinkType(sinkClassName, narClassLoader);
+                classLoader = narClassLoader;
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException(
+                        String.format("Sink class %s must be in class path", sinkClassName), e);
+            }
+
+        } else {
+            // if sink class name is provided, we need to try to load it as a JAR and as a NAR.
+            if (jarClassLoader != null) {
+                try {
+                    typeArg = getSinkType(sinkClassName, jarClassLoader);
+                    classLoader = jarClassLoader;
+                } catch (ClassNotFoundException e) {
+                    // class not found in JAR try loading as a NAR and searching for the class
+                    if (narClassLoader != null) {
+                        try {
+                            typeArg = getSinkType(sinkClassName, narClassLoader);
+                            classLoader = narClassLoader;
+                        } catch (ClassNotFoundException e1) {
+                            throw new IllegalArgumentException(
+                                    String.format("Sink class %s must be in class path", sinkClassName), e1);
+                        }
+                    } else {
+                        throw new IllegalArgumentException(
+                                String.format("Sink class %s must be in class path", sinkClassName), e);
+                    }
+                }
+            } else if (narClassLoader != null) {
                 try {
-                    tmptypeArg = getSinkType(sinkClassName, jarClassLoader);
+                    typeArg = getSinkType(sinkClassName, narClassLoader);
+                    classLoader = narClassLoader;
                 } catch (ClassNotFoundException e1) {
                     throw new IllegalArgumentException(
                             String.format("Sink class %s must be in class path", sinkClassName), e1);
                 }
-                tmpclassLoader = jarClassLoader;
-            }
-            typeArg = tmptypeArg;
-            classLoader = tmpclassLoader;
-        } else if (!org.apache.commons.lang3.StringUtils.isEmpty(sinkConfig.getArchive()) && sinkConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
-            throw new IllegalArgumentException("Class-name must be present for archive with file-url");
-        } else {
-            classLoader = FunctionCommon.extractNarClassLoader(archivePath, uploadedInputStreamAsFile);
-            if (classLoader == null) {
-                throw new IllegalArgumentException("Sink Package is not provided");
-            }
-            try {
-                sinkClassName = ConnectorUtils.getIOSinkClass(classLoader);
-            } catch (IOException e1) {
-                throw new IllegalArgumentException("Failed to extract sink class from archive", e1);
-            }
-            try {
-                typeArg = getSinkType(sinkClassName, classLoader);
-            } catch (ClassNotFoundException e) {
-                throw new IllegalArgumentException(
-                        String.format("Sink class %s must be in class path", sinkClassName), e);
+            } else {
+                StringBuilder errorMsg = new StringBuilder("Sink package does not have the correct format." +
+                        " Pulsar cannot determine if the package is a NAR package or JAR package.");
+
+                if (jarClassLoaderException != null) {
+                    errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
+                }
+
+                if (narClassLoaderException != null) {
+                    errorMsg.append("Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
+                }
+
+                throw new IllegalArgumentException(errorMsg.toString());
             }
         }
 
         if (sinkConfig.getTopicToSerdeClassName() != null) {
-            sinkConfig.getTopicToSerdeClassName().forEach((topicName, serdeClassName) -> {
-                ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
-            });
+           for (String serdeClassName : sinkConfig.getTopicToSerdeClassName().values()) {
+               ValidatorUtils.validateSerde(serdeClassName, typeArg, classLoader, true);
+           }
         }
 
         if (sinkConfig.getTopicToSchemaType() != null) {
-            sinkConfig.getTopicToSchemaType().forEach((topicName, schemaType) -> {
+            for (String schemaType : sinkConfig.getTopicToSchemaType().values()) {
                 ValidatorUtils.validateSchema(schemaType, typeArg, classLoader, true);
-            });
+            }
         }
 
         // topicsPattern does not need checks
 
         if (sinkConfig.getInputSpecs() != null) {
-            sinkConfig.getInputSpecs().forEach((topicName, consumerSpec) -> {
+            for (ConsumerConfig consumerSpec : sinkConfig.getInputSpecs().values()) {
                 // Only one is set
                 if (!isEmpty(consumerSpec.getSerdeClassName()) && !isEmpty(consumerSpec.getSchemaType())) {
                     throw new IllegalArgumentException("Only one of serdeClassName or schemaType should be set");
@@ -401,7 +438,7 @@ public class SinkConfigUtils {
                 if (!isEmpty(consumerSpec.getSchemaType())) {
                     ValidatorUtils.validateSchema(consumerSpec.getSchemaType(), typeArg, classLoader, true);
                 }
-            });
+            }
         }
         return new ExtractedSinkDetails(sinkClassName, typeArg.getName());
     }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index edbb6d7..149204c 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -222,61 +222,93 @@ public class SourceConfigUtils {
         if (sourceConfig.getResources() != null) {
             ResourceConfigUtils.validate(sourceConfig.getResources());
         }
+        if (archivePath == null && sourcePackageFile == null) {
+            throw new IllegalArgumentException("Source package is not provided");
+        }
 
-        String sourceClassName;
-        final Class<?> typeArg;
-        final ClassLoader classLoader;
-        if (!isEmpty(sourceConfig.getClassName())) {
-            sourceClassName = sourceConfig.getClassName();
-            // We really don't know if we should use nar class loader or regular classloader
-            ClassLoader jarClassLoader = null;
-            ClassLoader narClassLoader = null;
-            try {
-                jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile);
-            } catch (Exception e) {
+        Class<?> typeArg;
+        ClassLoader classLoader;
+        String sourceClassName = sourceConfig.getClassName();
+        ClassLoader jarClassLoader = null;
+        ClassLoader narClassLoader = null;
+
+        Exception jarClassLoaderException = null;
+        Exception narClassLoaderException = null;
+
+        try {
+            jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile);
+        } catch (Exception e) {
+            jarClassLoaderException = e;
+        }
+        try {
+            narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
+        } catch (Exception e) {
+            narClassLoaderException = e;
+        }
+
+        // if source class name is not provided, we can only try to load archive as a NAR
+        if (isEmpty(sourceClassName)) {
+            if (narClassLoader == null) {
+                throw new IllegalArgumentException("Source package does not have the correct format. " +
+                        "Pulsar cannot determine if the package is a NAR package or JAR package." +
+                        "Source classname is not provided and attempts to load it as a NAR package produced the following error.",
+                        narClassLoaderException);
             }
             try {
-                narClassLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
-            } catch (Exception e) {
-            }
-            if (jarClassLoader == null && narClassLoader == null) {
-                throw new IllegalArgumentException("Invalid Source Package");
+                sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) narClassLoader);
+            } catch (IOException e) {
+                throw new IllegalArgumentException("Failed to extract source class from archive", e);
             }
-            // We use typeArg and classLoader as arguments for lambda functions that require them to be final
-            // Thus we use these tmp vars
-            Class<?> tmptypeArg;
-            ClassLoader tmpclassLoader;
             try {
-                tmptypeArg = getSourceType(sourceClassName, narClassLoader);
-                tmpclassLoader = narClassLoader;
-            } catch (Exception e) {
+                typeArg = getSourceType(sourceClassName, narClassLoader);
+                classLoader = narClassLoader;
+            } catch (ClassNotFoundException e) {
+                throw new IllegalArgumentException(
+                        String.format("Source class %s must be in class path", sourceClassName), e);
+            }
+
+        } else {
+            // if source class name is provided, we need to try to load it as a JAR and as a NAR.
+            if (jarClassLoader != null) {
                 try {
-                    tmptypeArg = getSourceType(sourceClassName, jarClassLoader);
+                    typeArg = getSourceType(sourceClassName, jarClassLoader);
+                    classLoader = jarClassLoader;
+                } catch (ClassNotFoundException e) {
+                    // class not found in JAR try loading as a NAR and searching for the class
+                    if (narClassLoader != null) {
+                        try {
+                            typeArg = getSourceType(sourceClassName, narClassLoader);
+                            classLoader = narClassLoader;
+                        } catch (ClassNotFoundException e1) {
+                            throw new IllegalArgumentException(
+                                    String.format("Source class %s must be in class path", sourceClassName), e1);
+                        }
+                    } else {
+                        throw new IllegalArgumentException(
+                                String.format("Source class %s must be in class path", sourceClassName), e);
+                    }
+                }
+            } else if (narClassLoader != null) {
+                try {
+                    typeArg = getSourceType(sourceClassName, narClassLoader);
+                    classLoader = narClassLoader;
                 } catch (ClassNotFoundException e1) {
                     throw new IllegalArgumentException(
                             String.format("Source class %s must be in class path", sourceClassName), e1);
                 }
-                tmpclassLoader = jarClassLoader;
-            }
-            typeArg = tmptypeArg;
-            classLoader = tmpclassLoader;
-        } else if (!StringUtils.isEmpty(sourceConfig.getArchive()) && sourceConfig.getArchive().startsWith(org.apache.pulsar.common.functions.Utils.FILE)) {
-            throw new IllegalArgumentException("Class-name must be present for archive with file-url");
-        } else {
-            classLoader = FunctionCommon.extractNarClassLoader(archivePath, sourcePackageFile);
-            if (classLoader == null) {
-                throw new IllegalArgumentException("Source Package is not provided");
-            }
-            try {
-                sourceClassName = ConnectorUtils.getIOSourceClass((NarClassLoader) classLoader);
-            } catch (IOException e1) {
-                throw new IllegalArgumentException("Failed to extract source class from archive", e1);
-            }
-            try {
-                typeArg = getSourceType(sourceClassName, classLoader);
-            } catch (ClassNotFoundException e) {
-                throw new IllegalArgumentException(
-                        String.format("Source class %s must be in class path", sourceClassName), e);
+            } else {
+                StringBuilder errorMsg = new StringBuilder("Source package does not have the correct format." +
+                        " Pulsar cannot determine if the package is a NAR package or JAR package.");
+
+                if (jarClassLoaderException != null) {
+                    errorMsg.append("Attempts to load it as a JAR package produced error: " + jarClassLoaderException.getMessage());
+                }
+
+                if (narClassLoaderException != null) {
+                    errorMsg.append("Attempts to load it as a NAR package produced error: " + narClassLoaderException.getMessage());
+                }
+
+                throw new IllegalArgumentException(errorMsg.toString());
             }
         }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
index af23496..d96ed30 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/FunctionsImpl.java
@@ -339,7 +339,6 @@ public class FunctionsImpl extends ComponentImpl {
 
                     componentPackageFile = FunctionCommon.createPkgTempFile();
                     componentPackageFile.deleteOnExit();
-                    log.info("componentPackageFile: {}", componentPackageFile);
                     WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
 
                     functionDetails = validateUpdateRequestParams(tenant, namespace, functionName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
index 9052266..659cb66 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SinksImpl.java
@@ -344,7 +344,6 @@ public class SinksImpl extends ComponentImpl {
 
                     componentPackageFile = FunctionCommon.createPkgTempFile();
                     componentPackageFile.deleteOnExit();
-                    log.info("componentPackageFile: {}", componentPackageFile);
                     WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
 
                     functionDetails = validateUpdateRequestParams(tenant, namespace, sinkName,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
index e14f167..e9cfeb0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/rest/api/SourcesImpl.java
@@ -341,7 +341,6 @@ public class SourcesImpl extends ComponentImpl {
 
                     componentPackageFile = FunctionCommon.createPkgTempFile();
                     componentPackageFile.deleteOnExit();
-                    log.info("componentPackageFile: {}", componentPackageFile);
                     WorkerUtils.downloadFromBookkeeper(worker().getDlogNamespace(), componentPackageFile, existingComponent.getPackageLocation().getPackagePath());
 
                     functionDetails = validateUpdateRequestParams(tenant, namespace, sourceName,
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index 793e9cd..41b3204 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -19,7 +19,6 @@
 package org.apache.pulsar.functions.worker.rest.api.v3;
 
 import com.google.common.collect.Lists;
-import com.google.gson.Gson;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.distributedlog.api.namespace.Namespace;
 import org.apache.logging.log4j.Level;
@@ -243,7 +242,7 @@ public class SinkApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink Package is not provided")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink package is not provided")
     public void testRegisterSinkMissingPackage() {
         try {
             testRegisterSinkMissingArguments(
@@ -283,7 +282,10 @@ public class SinkApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "zip file is empty")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Sink package does not have the" +
+            " correct format. Pulsar cannot determine if the package is a NAR package" +
+            " or JAR package.Sink classname is not provided and attempts to load it as a NAR package produced error: " +
+            "zip file is empty")
     public void testRegisterSinkMissingPackageDetails() {
         try {
             testRegisterSinkMissingArguments(
@@ -303,7 +305,7 @@ public class SinkApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract sink class from archive")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Failed to extract Sink class from archive")
     public void testRegisterSinkInvalidJarNoSink() throws IOException {
         try {
             FileInputStream inputStream = new FileInputStream(INVALID_JAR_FILE_PATH);
@@ -948,6 +950,7 @@ public class SinkApiV3ResourceTest {
                 anyString(),
                 any(File.class),
                 any(Namespace.class));
+        PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
 
         when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
@@ -961,7 +964,7 @@ public class SinkApiV3ResourceTest {
     }
 
     @Test
-    public void testUpdateSinkWithUrl() throws IOException, ClassNotFoundException {
+    public void testUpdateSinkWithUrl() throws Exception {
         Configurator.setRootLevel(Level.DEBUG);
 
         String filePackageUrl = "file://" + JAR_FILE_PATH;
@@ -982,6 +985,7 @@ public class SinkApiV3ResourceTest {
         mockStatic(FunctionCommon.class);
         doReturn(String.class).when(FunctionCommon.class);
         FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
+        PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
         FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));
@@ -989,7 +993,6 @@ public class SinkApiV3ResourceTest {
         doReturn(ATLEAST_ONCE).when(FunctionCommon.class);
         FunctionCommon.convertProcessingGuarantee(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
 
-
         this.mockedFunctionMetaData = FunctionMetaData.newBuilder().setFunctionDetails(createDefaultFunctionDetails()).build();
         when(mockedManager.getFunctionMetaData(any(), any(), any())).thenReturn(mockedFunctionMetaData);
 
@@ -1019,6 +1022,7 @@ public class SinkApiV3ResourceTest {
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
+            PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
@@ -1044,6 +1048,7 @@ public class SinkApiV3ResourceTest {
                     anyString(),
                     any(File.class),
                     any(Namespace.class));
+            PowerMockito.when(WorkerUtils.class, "dumpToTmpFile", any()).thenCallRealMethod();
 
             when(mockedManager.containsFunction(eq(tenant), eq(namespace), eq(sink))).thenReturn(true);
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index 17b4595..276f3c0 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -263,7 +263,7 @@ public class SourceApiV3ResourceTest {
         }
     }
 
-    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source Package is not provided")
+    @Test(expectedExceptions = RestException.class, expectedExceptionsMessageRegExp = "Source package is not provided")
     public void testRegisterSourceMissingPackage() {
         try {
             testRegisterSourceMissingArguments(
@@ -979,7 +979,7 @@ public class SourceApiV3ResourceTest {
     }
 
     @Test
-    public void testUpdateSourceWithUrl() throws IOException, ClassNotFoundException {
+    public void testUpdateSourceWithUrl() throws Exception {
         Configurator.setRootLevel(Level.DEBUG);
 
         String filePackageUrl = "file://" + JAR_FILE_PATH;
@@ -1000,6 +1000,7 @@ public class SourceApiV3ResourceTest {
         mockStatic(FunctionCommon.class);
         doReturn(String.class).when(FunctionCommon.class);
         FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));
+        PowerMockito.when(FunctionCommon.class, "extractFileFromPkgURL", any()).thenCallRealMethod();
 
         doReturn(mock(NarClassLoader.class)).when(FunctionCommon.class);
         FunctionCommon.extractNarClassLoader(any(Path.class), any(File.class));