You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/05/30 05:24:46 UTC
[pulsar] branch master updated: Moved ClassLoading and Reflections Helper functions to common (#7103)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e451be3 Moved ClassLoading and Reflections Helper functions to common (#7103)
e451be3 is described below
commit e451be300e5a0e718c444dd3a5b3a80a16734974
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Fri May 29 22:24:29 2020 -0700
Moved ClassLoading and Reflections Helper functions to common (#7103)
* Moved ClassLoading and Reflections Helper functions to common
* Fix tests
* Fix test
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 4 +-
.../apache/pulsar/admin/cli/CmdFunctionsTest.java | 2 +-
.../org/apache/pulsar/admin/cli/TestCmdSinks.java | 4 +-
.../apache/pulsar/admin/cli/TestCmdSources.java | 4 +-
.../pulsar/common/util/ClassLoaderUtils.java | 80 ++++++++++++++++++++++
.../apache/pulsar/common/util}/Reflections.java | 16 +++--
.../pulsar/common/util}/ReflectionsTest.java | 5 +-
.../pulsar/functions/instance/InstanceUtils.java | 2 +-
.../functions/instance/JavaInstanceRunnable.java | 4 +-
.../apache/pulsar/functions/sink/PulsarSink.java | 2 +-
.../pulsar/functions/source/PulsarSource.java | 3 +-
.../windowing/WindowFunctionExecutor.java | 2 +-
.../functions/auth/FunctionAuthProvider.java | 2 +-
.../auth/KubernetesFunctionAuthProvider.java | 4 +-
.../functions/runtime/JavaInstanceStarter.java | 2 +-
.../functions/runtime/RuntimeCustomizer.java | 2 +-
.../pulsar/functions/runtime/RuntimeFactory.java | 2 +-
.../runtime/thread/ThreadRuntimeFactory.java | 2 +-
.../pulsar/functions/utils/FunctionCommon.java | 54 +--------------
.../functions/utils/FunctionConfigUtils.java | 2 +-
.../pulsar/functions/utils/SinkConfigUtils.java | 5 +-
.../pulsar/functions/utils/SourceConfigUtils.java | 4 +-
.../pulsar/functions/utils/ValidatorUtils.java | 10 +--
.../functions/utils/FunctionConfigUtilsTest.java | 1 +
.../functions/utils/SinkConfigUtilsTest.java | 1 +
.../functions/utils/SourceConfigUtilsTest.java | 2 +-
.../functions/worker/FunctionRuntimeManager.java | 2 +-
.../pulsar/functions/worker/SchedulerManager.java | 2 +-
.../worker/rest/api/v3/SinkApiV3ResourceTest.java | 9 ++-
.../rest/api/v3/SourceApiV3ResourceTest.java | 9 ++-
30 files changed, 145 insertions(+), 98 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index ed33614..e241f76 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -52,11 +52,11 @@ import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -238,7 +238,7 @@ public class PulsarFunctionTlsTest {
File file = new File(jarFile);
try {
- FunctionCommon.loadJar(file);
+ ClassLoaderUtils.loadJar(file);
} catch (MalformedURLException e) {
throw new RuntimeException("Failed to load user jar " + file, e);
}
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
index c162c4b..9c2ddcf 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/CmdFunctionsTest.java
@@ -57,7 +57,7 @@ import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.functions.api.Context;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
index f0418c9..c7c4b6d 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSinks.java
@@ -44,7 +44,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -121,7 +121,7 @@ public class TestCmdSinks {
throw new RuntimeException("Failed to file required test archive: " + JAR_FILE_NAME);
}
JAR_FILE_PATH = file.getFile();
- Thread.currentThread().setContextClassLoader(FunctionCommon.loadJar(new File(JAR_FILE_PATH)));
+ Thread.currentThread().setContextClassLoader(ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH)));
}
public SinkConfig getSinkConfig() {
diff --git a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
index cbf121d..590031d 100644
--- a/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
+++ b/pulsar-client-tools/src/test/java/org/apache/pulsar/admin/cli/TestCmdSources.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.UpdateOptions;
import org.apache.pulsar.common.io.SourceConfig;
-import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
@@ -98,7 +98,7 @@ public class TestCmdSources {
mockStatic(CmdFunctions.class);
PowerMockito.doNothing().when(localSourceRunner).runCmd();
JAR_FILE_PATH = Thread.currentThread().getContextClassLoader().getResource(JAR_FILE_NAME).getFile();
- Thread.currentThread().setContextClassLoader(FunctionCommon.loadJar(new File(JAR_FILE_PATH)));
+ Thread.currentThread().setContextClassLoader(ClassLoaderUtils.loadJar(new File(JAR_FILE_PATH)));
}
public SourceConfig getSourceConfig() {
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
new file mode 100644
index 0000000..9af547e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/ClassLoaderUtils.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util;
+
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Path;
+
+/**
+ * Helper methods wrt Classloading.
+ */
+public class ClassLoaderUtils {
+ /**
+ * Load a jar.
+ *
+ * @param jar file of jar
+ * @return classloader
+ * @throws MalformedURLException
+ */
+ public static ClassLoader loadJar(File jar) throws MalformedURLException {
+ java.net.URL url = jar.toURI().toURL();
+ return new URLClassLoader(new URL[]{url});
+ }
+
+ public static ClassLoader extractClassLoader(Path archivePath, File packageFile) throws Exception {
+ if (archivePath != null) {
+ return loadJar(archivePath.toFile());
+ }
+ if (packageFile != null) {
+ return loadJar(packageFile);
+ }
+ return null;
+ }
+
+ public static Class<?> loadClass(String className, ClassLoader classLoader) throws ClassNotFoundException {
+ Class<?> objectClass;
+ try {
+ objectClass = Class.forName(className);
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ if (classLoader != null) {
+ objectClass = classLoader.loadClass(className);
+ } else {
+ throw e;
+ }
+ }
+ return objectClass;
+ }
+
+ public static void implementsClass(String className, Class<?> klass, ClassLoader classLoader) {
+ Class<?> objectClass;
+ try {
+ objectClass = loadClass(className, classLoader);
+ } catch (ClassNotFoundException | NoClassDefFoundError e) {
+ throw new IllegalArgumentException("Cannot find/load class " + className);
+ }
+
+ if (!klass.isAssignableFrom(objectClass)) {
+ throw new IllegalArgumentException(
+ String.format("%s does not implement %s", className, klass.getName()));
+ }
+ }
+}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
similarity index 94%
rename from pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
rename to pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
index e45b397..e1713ba 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/Reflections.java
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.utils;
+package org.apache.pulsar.common.util;
import java.io.IOException;
import java.lang.reflect.Array;
@@ -165,7 +165,7 @@ public class Reflections {
public static Object createInstance(String userClassName, java.io.File jar) {
try {
- return createInstance(userClassName, FunctionCommon.loadJar(jar));
+ return createInstance(userClassName, ClassLoaderUtils.loadJar(jar));
} catch (Exception ex) {
return null;
}
@@ -180,7 +180,7 @@ public class Reflections {
*/
public static boolean classExistsInJar(java.io.File jar, String fqcn) {
try {
- java.net.URLClassLoader loader = (URLClassLoader) FunctionCommon.loadJar(jar);
+ java.net.URLClassLoader loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
Class.forName(fqcn, false, loader);
loader.close();
return true;
@@ -199,7 +199,7 @@ public class Reflections {
try {
Class.forName(fqcn);
return true;
- } catch( ClassNotFoundException e ) {
+ } catch (ClassNotFoundException e) {
return false;
}
}
@@ -214,7 +214,7 @@ public class Reflections {
public static boolean classInJarImplementsIface(java.io.File jar, String fqcn, Class xface) {
boolean ret = false;
try {
- java.net.URLClassLoader loader = (URLClassLoader) FunctionCommon.loadJar(jar);
+ java.net.URLClassLoader loader = (URLClassLoader) ClassLoaderUtils.loadJar(jar);
if (xface.isAssignableFrom(Class.forName(fqcn, false, loader))){
ret = true;
}
@@ -281,7 +281,7 @@ public class Reflections {
throw new ClassNotFoundException(className);
}
} else if (isPrimitive(className)) {
- return (Class)PRIMITIVE_NAME_TYPE_MAP.get(className);
+ return (Class) PRIMITIVE_NAME_TYPE_MAP.get(className);
} else if (className.charAt(0) == 'L' && className.charAt(className.length() - 1) == ';') {
return classLoader.loadClass(className.substring(1, className.length() - 1));
} else {
@@ -291,10 +291,12 @@ public class Reflections {
if (className.charAt(0) != '[') {
throw var4;
} else {
+ // CHECKSTYLE.OFF: EmptyStatement
int arrayDimension;
- for(arrayDimension = 0; className.charAt(arrayDimension) == '['; ++arrayDimension) {
+ for (arrayDimension = 0; className.charAt(arrayDimension) == '['; ++arrayDimension) {
;
}
+ // CHECKSTYLE.ON: EmptyStatement
Class componentType = loadClass(className.substring(arrayDimension), classLoader);
return Array.newInstance(componentType, new int[arrayDimension]).getClass();
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ReflectionsTest.java
similarity index 98%
rename from pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
rename to pulsar-common/src/test/java/org/apache/pulsar/common/util/ReflectionsTest.java
index 1ad03b1..e59f21a 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/util/ReflectionsTest.java
@@ -16,15 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.functions.utils;
+package org.apache.pulsar.common.util;
-import static org.apache.pulsar.functions.utils.Reflections.createInstance;
+import static org.apache.pulsar.common.util.Reflections.createInstance;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
import static org.testng.AssertJUnit.fail;
import java.lang.reflect.InvocationTargetException;
+
import org.testng.annotations.Test;
/**
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
index e73f1ce..d344364 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/InstanceUtils.java
@@ -28,7 +28,7 @@ import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.sink.PulsarSink;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.functions.utils.FunctionCommon;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index 892c7e5..7bb8238 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -27,7 +27,7 @@ import io.prometheus.client.CollectorRegistry;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-import lombok.AccessLevel;
+
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
@@ -69,7 +69,7 @@ import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
import org.apache.pulsar.functions.source.PulsarSource;
import org.apache.pulsar.functions.source.PulsarSourceConfig;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.io.core.Sink;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 8cb67ef..8aa1cbc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -40,7 +40,7 @@ import org.apache.pulsar.functions.instance.SinkRecord;
import org.apache.pulsar.functions.instance.stats.ComponentStatsManager;
import org.apache.pulsar.functions.source.PulsarRecord;
import org.apache.pulsar.functions.source.TopicSchema;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.io.core.Sink;
import org.apache.pulsar.io.core.SinkContext;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index fa7146d..f4a91fc 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -23,7 +23,6 @@ import static com.google.common.base.Preconditions.checkArgument;
import com.google.common.annotations.VisibleForTesting;
import java.util.*;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -35,7 +34,7 @@ import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.common.functions.FunctionConfig;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.io.core.PushSource;
import org.apache.pulsar.io.core.SourceContext;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
index a04010f..83d5102 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowFunctionExecutor.java
@@ -31,7 +31,7 @@ import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.api.*;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.functions.WindowConfig;
import org.apache.pulsar.functions.windowing.evictors.CountEvictionPolicy;
import org.apache.pulsar.functions.windowing.evictors.TimeEvictionPolicy;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
index b49ab3b..00a8ee9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/FunctionAuthProvider.java
@@ -21,7 +21,7 @@ package org.apache.pulsar.functions.auth;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import java.util.Optional;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
index aa40a7b..dacd3a1 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/auth/KubernetesFunctionAuthProvider.java
@@ -19,11 +19,9 @@
package org.apache.pulsar.functions.auth;
import io.kubernetes.client.apis.CoreV1Api;
-import io.kubernetes.client.models.V1ServiceAccount;
import io.kubernetes.client.models.V1StatefulSet;
-import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import java.util.Optional;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index f7104df..a1b88a4 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -51,7 +51,7 @@ import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import javax.management.MalformedObjectNameException;
import java.lang.reflect.Type;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeCustomizer.java
index 365cda8..ac617ab 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeCustomizer.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeCustomizer.java
@@ -18,7 +18,7 @@
*/
package org.apache.pulsar.functions.runtime;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import java.util.Map;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 16c630d..51e342e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -24,7 +24,7 @@ import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.worker.WorkerConfig;
import java.util.Optional;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 423d1f0..ce03a27 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -36,7 +36,7 @@ import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
import org.apache.pulsar.functions.worker.WorkerConfig;
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
index 1bd99b2..b8ac299 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionCommon.java
@@ -31,7 +31,6 @@ import java.net.*;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.file.Path;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
@@ -42,6 +41,7 @@ import org.apache.pulsar.client.impl.TopicMessageIdImpl;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.functions.api.Function;
import org.apache.pulsar.functions.api.WindowFunction;
import org.apache.pulsar.functions.proto.Function.FunctionDetails.Runtime;
@@ -205,16 +205,6 @@ public class FunctionCommon {
return typeArg;
}
- public static ClassLoader extractClassLoader(Path archivePath, File packageFile) throws Exception {
- if (archivePath != null) {
- return loadJar(archivePath.toFile());
- }
- if (packageFile != null) {
- return loadJar(packageFile);
- }
- return null;
- }
-
public static void downloadFromHttpUrl(String destPkgUrl, File targetFile) throws IOException {
URL website = new URL(destPkgUrl);
@@ -226,22 +216,10 @@ public class FunctionCommon {
log.info("Downloading function package from {} to {} completed!", destPkgUrl, targetFile.getAbsoluteFile());
}
- /**
- * Load a jar.
- *
- * @param jar file of jar
- * @return classloader
- * @throws MalformedURLException
- */
- public static ClassLoader loadJar(File jar) throws MalformedURLException {
- java.net.URL url = jar.toURI().toURL();
- return new URLClassLoader(new URL[]{url});
- }
-
public static ClassLoader extractClassLoader(String destPkgUrl) throws IOException, URISyntaxException {
File file = extractFileFromPkgURL(destPkgUrl);
try {
- return loadJar(file);
+ return ClassLoaderUtils.loadJar(file);
} catch (MalformedURLException e) {
throw new IllegalArgumentException(
"Corrupt User PackageFile " + file + " with error " + e.getMessage());
@@ -270,34 +248,6 @@ public class FunctionCommon {
}
}
- public static void implementsClass(String className, Class<?> klass, ClassLoader classLoader) {
- Class<?> objectClass;
- try {
- objectClass = loadClass(className, classLoader);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- throw new IllegalArgumentException("Cannot find/load class " + className);
- }
-
- if (!klass.isAssignableFrom(objectClass)) {
- throw new IllegalArgumentException(
- String.format("%s does not implement %s", className, klass.getName()));
- }
- }
-
- public static Class<?> loadClass(String className, ClassLoader classLoader) throws ClassNotFoundException {
- Class<?> objectClass;
- try {
- objectClass = Class.forName(className);
- } catch (ClassNotFoundException | NoClassDefFoundError e) {
- if (classLoader != null) {
- objectClass = classLoader.loadClass(className);
- } else {
- throw e;
- }
- }
- return objectClass;
- }
-
public static NarClassLoader extractNarClassLoader(Path archivePath, File packageFile,
String narExtractionDirectory) {
if (archivePath != null) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
index 9a94545..4e8f2d9 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/FunctionConfigUtils.java
@@ -44,7 +44,7 @@ import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.apache.commons.lang.StringUtils.isNotEmpty;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.pulsar.common.functions.Utils.BUILTIN;
-import static org.apache.pulsar.functions.utils.FunctionCommon.loadJar;
+import static org.apache.pulsar.common.util.ClassLoaderUtils.loadJar;
@Slf4j
public class FunctionConfigUtils {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 7bb9f77..ec027a4 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -20,7 +20,6 @@
package org.apache.pulsar.functions.utils;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
@@ -34,9 +33,9 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
-import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.validator.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -357,7 +356,7 @@ public class SinkConfigUtils {
Exception narClassLoaderException = null;
try {
- jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sinkPackageFile);
+ jarClassLoader = ClassLoaderUtils.extractClassLoader(archivePath, sinkPackageFile);
} catch (Exception e) {
jarClassLoaderException = e;
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
index 38939a6..77b8f8a 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SourceConfigUtils.java
@@ -20,7 +20,6 @@
package org.apache.pulsar.functions.utils;
import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import lombok.AllArgsConstructor;
@@ -33,6 +32,7 @@ import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.nar.NarClassLoader;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.common.validator.ConfigValidation;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -249,7 +249,7 @@ public class SourceConfigUtils {
Exception narClassLoaderException = null;
try {
- jarClassLoader = FunctionCommon.extractClassLoader(archivePath, sourcePackageFile);
+ jarClassLoader = ClassLoaderUtils.extractClassLoader(archivePath, sourcePackageFile);
} catch (Exception e) {
jarClassLoaderException = e;
}
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
index 1dd3c8b..7eeeda0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/ValidatorUtils.java
@@ -23,6 +23,8 @@ import lombok.extern.slf4j.Slf4j;
import net.jodah.typetools.TypeResolver;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.SerDe;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.io.core.Sink;
@@ -31,7 +33,7 @@ import org.apache.pulsar.io.core.Source;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
-import static org.apache.pulsar.functions.utils.Reflections.createInstance;
+import static org.apache.pulsar.common.util.Reflections.createInstance;
@Slf4j
public class ValidatorUtils {
@@ -43,7 +45,7 @@ public class ValidatorUtils {
// If it's empty, we use the default schema and no need to validate
// If it's built-in, no need to validate
} else {
- FunctionCommon.implementsClass(schemaType, Schema.class, clsLoader);
+ ClassLoaderUtils.implementsClass(schemaType, Schema.class, clsLoader);
validateSchemaType(schemaType, typeArg, clsLoader, input);
}
}
@@ -62,13 +64,13 @@ public class ValidatorUtils {
if (isEmpty(inputSerializer)) return;
if (inputSerializer.equals(DEFAULT_SERDE)) return;
try {
- Class<?> serdeClass = FunctionCommon.loadClass(inputSerializer, clsLoader);
+ Class<?> serdeClass = ClassLoaderUtils.loadClass(inputSerializer, clsLoader);
} catch (ClassNotFoundException | NoClassDefFoundError e) {
throw new IllegalArgumentException(
String.format("The input serialization/deserialization class %s does not exist",
inputSerializer));
}
- FunctionCommon.implementsClass(inputSerializer, SerDe.class, clsLoader);
+ ClassLoaderUtils.implementsClass(inputSerializer, SerDe.class, clsLoader);
SerDe serDe = (SerDe) Reflections.createInstance(inputSerializer, clsLoader);
if (serDe == null) {
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
index 3d7d106..280db6c 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/FunctionConfigUtilsTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.functions.WindowConfig;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
import org.testng.annotations.Test;
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 8e9df03..4363ea1 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -27,6 +27,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
import org.apache.pulsar.common.io.SinkConfig;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
index 4c80b23..c9f6950 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SourceConfigUtilsTest.java
@@ -25,8 +25,8 @@ import lombok.experimental.Accessors;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
import org.apache.pulsar.common.io.ConnectorDefinition;
-import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.common.validator.ConfigValidationAnnotations;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
import org.apache.pulsar.functions.proto.Function;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 3f522dd..5ac73d9 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -46,7 +46,7 @@ import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsPro
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionInstanceId;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.MediaType;
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
index 789c91a..bcbccda 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/SchedulerManager.java
@@ -45,7 +45,7 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.Function.FunctionMetaData;
import org.apache.pulsar.functions.proto.Function.Instance;
import org.apache.pulsar.functions.utils.Actions;
-import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
index bc117ab..a93077c 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SinkApiV3ResourceTest.java
@@ -30,6 +30,7 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -87,7 +88,7 @@ import static org.testng.Assert.assertEquals;
/**
* Unit test of {@link SinksApiV3Resource}.
*/
-@PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class})
+@PrepareForTest({WorkerUtils.class, SinkConfigUtils.class, ConnectorUtils.class, FunctionCommon.class, ClassLoaderUtils.class, InstanceUtils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*", "java.io.*" })
public class SinkApiV3ResourceTest {
@@ -810,6 +811,8 @@ public class SinkApiV3ResourceTest {
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
+ mockStatic(ClassLoaderUtils.class);
+
mockStatic(FunctionCommon.class);
PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
@@ -881,6 +884,8 @@ public class SinkApiV3ResourceTest {
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
+ mockStatic(ClassLoaderUtils.class);
+
mockStatic(FunctionCommon.class);
PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
@@ -980,6 +985,8 @@ public class SinkApiV3ResourceTest {
doReturn(CassandraStringSink.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSinkClass(any(NarClassLoader.class));
+ mockStatic(ClassLoaderUtils.class);
+
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSinkType(anyString(), any(NarClassLoader.class));
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
index d13fd3d..d36c9a9 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/rest/api/v3/SourceApiV3ResourceTest.java
@@ -53,6 +53,7 @@ import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ClassLoaderUtils;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.RestException;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
@@ -88,7 +89,7 @@ import org.testng.annotations.Test;
/**
* Unit test of {@link SourcesApiV3Resource}.
*/
-@PrepareForTest({WorkerUtils.class, ConnectorUtils.class, FunctionCommon.class, InstanceUtils.class})
+@PrepareForTest({WorkerUtils.class, ConnectorUtils.class, FunctionCommon.class, ClassLoaderUtils.class, InstanceUtils.class})
@PowerMockIgnore({ "javax.management.*", "javax.ws.*", "org.apache.logging.log4j.*", "org.apache.pulsar.io.*" })
public class SourceApiV3ResourceTest {
@@ -831,6 +832,8 @@ public class SourceApiV3ResourceTest {
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
+ mockStatic(ClassLoaderUtils.class);
+
mockStatic(FunctionCommon.class);
PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
doReturn(String.class).when(FunctionCommon.class);
@@ -901,6 +904,8 @@ public class SourceApiV3ResourceTest {
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
+ mockStatic(ClassLoaderUtils.class);
+
mockStatic(FunctionCommon.class);
PowerMockito.when(FunctionCommon.class, "createPkgTempFile").thenCallRealMethod();
doReturn(String.class).when(FunctionCommon.class);
@@ -995,6 +1000,8 @@ public class SourceApiV3ResourceTest {
doReturn(TwitterFireHose.class.getName()).when(ConnectorUtils.class);
ConnectorUtils.getIOSourceClass(any(NarClassLoader.class));
+ mockStatic(ClassLoaderUtils.class);
+
mockStatic(FunctionCommon.class);
doReturn(String.class).when(FunctionCommon.class);
FunctionCommon.getSourceType(anyString(), any(NarClassLoader.class));