You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/30 05:24:46 UTC

[pulsar] branch master updated: Moved ClassLoading and Reflections Helper functions to common (#7103)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e451be3  Moved ClassLoading and Reflections Helper functions to common (#7103)
e451be3 is described below

commit e451be300e5a0e718c444dd3a5b3a80a16734974
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 29 22:24:29 2020 -0700

    Moved ClassLoading and Reflections Helper functions to common (#7103)
    
    * Moved ClassLoading and Reflections Helper functions to common
    
    * Fix tests
    
    * Fix test
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../apache/pulsar/io/PulsarFunctionTlsTest.java    |  4 +-
 .../apache/pulsar/admin/cli/CmdFunctionsTest.java  |  2 +-
 .../org/apache/pulsar/admin/cli/TestCmdSinks.java  |  4 +-
 .../apache/pulsar/admin/cli/TestCmdSources.java    |  4 +-
 .../pulsar/common/util/ClassLoaderUtils.java       | 80 ++++++++++++++++++++++
 .../apache/pulsar/common/util}/Reflections.java    | 16 +++--
 .../pulsar/common/util}/ReflectionsTest.java       |  5 +-
 .../pulsar/functions/instance/InstanceUtils.java   |  2 +-
 .../functions/instance/JavaInstanceRunnable.java   |  4 +-
 .../apache/pulsar/functions/sink/PulsarSink.java   |  2 +-
 .../pulsar/functions/source/PulsarSource.java      |  3 +-
 .../windowing/WindowFunctionExecutor.java          |  2 +-
 .../functions/auth/FunctionAuthProvider.java       |  2 +-
 .../auth/KubernetesFunctionAuthProvider.java       |  4 +-
 .../functions/runtime/JavaInstanceStarter.java     |  2 +-
 .../functions/runtime/RuntimeCustomizer.java       |  2 +-
 .../pulsar/functions/runtime/RuntimeFactory.java   |  2 +-
 .../runtime/thread/ThreadRuntimeFactory.java       |  2 +-
 .../pulsar/functions/utils/FunctionCommon.java     | 54 +--------------
 .../functions/utils/FunctionConfigUtils.java       |  2 +-
 .../pulsar/functions/utils/SinkConfigUtils.java    |  5 +-
 .../pulsar/functions/utils/SourceConfigUtils.java  |  4 +-
 .../pulsar/functions/utils/ValidatorUtils.java     | 10 +--
 .../functions/utils/FunctionConfigUtilsTest.java   |  1 +
 .../functions/utils/SinkConfigUtilsTest.java       |  1 +
 .../functions/utils/SourceConfigUtilsTest.java     |  2 +-
 .../functions/worker/FunctionRuntimeManager.java   |  2 +-
 .../pulsar/functions/worker/SchedulerManager.java  |  2 +-
 .../worker/rest/api/v3/SinkApiV3ResourceTest.java  |  9 ++-
 .../rest/api/v3/SourceApiV3ResourceTest.java       |  9 ++-
 30 files changed, 145 insertions(+), 98 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index ed33614..e241f76 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -52,11 +52,11 @@ import org.apache.pulsar.client.api.Authentication;
 import org.apache.pulsar.client.impl.auth.AuthenticationTls;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -238,7 +238,7 @@ public class PulsarFunctionTlsTest {
 
         File file = new File(jarFile);
         try {
-            FunctionCommon.loadJar(file);
+            ClassLoaderUtils.loadJar(file);
         } catch (MalformedURLException e) {
             throw new RuntimeException("Failed to load user jar " + file, e);
         }
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index c162c4b..9c2ddcf 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -57,7 +57,7 @@ import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.functions.api.Context;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index f0418c9..c7c4b6d 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -44,7 +44,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -121,7 +121,7 @@ public class TestCmdSinks {
             throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
         }
         JAR_FILE_PATH = file.getFile();
-        Thread.currentThread().setContextClassLoader(FunctionCommon.loadJar(new File(JAR_FILE_PATH)));
+        Thread.currentThread().setContextClassLoader(ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH)));
     }
 
     public SinkConfig getSinkConfig() {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index cbf121d..590031d 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.UpdateOptions;
 import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PowerMockIgnore;
 import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -98,7 +98,7 @@ public class TestCmdSources {
         mockStatic(CmdFunctions.class);
         PowerMockito.doNothing().when(localSourceRunner).runCmd();
         JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
-        Thread.currentThread().setContextClassLoader(FunctionCommon.loadJar(new File(JAR_FILE_PATH)));
+        Thread.currentThread().setContextClassLoader(ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH)));
     }
 
     public SourceConfig getSourceConfig() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
new file mode 100644
index 0000000..9af547e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+
+/**
+ * Helper methods for class loading.
+ */
+public class ClassLoaderUtils {
+    /**
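+     * Create a {@link URLClassLoader} whose only search path entry is the given jar.
+     *
+     * @param jar the jar file to load classes from
+     * @return a new class loader backed by the jar's URL
+     * @throws MalformedURLException if the jar file's URI cannot be converted to a URL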
+     */
+    public static ClassLoader loadJar(File jar) throws MalformedURLException {
+        java.net.URL url = jar.toURI().toURL();
+        return new URLClassLoader(new URL[]{url});
+    }
+
+    public static ClassLoader extractClassLoader(Path archivePath, File packageFile) throws Exception {
+        if (archivePath != null) {
+            return loadJar(archivePath.toFile());
+        }
+        if (packageFile != null) {
+            return loadJar(packageFile);
+        }
+        return null;
+    }
+
+    public static Class<?> loadClass(String className, ClassLoader classLoader) throws ClassNotFoundException {
+        Class<?> objectClass;
+        try {
+            objectClass = Class.forName(className);
+        } catch (ClassNotFoundException | NoClassDefFoundError e) {
+            if (classLoader != null) {
+                objectClass = classLoader.loadClass(className);
+            } else {
+                throw e;
+            }
+        }
+        return objectClass;
+    }
+
+    public static void implementsClass(String className, Class<?> klass, ClassLoader classLoader) {
+        Class<?> objectClass;
+        try {
+            objectClass = loadClass(className, classLoader);
+        } catch (ClassNotFoundException | NoClassDefFoundError e) {
+            throw new IllegalArgumentException("Cannot find/load class " + className);
+        }
+
+        if (!klass.isAssignableFrom(objectClass)) {
+            throw new IllegalArgumentException(
+                    String.format("%s does not implement %s", className, klass.getName()));
+        }
+    }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
similarity index 94%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
index e45b397..e1713ba 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.utils;
+package org.apache.pulsar.common.util;
 
 import java.io.IOException;
 import java.lang.reflect.Array;
@@ -165,7 +165,7 @@ public class Reflections {
 
     public static Object createInstance(String userClassName, java.io.File jar) {
         try {
-            return createInstance(userClassName, FunctionCommon.loadJar(jar));
+            return createInstance(userClassName, ClassLoaderUtils.loadJar(jar));
         } catch (Exception ex) {
             return null;
         }
@@ -180,7 +180,7 @@ public class Reflections {
      */
     public static boolean classExistsInJar(java.io.File jar, String fqcn) {
         try {
-            java.net.URLClassLoader loader = (URLClassLoader) FunctionCommon.loadJar(jar);
+            java.net.URLClassLoader loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
             Class.forName(fqcn, false, loader);
             loader.close();
             return true;
@@ -199,7 +199,7 @@ public class Reflections {
         try {
             Class.forName(fqcn);
             return true;
-        } catch( ClassNotFoundException e ) {
+        } catch (ClassNotFoundException e) {
            return false;
         }
     }
@@ -214,7 +214,7 @@ public class Reflections {
     public static boolean classInJarImplementsIface(java.io.File jar, String fqcn, Class xface) {
         boolean ret = false;
         try {
-            java.net.URLClassLoader loader = (URLClassLoader) FunctionCommon.loadJar(jar);
+            java.net.URLClassLoader loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
             if (xface.isAssignableFrom(Class.forName(fqcn, false, loader))){
                 ret = true;
             }
@@ -281,7 +281,7 @@ public class Reflections {
                 throw new ClassNotFoundException(className);
             }
         } else if (isPrimitive(className)) {
-            return (Class)PRIMITIVE_NAME_TYPE_MAP.get(className);
+            return (Class) PRIMITIVE_NAME_TYPE_MAP.get(className);
         } else if (className.charAt(0) == 'L' && className.charAt(className.length() - 1) == ';') {
             return classLoader.loadClass(className.substring(1, className.length() - 1));
         } else {
@@ -291,10 +291,12 @@ public class Reflections {
                 if (className.charAt(0) != '[') {
                     throw var4;
                 } else {
+                    // CHECKSTYLE.OFF: EmptyStatement
                     int arrayDimension;
-                    for(arrayDimension = 0; className.charAt(arrayDimension) == '['; ++arrayDimension) {
+                    for (arrayDimension = 0; className.charAt(arrayDimension) == '['; ++arrayDimension) {
                         ;
                     }
+                    // CHECKSTYLE.ON: EmptyStatement
 
                     Class componentType = loadClass(className.substring(arrayDimension), classLoader);
                     return Array.newInstance(componentType, new int[arrayDimension]).getClass();
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ReflectionsTest.java
similarity index 98%
rename from pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
rename to pulsar-common/src/test/java/org/apache/pulsar/common/util/ReflectionsTest.java
index 1ad03b1..e59f21a 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ReflectionsTest.java
@@ -16,15 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pulsar.functions.utils;
+package org.apache.pulsar.common.util;
 
-import static org.apache.pulsar.functions.utils.Reflections.createInstance;
+import static org.apache.pulsar.common.util.Reflections.createInstance;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 import static org.testng.AssertJUnit.fail;
 
 import java.lang.reflect.InvocationTargetException;
+
 import org.testng.annotations.Test;
 
 /**
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index e73f1ce..d344364 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 
 import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.functions.utils.FunctionCommon;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 892c7e5..7bb8238 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -27,7 +27,7 @@ import io.prometheus.client.CollectorRegistry;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
-import lombok.AccessLevel;
+
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
@@ -69,7 +69,7 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
 import org.apache.pulsar.functions.source.PulsarSource;
 import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.io.core.Sink;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8cb67ef..8aa1cbc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.functions.instance.SinkRecord;
 import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.io.core.Sink;
 import org.apache.pulsar.io.core.SinkContext;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index fa7146d..f4a91fc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.annotations.VisibleForTesting;
 
 import java.util.*;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -35,7 +34,7 @@ import org.apache.pulsar.client.api.*;
 import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.io.core.PushSource;
 import org.apache.pulsar.io.core.SourceContext;
 
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index a04010f..83d5102 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -31,7 +31,7 @@ import java.util.stream.Collectors;
 import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.functions.api.*;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.common.functions.WindowConfig;
 import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
 import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
index b49ab3b..00a8ee9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.auth;
 import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 
 import java.util.Optional;
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
index aa40a7b..dacd3a1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
@@ -19,11 +19,9 @@
 package org.apache.pulsar.functions.auth;
 
 import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.V1ServiceAccount;
 import io.kubernetes.client.models.V1StatefulSet;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
 import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 
 import java.util.Optional;
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index f7104df..a1b88a4 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -51,7 +51,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 
 import javax.management.MalformedObjectNameException;
 import java.lang.reflect.Type;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeCustomizer.java
index 365cda8..ac617ab 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeCustomizer.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeCustomizer.java
@@ -18,7 +18,7 @@
  */
 package org.apache.pulsar.functions.runtime;
 
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 
 import java.util.Map;
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 16c630d..51e342e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.worker.WorkerConfig;
 
 import java.util.Optional;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 423d1f0..ce03a27 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -36,7 +36,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
 import org.apache.pulsar.functions.worker.WorkerConfig;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 1bd99b2..b8ac299 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -31,7 +31,6 @@ import java.net.*;
 import java.nio.channels.Channels;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.file.Path;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
@@ -42,6 +41,7 @@ import org.apache.pulsar.client.impl.TopicMessageIdImpl;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Utils;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.functions.api.Function;
 import org.apache.pulsar.functions.api.WindowFunction;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
@@ -205,16 +205,6 @@ public class FunctionCommon {
         return typeArg;
     }
 
-    public static ClassLoader extractClassLoader(Path archivePath, File packageFile) throws Exception {
-        if (archivePath != null) {
-            return loadJar(archivePath.toFile());
-        }
-        if (packageFile != null) {
-            return loadJar(packageFile);
-        }
-        return null;
-    }
-
     public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
         URL website = new URL(destPkgUrl);
 
@@ -226,22 +216,10 @@ public class FunctionCommon {
         log.info("Downloading function package from {} to {} completed!", destPkgUrl, targetFile.getAbsoluteFile());
     }
 
-    /**
-     * Load a jar.
-     *
-     * @param jar file of jar
-     * @return classloader
-     * @throws MalformedURLException
-     */
-    public static ClassLoader loadJar(File jar) throws MalformedURLException {
-        java.net.URL url = jar.toURI().toURL();
-        return new URLClassLoader(new URL[]{url});
-    }
-
     public static ClassLoader extractClassLoader(String destPkgUrl) throws IOException, URISyntaxException {
         File file = extractFileFromPkgURL(destPkgUrl);
         try {
-            return loadJar(file);
+            return ClassLoaderUtils.loadJar(file);
         } catch (MalformedURLException e) {
             throw new IllegalArgumentException(
                     "Corrupt User PackageFile " + file + " with error " + e.getMessage());
@@ -270,34 +248,6 @@ public class FunctionCommon {
         }
     }
 
-    public static void implementsClass(String className, Class<?> klass, ClassLoader classLoader) {
-        Class<?> objectClass;
-        try {
-            objectClass = loadClass(className, classLoader);
-        } catch (ClassNotFoundException | NoClassDefFoundError e) {
-            throw new IllegalArgumentException("Cannot find/load class " + className);
-        }
-
-        if (!klass.isAssignableFrom(objectClass)) {
-            throw new IllegalArgumentException(
-                    String.format("%s does not implement %s", className, klass.getName()));
-        }
-    }
-
-    public static Class<?> loadClass(String className, ClassLoader classLoader) throws ClassNotFoundException {
-        Class<?> objectClass;
-        try {
-            objectClass = Class.forName(className);
-        } catch (ClassNotFoundException | NoClassDefFoundError e) {
-            if (classLoader != null) {
-                objectClass = classLoader.loadClass(className);
-            } else {
-                throw e;
-            }
-        }
-        return objectClass;
-    }
-
     public static NarClassLoader extractNarClassLoader(Path archivePath, File packageFile,
                                                        String narExtractionDirectory) {
         if (archivePath != null) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 9a94545..4e8f2d9 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -44,7 +44,7 @@ import static org.apache.commons.lang.StringUtils.isNotBlank;
 import static org.apache.commons.lang.StringUtils.isNotEmpty;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.pulsar.common.functions.Utils.BUILTIN;
-import static org.apache.pulsar.functions.utils.FunctionCommon.loadJar;
+import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
 
 @Slf4j
 public class FunctionConfigUtils {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 7bb9f77..ec027a4 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -20,7 +20,6 @@
 package org.apache.pulsar.functions.utils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import lombok.AllArgsConstructor;
@@ -34,9 +33,9 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.validator.ConfigValidation;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -357,7 +356,7 @@ public class SinkConfigUtils {
         Exception narClassLoaderException = null;
 
         try {
-            jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sinkPackageFile);
+            jarClassLoader = ClassLoaderUtils.extractClassLoader(archivePath, sinkPackageFile);
         } catch (Exception e) {
             jarClassLoaderException = e;
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 38939a6..77b8f8a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -20,7 +20,6 @@
 package org.apache.pulsar.functions.utils;
 
 import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.Gson;
 import com.google.gson.reflect.TypeToken;
 import lombok.AllArgsConstructor;
@@ -33,6 +32,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
 import org.apache.pulsar.common.validator.ConfigValidation;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -249,7 +249,7 @@ public class SourceConfigUtils {
         Exception narClassLoaderException = null;
 
         try {
-            jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile);
+            jarClassLoader = ClassLoaderUtils.extractClassLoader(archivePath, sourcePackageFile);
         } catch (Exception e) {
             jarClassLoaderException = e;
         }
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
index 1dd3c8b..7eeeda0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
@@ -23,6 +23,8 @@ import lombok.extern.slf4j.Slf4j;
 import net.jodah.typetools.TypeResolver;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.SerDe;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.io.core.Sink;
@@ -31,7 +33,7 @@ import org.apache.pulsar.io.core.Source;
 import static org.apache.commons.lang3.StringUtils.isBlank;
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.utils.Reflections.createInstance;
+import static org.apache.pulsar.common.util.Reflections.createInstance;
 
 @Slf4j
 public class ValidatorUtils {
@@ -43,7 +45,7 @@ public class ValidatorUtils {
             // If it's empty, we use the default schema and no need to validate
             // If it's built-in, no need to validate
         } else {
-            FunctionCommon.implementsClass(schemaType, Schema.class, clsLoader);
+            ClassLoaderUtils.implementsClass(schemaType, Schema.class, clsLoader);
             validateSchemaType(schemaType, typeArg, clsLoader, input);
         }
     }
@@ -62,13 +64,13 @@ public class ValidatorUtils {
         if (isEmpty(inputSerializer)) return;
         if (inputSerializer.equals(DEFAULT_SERDE)) return;
         try {
-            Class<?> serdeClass = FunctionCommon.loadClass(inputSerializer, clsLoader);
+            Class<?> serdeClass = ClassLoaderUtils.loadClass(inputSerializer, clsLoader);
         } catch (ClassNotFoundException | NoClassDefFoundError e) {
             throw new IllegalArgumentException(
                     String.format("The input serialization/deserialization class %s does not exist",
                             inputSerializer));
         }
-        FunctionCommon.implementsClass(inputSerializer, SerDe.class, clsLoader);
+        ClassLoaderUtils.implementsClass(inputSerializer, SerDe.class, clsLoader);
 
         SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
         if (serDe == null) {
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 3d7d106..280db6c 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.functions.WindowConfig;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
 import org.testng.annotations.Test;
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 8e9df03..4363ea1 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
 import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 4c80b23..c9f6950 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -25,8 +25,8 @@ import lombok.experimental.Accessors;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
 import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
 import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 3f522dd..5ac73d9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -46,7 +46,7 @@ import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsPro
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionInstanceId;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 789c91a..bcbccda 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
 import org.apache.pulsar.functions.proto.Function.Instance;
 import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.worker.scheduler.IScheduler;
 
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index bc117ab..a93077c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -87,7 +88,7 @@ import static org.testng.Assert.assertEquals;
 /**
  * Unit test of {@link SinksApiV3Resource}.
  */
-@PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class})
+@PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, ConnectorUtils.class, FunctionCommon.class, ClassLoaderUtils.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*", "java.io.*" })
 
 public class SinkApiV3ResourceTest {
@@ -810,6 +811,8 @@ public class SinkApiV3ResourceTest {
         doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
 
+        mockStatic(ClassLoaderUtils.class);
+
         mockStatic(FunctionCommon.class);
         PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
 
@@ -881,6 +884,8 @@ public class SinkApiV3ResourceTest {
         doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
 
+        mockStatic(ClassLoaderUtils.class);
+
         mockStatic(FunctionCommon.class);
         PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
 
@@ -980,6 +985,8 @@ public class SinkApiV3ResourceTest {
         doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
 
+        mockStatic(ClassLoaderUtils.class);
+
         mockStatic(FunctionCommon.class);
         doReturn(String.class).when(FunctionCommon.class);
         FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index d13fd3d..d36c9a9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.admin.Tenants;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
 import org.apache.pulsar.common.util.FutureUtil;
 import org.apache.pulsar.common.util.RestException;
 import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -88,7 +89,7 @@ import org.testng.annotations.Test;
 /**
  * Unit test of {@link SourcesApiV3Resource}.
  */
-@PrepareForTest({WorkerUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class})
+@PrepareForTest({WorkerUtils.class, ConnectorUtils.class, FunctionCommon.class, ClassLoaderUtils.class, InstanceUtils.class})
 @PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
 public class SourceApiV3ResourceTest {
 
@@ -831,6 +832,8 @@ public class SourceApiV3ResourceTest {
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
 
+        mockStatic(ClassLoaderUtils.class);
+
         mockStatic(FunctionCommon.class);
         PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
         doReturn(String.class).when(FunctionCommon.class);
@@ -901,6 +904,8 @@ public class SourceApiV3ResourceTest {
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
 
+        mockStatic(ClassLoaderUtils.class);
+
         mockStatic(FunctionCommon.class);
         PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
         doReturn(String.class).when(FunctionCommon.class);
@@ -995,6 +1000,8 @@ public class SourceApiV3ResourceTest {
         doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
         ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
 
+        mockStatic(ClassLoaderUtils.class);
+
         mockStatic(FunctionCommon.class);
         doReturn(String.class).when(FunctionCommon.class);
         FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));