You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2019/07/13 04:58:51 UTC
[pulsar] branch master updated: Use classloaders to load Java
functions (#4685)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 6ff1bba Use classloaders to load Java functions (#4685)
6ff1bba is described below
commit 6ff1bbae0fe230eb72d62574d7685a745f884416
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Jul 12 21:58:44 2019 -0700
Use classloaders to load Java functions (#4685)
* Use classloading to load user code for functions
---
bin/pulsar | 2 +
distribution/server/pom.xml | 6 -
distribution/server/src/assemble/bin.xml | 2 +
.../pulsar/client/internal/ReflectionUtils.java | 10 +-
.../functions/instance/JavaInstanceRunnable.java | 102 ++++--
.../apache/pulsar/functions/sink/PulsarSink.java | 9 +-
.../pulsar/functions/source/PulsarSource.java | 7 +-
.../pulsar/functions/sink/PulsarSinkTest.java | 18 +-
.../pulsar/functions/source/PulsarSourceTest.java | 8 +-
.../org/apache/pulsar/functions/LocalRunner.java | 4 +-
pulsar-functions/runtime-all/pom.xml | 370 ++-------------------
.../functions/instance/JavaInstanceMain.java | 150 +++++++++
.../src/main/resources/java_instance_log4j2.xml | 130 ++++++++
.../main/resources/kubernetes_instance_log4j2.xml | 60 ++++
...aInstanceMain.java => JavaInstanceStarter.java} | 79 ++---
.../functions/runtime/KubernetesRuntime.java | 2 +-
.../pulsar/functions/runtime/ProcessRuntime.java | 2 +-
.../pulsar/functions/runtime/RuntimeUtils.java | 19 +-
.../functions/runtime/ThreadRuntimeFactory.java | 34 +-
.../src/main/resources/java_instance_log4j2.yml | 111 -------
.../main/resources/kubernetes_instance_log4j2.yml | 53 ---
.../functions/runtime/KubernetesRuntimeTest.java | 13 +-
.../functions/runtime/ProcessRuntimeTest.java | 13 +-
.../utils/functioncache/FunctionCacheEntry.java | 18 +-
.../functioncache/FunctionCacheManagerImpl.java | 9 +-
.../functions/worker/FunctionRuntimeManager.java | 2 +-
.../functions/worker/SchedulerManagerTest.java | 18 +-
27 files changed, 605 insertions(+), 646 deletions(-)
diff --git a/bin/pulsar b/bin/pulsar
index ad7736a..fd90aa1 100755
--- a/bin/pulsar
+++ b/bin/pulsar
@@ -283,6 +283,8 @@ OPTS="$OPTS -Dpulsar.functions.process.container.log.dir=$PULSAR_LOG_DIR"
OPTS="$OPTS -Dpulsar.functions.java.instance.jar=${JAVA_INSTANCE_JAR}"
OPTS="$OPTS -Dpulsar.functions.python.instance.file=${PY_INSTANCE_FILE}"
OPTS="$OPTS -Dpulsar.functions.extra.dependencies.dir=${FUNCTIONS_EXTRA_DEPS_DIR}"
+OPTS="$OPTS -Dpulsar.functions.instance.classpath=${PULSAR_CLASSPATH}"
+
ZK_OPTS=" -Dzookeeper.4lw.commands.whitelist=*"
diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index d233ea8..6fd5e5d 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -156,12 +156,6 @@
<version>${project.version}</version>
<!-- make sure the api examples are compiled before assembly -->
<scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>io.grpc</groupId>
- <artifactId>*</artifactId>
- </exclusion>
- </exclusions>
</dependency>
<!-- local-runner -->
diff --git a/distribution/server/src/assemble/bin.xml b/distribution/server/src/assemble/bin.xml
index c2d5996..bb3e273 100644
--- a/distribution/server/src/assemble/bin.xml
+++ b/distribution/server/src/assemble/bin.xml
@@ -131,6 +131,8 @@
<exclude>io.netty:netty-transport-native-epoll</exclude>
<exclude>io.netty:netty-transport-native-unix-common</exclude>
+ <exclude>org.apache.pulsar:pulsar-functions-runtime-all</exclude>
+
<!-- Already included in pulsar-zookeeper instrumented jar -->
<exclude>org.apache.zookeeper:zookeeper</exclude>
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
index 4d9db6e..101c77d 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/ReflectionUtils.java
@@ -50,7 +50,15 @@ class ReflectionUtils {
@SuppressWarnings("unchecked")
static <T> Class<T> newClassInstance(String className) {
try {
- return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className);
+ try {
+ // when the API is loaded in the same classloader as the impl
+ return (Class<T>) DefaultImplementation.class.getClassLoader().loadClass(className);
+ } catch (Exception e) {
+ // when the API is loaded in a separate classloader as the impl
+ // the classloader that loaded the impl needs to be a child classloader of the classloader
+ // that loaded the API
+ return (Class<T>) Thread.currentThread().getContextClassLoader().loadClass(className);
+ }
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index b6c4fc8..66d4ba7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -90,8 +90,6 @@ import static org.apache.bookkeeper.stream.protocol.ProtocolConstants.DEFAULT_ST
@Slf4j
public class JavaInstanceRunnable implements AutoCloseable, Runnable {
- // The class loader that used for loading functions
- private ClassLoader fnClassLoader;
private final InstanceConfig instanceConfig;
private final FunctionCacheManager fnCache;
private final String jarFile;
@@ -132,6 +130,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private final Map<String, String> properties;
+ private final ClassLoader instanceClassLoader;
+ private ClassLoader functionClassLoader;
+
public JavaInstanceRunnable(InstanceConfig instanceConfig,
FunctionCacheManager fnCache,
String jarFile,
@@ -166,12 +167,14 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
// metrics collection especially in threaded mode
// In process mode the JavaInstanceMain will declare a CollectorRegistry and pass it down
this.collectorRegistry = collectorRegistry;
+
+ this.instanceClassLoader = Thread.currentThread().getContextClassLoader();
}
/**
* NOTE: this method should be called in the instance thread, in order to make class loading work.
*/
- JavaInstance setupJavaInstance(ContextImpl contextImpl) throws Exception {
+ JavaInstance setupJavaInstance() throws Exception {
// initialize the thread context
ThreadContext.put("function", FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails()));
ThreadContext.put("functionname", instanceConfig.getFunctionDetails().getName());
@@ -181,18 +184,21 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
instanceConfig.getFunctionDetails().getName(), instanceConfig.getFunctionDetails());
// start the function thread
- loadJars();
+ functionClassLoader = loadJars();
- ClassLoader clsLoader = Thread.currentThread().getContextClassLoader();
Object object = Reflections.createInstance(
instanceConfig.getFunctionDetails().getClassName(),
- clsLoader);
+ functionClassLoader);
+
if (!(object instanceof Function) && !(object instanceof java.util.function.Function)) {
throw new RuntimeException("User class must either be Function or java.util.Function");
}
// start the state table
setupStateTable();
+
+ ContextImpl contextImpl = setupContext();
+
// start the output producer
setupOutput(contextImpl);
// start the input consumer
@@ -225,8 +231,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
this.instanceCache.getScheduledExecutorService(),
this.componentType);
- ContextImpl contextImpl = setupContext();
- javaInstance = setupJavaInstance(contextImpl);
+ javaInstance = setupJavaInstance();
if (null != stateTable) {
StateContextImpl stateContext = new StateContextImpl(stateTable);
javaInstance.getContext().setStateContext(stateContext);
@@ -254,7 +259,9 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
stats.processTimeStart();
// process the message
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
result = javaInstance.handleMessage(currentRecord, currentRecord.getValue());
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
// register end time
stats.processTimeEnd();
@@ -289,7 +296,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
}
- private void loadJars() throws Exception {
+ private ClassLoader loadJars() throws Exception {
+ ClassLoader fnClassLoader;
try {
log.info("Load JAR: {}", jarFile);
// Let's first try to treat it as a nar archive
@@ -309,13 +317,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
log.info("Initialize function class loader for function {} at function cache manager",
instanceConfig.getFunctionDetails().getName());
- this.fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
+ fnClassLoader = fnCache.getClassLoader(instanceConfig.getFunctionId());
if (null == fnClassLoader) {
throw new Exception("No function class loader available.");
}
- // make sure the function class loader is accessible thread-locally
- Thread.currentThread().setContextClassLoader(fnClassLoader);
+ return fnClassLoader;
}
private void createStateTable(String tableNs, String tableName, StorageClientSettings settings) throws Exception {
@@ -425,23 +432,33 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
private void sendOutputMessage(Record srcRecord, Object output) {
+ if (!(this.sink instanceof PulsarSink)) {
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
+ }
try {
this.sink.write(new SinkRecord<>(srcRecord, output));
} catch (Exception e) {
log.info("Encountered exception in sink write: ", e);
stats.incrSinkExceptions(e);
throw new RuntimeException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
}
private Record readInput() {
Record record;
+ if (!(this.source instanceof PulsarSource)) {
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
+ }
try {
record = this.source.read();
} catch (Exception e) {
stats.incrSourceExceptions(e);
log.info("Encountered exception in source read: ", e);
throw new RuntimeException(e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
// check record is valid
@@ -466,19 +483,29 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
if (source != null) {
+ if (!(this.source instanceof PulsarSource)) {
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
+ }
try {
source.close();
} catch (Throwable e) {
log.error("Failed to close source {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
source = null;
}
if (sink != null) {
+ if (!(this.sink instanceof PulsarSink)) {
+ Thread.currentThread().setContextClassLoader(functionClassLoader);
+ }
try {
sink.close();
} catch (Throwable e) {
log.error("Failed to close sink {}", instanceConfig.getFunctionDetails().getSource().getClassName(), e);
+ } finally {
+ Thread.currentThread().setContextClassLoader(instanceClassLoader);
}
sink = null;
}
@@ -667,11 +694,11 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
pulsarSourceConfig.setMaxMessageRetries(this.instanceConfig.getFunctionDetails().getRetryDetails().getMaxMessageRetries());
pulsarSourceConfig.setDeadLetterTopic(this.instanceConfig.getFunctionDetails().getRetryDetails().getDeadLetterTopic());
}
- object = new PulsarSource(this.client, pulsarSourceConfig, this.properties);
+ object = new PulsarSource(this.client, pulsarSourceConfig, this.properties, this.functionClassLoader);
} else {
object = Reflections.createInstance(
sourceSpec.getClassName(),
- Thread.currentThread().getContextClassLoader());
+ this.functionClassLoader);
}
Class<?>[] typeArgs;
@@ -683,11 +710,22 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
this.source = (Source<?>) object;
- if (sourceSpec.getConfigs().isEmpty()) {
- this.source.open(new HashMap<>(), contextImpl);
- } else {
- this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
- new TypeToken<Map<String, Object>>(){}.getType()), contextImpl);
+ if (!(this.source instanceof PulsarSource)) {
+ Thread.currentThread().setContextClassLoader(this.functionClassLoader);
+ }
+ try {
+ if (sourceSpec.getConfigs().isEmpty()) {
+ this.source.open(new HashMap<>(), contextImpl);
+ } else {
+ this.source.open(new Gson().fromJson(sourceSpec.getConfigs(),
+ new TypeToken<Map<String, Object>>() {
+ }.getType()), contextImpl);
+ }
+ } catch (Exception e) {
+ log.error("Source open produced uncaught exception: ", e);
+ throw e;
+ } finally {
+ Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
@@ -713,12 +751,12 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
pulsarSinkConfig.setTypeClassName(sinkSpec.getTypeClassName());
- object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats);
+ object = new PulsarSink(this.client, pulsarSinkConfig, this.properties, this.stats, this.functionClassLoader);
}
} else {
object = Reflections.createInstance(
sinkSpec.getClassName(),
- Thread.currentThread().getContextClassLoader());
+ this.functionClassLoader);
}
if (object instanceof Sink) {
@@ -726,11 +764,23 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
} else {
throw new RuntimeException("Sink does not implement correct interface");
}
- if (sinkSpec.getConfigs().isEmpty()) {
- this.sink.open(new HashMap<>(), contextImpl);
- } else {
- this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
- new TypeToken<Map<String, Object>>() {}.getType()), contextImpl);
+
+ if (!(this.sink instanceof PulsarSink)) {
+ Thread.currentThread().setContextClassLoader(this.functionClassLoader);
+ }
+ try {
+ if (sinkSpec.getConfigs().isEmpty()) {
+ this.sink.open(new HashMap<>(), contextImpl);
+ } else {
+ this.sink.open(new Gson().fromJson(sinkSpec.getConfigs(),
+ new TypeToken<Map<String, Object>>() {
+ }.getType()), contextImpl);
+ }
+ } catch (Exception e) {
+ log.error("Sink open produced uncaught exception: ", e);
+ throw e;
+ } finally {
+ Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
}
}
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
index 3f0e1a9..583fe50 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/sink/PulsarSink.java
@@ -58,6 +58,7 @@ public class PulsarSink<T> implements Sink<T> {
private final PulsarClient client;
private final PulsarSinkConfig pulsarSinkConfig;
private final Map<String, String> properties;
+ private final ClassLoader functionClassLoader;
private ComponentStatsManager stats;
@VisibleForTesting
@@ -237,12 +238,14 @@ public class PulsarSink<T> implements Sink<T> {
}
}
- public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties, ComponentStatsManager stats) {
+ public PulsarSink(PulsarClient client, PulsarSinkConfig pulsarSinkConfig, Map<String, String> properties,
+ ComponentStatsManager stats, ClassLoader functionClassLoader) {
this.client = client;
this.pulsarSinkConfig = pulsarSinkConfig;
this.topicSchema = new TopicSchema(client);
this.properties = properties;
this.stats = stats;
+ this.functionClassLoader = functionClassLoader;
}
@Override
@@ -314,9 +317,7 @@ public class PulsarSink<T> implements Sink<T> {
return (Schema<T>) Schema.BYTES;
}
- Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(),
- Thread.currentThread().getContextClassLoader());
-
+ Class<?> typeArg = Reflections.loadClass(this.pulsarSinkConfig.getTypeClassName(), functionClassLoader);
if (Void.class.equals(typeArg)) {
// return type is 'void', so there's no schema to check
return null;
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
index 4843362..cba90ff 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/source/PulsarSource.java
@@ -45,15 +45,18 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
private final PulsarClient pulsarClient;
private final PulsarSourceConfig pulsarSourceConfig;
private final Map<String, String> properties;
+ private final ClassLoader functionClassLoader;
private List<String> inputTopics;
private List<Consumer<T>> inputConsumers = Collections.emptyList();
private final TopicSchema topicSchema;
- public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties) {
+ public PulsarSource(PulsarClient pulsarClient, PulsarSourceConfig pulsarConfig, Map<String, String> properties,
+ ClassLoader functionClassLoader) {
this.pulsarClient = pulsarClient;
this.pulsarSourceConfig = pulsarConfig;
this.topicSchema = new TopicSchema(pulsarClient);
this.properties = properties;
+ this.functionClassLoader = functionClassLoader;
}
@Override
@@ -147,7 +150,7 @@ public class PulsarSource<T> extends PushSource<T> implements MessageListener<T>
Map<String, ConsumerConfig<T>> configs = new TreeMap<>();
Class<?> typeArg = Reflections.loadClass(this.pulsarSourceConfig.getTypeClassName(),
- Thread.currentThread().getContextClassLoader());
+ this.functionClassLoader);
checkArgument(!Void.class.equals(typeArg), "Input type of Pulsar Function cannot be Void");
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
index 700267e..fcea8a2 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/sink/PulsarSinkTest.java
@@ -166,7 +166,7 @@ public class PulsarSinkTest {
PulsarSinkConfig pulsarConfig = getPulsarConfigs();
// set type to void
pulsarConfig.setTypeClassName(Void.class.getName());
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
Schema schema = pulsarSink.initializeSchema();
@@ -184,7 +184,7 @@ public class PulsarSinkTest {
// set type to be inconsistent to that of SerDe
pulsarConfig.setTypeClassName(Integer.class.getName());
pulsarConfig.setSerdeClassName(TestSerDe.class.getName());
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
pulsarSink.initializeSchema();
fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -206,7 +206,7 @@ public class PulsarSinkTest {
PulsarSinkConfig pulsarConfig = getPulsarConfigs();
// set type to void
pulsarConfig.setTypeClassName(String.class.getName());
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
pulsarSink.initializeSchema();
@@ -225,7 +225,7 @@ public class PulsarSinkTest {
// set type to void
pulsarConfig.setTypeClassName(String.class.getName());
pulsarConfig.setSerdeClassName(TopicSchema.DEFAULT_SERDE);
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
pulsarSink.initializeSchema();
@@ -241,7 +241,7 @@ public class PulsarSinkTest {
// set type to void
pulsarConfig.setTypeClassName(ComplexUserDefinedType.class.getName());
pulsarConfig.setSerdeClassName(ComplexSerDe.class.getName());
- PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+ PulsarSink pulsarSink = new PulsarSink(getPulsarClient(), pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
try {
pulsarSink.initializeSchema();
@@ -263,7 +263,7 @@ public class PulsarSinkTest {
/** test At-least-once **/
pulsarClient = getPulsarClient();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
- PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+ PulsarSink pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class), Thread.currentThread().getContextClassLoader());
pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
@@ -309,7 +309,8 @@ public class PulsarSinkTest {
/** test At-most-once **/
pulsarClient = getPulsarClient();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATMOST_ONCE);
- pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+ pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class),
+ Thread.currentThread().getContextClassLoader());
pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
@@ -351,7 +352,8 @@ public class PulsarSinkTest {
/** test Effectively-once **/
pulsarClient = getPulsarClient();
pulsarConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.EFFECTIVELY_ONCE);
- pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class));
+ pulsarSink = new PulsarSink(pulsarClient, pulsarConfig, new HashMap<>(), mock(ComponentStatsManager.class),
+ Thread.currentThread().getContextClassLoader());
pulsarSink.open(new HashMap<>(), mock(SinkContext.class));
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
index 4438bf0..99e9d91 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/source/PulsarSourceTest.java
@@ -127,7 +127,7 @@ public class PulsarSourceTest {
pulsarConfig.setTypeClassName(Void.class.getName());
@Cleanup
- PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
+ PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
@@ -155,7 +155,7 @@ public class PulsarSourceTest {
pulsarConfig.setTopicSchema(topicSerdeClassNameMap);
@Cleanup
- PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
+ PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
try {
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
fail("Should fail constructing java instance if function type is inconsistent with serde type");
@@ -182,7 +182,7 @@ public class PulsarSourceTest {
pulsarConfig.setTopicSchema(consumerConfigs);
@Cleanup
- PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
+ PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
pulsarSource.open(new HashMap<>(), mock(SourceContext.class));
}
@@ -197,7 +197,7 @@ public class PulsarSourceTest {
pulsarConfig.setTopicSchema(consumerConfigs);
@Cleanup
- PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>());
+ PulsarSource<?> pulsarSource = new PulsarSource<>(getPulsarClient(), pulsarConfig, new HashMap<>(), Thread.currentThread().getContextClassLoader());
pulsarSource.setupConsumerConfigs();
}
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index f4c6eca..ee60811 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -25,7 +25,6 @@ import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import lombok.Builder;
-import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.io.SinkConfig;
@@ -61,7 +60,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.pulsar.common.functions.Utils.inferMissingArguments;
import static org.apache.pulsar.functions.utils.FunctionCommon.extractClassLoader;
-import static org.apache.pulsar.functions.utils.FunctionCommon.loadJar;
@Slf4j
public class LocalRunner {
@@ -407,7 +405,7 @@ public class LocalRunner {
serviceUrl,
stateStorageServiceUrl,
authConfig,
- new ClearTextSecretsProvider(), null);
+ new ClearTextSecretsProvider(), null, null);
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
diff --git a/pulsar-functions/runtime-all/pom.xml b/pulsar-functions/runtime-all/pom.xml
index 2393128..314efde 100644
--- a/pulsar-functions/runtime-all/pom.xml
+++ b/pulsar-functions/runtime-all/pom.xml
@@ -35,39 +35,43 @@
<dependencies>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-functions-runtime</artifactId>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-io-core</artifactId>
<version>${project.parent.version}</version>
</dependency>
- <!-- logging -->
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-functions-api</artifactId>
+ <version>${project.parent.version}</version>
</dependency>
+
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-client-api</artifactId>
+ <version>${project.parent.version}</version>
</dependency>
+
+ <!-- logging -->
+
<dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
- <groupId>${project.groupId}</groupId>
- <artifactId>pulsar-client-original</artifactId>
- <version>${project.parent.version}</version>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
</dependency>
<dependency>
- <groupId>io.netty</groupId>
- <artifactId>netty-all</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-api</artifactId>
</dependency>
<dependency>
- <groupId>io.grpc</groupId>
- <artifactId>grpc-all</artifactId>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-core</artifactId>
</dependency>
</dependencies>
@@ -75,333 +79,23 @@
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <configuration>
+ <!-- get all project dependencies -->
+ <descriptorRefs>
+ <descriptorRef>jar-with-dependencies</descriptorRef>
+ </descriptorRefs>
+ <finalName>java-instance</finalName>
+ <appendAssemblyId>false</appendAssemblyId>
+ </configuration>
<executions>
<execution>
+ <id>make-assembly</id>
+ <!-- bind to the packaging phase -->
<phase>package</phase>
<goals>
- <goal>shade</goal>
+ <goal>single</goal>
</goals>
- <configuration>
- <finalName>java-instance</finalName>
- <minimizeJar>false</minimizeJar>
- <shadedArtifactAttached>true</shadedArtifactAttached>
- <transformers>
- <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
- <transformer implementation="org.apache.maven.plugins.shade.resource.PluginXmlResourceTransformer" />
- </transformers>
- <artifactSet>
- <excludes>
- <exclude>io.netty:netty-common</exclude>
- <exclude>io.netty:netty-buffer</exclude>
- <exclude>io.netty:netty-codec-http2</exclude>
- <exclude>io.netty:netty-codec-http</exclude>
- <exclude>io.netty:netty-codec-socks</exclude>
- <exclude>io.netty:netty-codec</exclude>
- <exclude>io.netty:netty-handler</exclude>
- <exclude>io.netty:netty-handler-proxy</exclude>
- <exclude>io.netty:netty-transport</exclude>
- <exclude>io.netty:netty-resolver</exclude>
- </excludes>
- </artifactSet>
- <filters>
- <filter>
- <!-- Shading signed JARs will fail without
- this. http://stackoverflow.com/questions/999489/invalid-signature-file-when-attempting-to-run-a-jar -->
- <artifact>*:*</artifact>
- <excludes>
- <exclude>META-INF/*.SF</exclude>
- <exclude>META-INF/*.DSA</exclude>
- <exclude>META-INF/*.RSA</exclude>
- </excludes>
- </filter>
- </filters>
- <relocations>
- <relocation>
- <pattern>com.typesafe.netty</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.typesafe.netty</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.google</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.google</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.http</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.http</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.jute</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.jute</shadedPattern>
- </relocation>
- <relocation>
- <pattern>javax.servlet</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.servlet</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.junit</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.junit</shadedPattern>
- </relocation>
- <relocation>
- <pattern>junit</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.junit</shadedPattern>
- </relocation>
- <relocation>
- <pattern>net.jodah</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jodah</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.lz4</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.lz4</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.reactivestreams</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.reactivestreams</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.commons</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.commons</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.swagger</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.swagger</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.yaml</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.yaml</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.jctools</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.jctools</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.squareup.okhttp</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okhttp</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.grpc</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.grpc</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.joda</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.joda</shadedPattern>
- </relocation>
- <relocation>
- <pattern>javax.ws.rs</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.ws.rs</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.kubernetes</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.kubernetes</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.opencensus</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.opencensus</shadedPattern>
- </relocation>
- <relocation>
- <pattern>net.jpountz</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.jpountz</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.aspectj</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.aspectj</shadedPattern>
- </relocation>
- <relocation>
- <pattern>commons-configuration</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-configuration</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.tukaani</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.tukaani</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.github</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github</shadedPattern>
- </relocation>
- <relocation>
- <pattern>commons-io</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-io</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.distributedlog</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.distributedlog</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.fasterxml</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.fasterxml</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.inferred</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.inferred</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.bookkeeper</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.bookkeeper</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bookkeeper</shadedPattern>
- </relocation>
- <relocation>
- <pattern>dlshade</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.dlshade</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.codehaus.jackson</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.jackson</shadedPattern>
- </relocation>
- <relocation>
- <pattern>net.java.dev.jna</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.net.java.dev.jna</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.curator</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.curator</shadedPattern>
- </relocation>
- <relocation>
- <pattern>javax.validation</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.validation</shadedPattern>
- </relocation>
- <relocation>
- <pattern>javax.activation</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.activation</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.prometheus</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.prometheus</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.zookeeper</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.zookeeper</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.jsonwebtoken</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.jsonwebtoken</shadedPattern>
- </relocation>
- <relocation>
- <pattern>commons-codec</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-codec</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.thoughtworks.paranamer</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.thoughtworks.paranamer</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.codehaus.mojo</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.codehaus.mojo</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.github.luben</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.github.luben</shadedPattern>
- </relocation>
- <relocation>
- <pattern>jline</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.jline</shadedPattern>
- </relocation>
- <relocation>
- <pattern>commons-logging</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-logging</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.bouncycastle</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.bouncycastle</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.xerial.snappy</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.xerial.snappy</shadedPattern>
- </relocation>
- <relocation>
- <pattern>javax.annotation</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.javax.annotation</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.checkerframework</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.checkerframework</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.yetus</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.yetus</shadedPattern>
- </relocation>
- <relocation>
- <pattern>commons-cli</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-cli</shadedPattern>
- </relocation>
- <relocation>
- <pattern>commons-lang</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.commons-lang</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.squareup.okio</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.squareup.okio</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.rocksdb</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.rocksdb</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.objenesis</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.objenesis</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.eclipse.jetty</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.eclipse.jetty</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.apache.avro</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.avro</shadedPattern>
- </relocation>
- <relocation>
- <pattern>avro.shaded</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.avo.shaded</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.yahoo</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.yahoo</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.beust</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.beust</shadedPattern>
- </relocation>
- <relocation>
- <pattern>io.netty</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.netty</shadedPattern>
- </relocation>
- <relocation>
- <pattern>org.hamcrest</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.hamcrest</shadedPattern>
- </relocation>
- <relocation>
- <pattern>aj.org</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.aj.org</shadedPattern>
- </relocation>
- <relocation>
- <pattern>com.scurrilous</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.com.scurrilous</shadedPattern>
- </relocation>
- <relocation>
- <pattern>okio</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.okio</shadedPattern>
- </relocation>
- <!--
- asynchttpclient can only be shaded to be under `org.apache.pulsar.shade`
- see {@link https://github.com/apache/incubator-pulsar/pull/390}
- and {@link https://github.com/apache/incubator-pulsar/blob/master/pulsar-client/src/main/resources/ahc.properties}
- -->
- <relocation>
- <pattern>org.asynchttpclient</pattern>
- <shadedPattern>org.apache.pulsar.shade.org.asynchttpclient</shadedPattern>
- </relocation>
- <!-- DONT ever shade log4j, otherwise logging won't work anymore in running functions in process mode
- <relocation>
- <pattern>org.apache.logging</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.org.apache.logging</shadedPattern>
- </relocation>
- -->
- <relocation>
- <pattern>io.swagger</pattern>
- <shadedPattern>org.apache.pulsar.functions.runtime.shaded.io.swagger</shadedPattern>
- </relocation>
- </relocations>
- </configuration>
</execution>
</executions>
</plugin>
diff --git a/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
new file mode 100644
index 0000000..7b96927
--- /dev/null
+++ b/pulsar-functions/runtime-all/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceMain.java
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.functions.instance;
+
+import java.io.File;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.Arrays;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This is the initial class that gets called when starting a Java Function instance.
+ * Multiple class loaders are used to separate function instance dependencies from user code dependencies.
+ * Three classloaders are involved when running a function instance:
+ * 1. The root classloader that will share interfaces between the function instance
+ * classloader and user code classloader. This classloader will contain the following dependencies
+ * - pulsar-functions-api
+ * - pulsar-client-api
+ * - log4j-slf4j-impl
+ * - slf4j-api
+ * - log4j-core
+ * - log4j-api
+ *
+ * 2. The Function instance classloader, a child of the root classloader, that loads all pulsar broker/worker dependencies
+ * 3. The user code classloader, a child of the root classloader, that loads all user code dependencies
+ *
+ * This class should not use any other dependencies!
+ *
+ */
+public class JavaInstanceMain {
+
+ private static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
+
+ public JavaInstanceMain() { }
+
+ public static void main(String[] args) throws Exception {
+
+ // Set root classloader to current classpath
+ ClassLoader root = Thread.currentThread().getContextClassLoader();
+
+ // Get classpath for function instance
+ String functionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
+ if (functionInstanceClasspath == null) {
+ throw new IllegalArgumentException("Propery " + FUNCTIONS_INSTANCE_CLASSPATH + " is not set!");
+ }
+
+ List<File> files = new LinkedList<>();
+ for (String entry: functionInstanceClasspath.split(":")) {
+ if (isBlank(entry)) {
+ continue;
+ }
+ // strip any asterisks (i.e. wildcards), as they don't work with URLClassLoader
+ File f = new File(entry.replace("*", ""));
+ if (f.exists()) {
+ if (f.isDirectory()) {
+ files.addAll(Arrays.asList(f.listFiles()));
+ } else {
+ files.add(new File(entry));
+ }
+ } else {
+ System.out.println(String.format("[WARN] %s on functions instance classpath does not exist", f.getAbsolutePath()));
+ }
+ }
+
+ ClassLoader functionInstanceClsLoader = loadJar(root, files.toArray(new File[files.size()]));
+
+ System.out.println("Using function root classloader: " + root);
+ System.out.println("Using function instance classloader: " + functionInstanceClsLoader);
+
+ // use the function instance classloader to create org.apache.pulsar.functions.runtime.JavaInstanceStarter
+ Object main = createInstance("org.apache.pulsar.functions.runtime.JavaInstanceStarter", functionInstanceClsLoader);
+
+ // Invoke start method of JavaInstanceStarter to start the function instance code
+ Method method = main.getClass().getDeclaredMethod("start", String[].class, ClassLoader.class, ClassLoader.class);
+
+ System.out.println("Starting function instance...");
+ method.invoke(main, args, functionInstanceClsLoader, root);
+ }
+
+ public static Object createInstance(String userClassName,
+ ClassLoader classLoader) {
+ Class<?> theCls;
+ try {
+ theCls = Class.forName(userClassName, true, classLoader);
+ } catch (ClassNotFoundException cnfe) {
+ throw new RuntimeException("Class " + userClassName + " must be in class path", cnfe);
+ }
+ Object result;
+ try {
+ Constructor<?> meth = theCls.getDeclaredConstructor();
+ meth.setAccessible(true);
+
+ result = meth.newInstance();
+ } catch (InstantiationException ie) {
+ throw new RuntimeException("User class must be concrete", ie);
+ } catch (NoSuchMethodException e) {
+ throw new RuntimeException("Class " + userClassName + " doesn't have such method", e);
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException("Class " + userClassName + " must have a no-arg constructor", e);
+ } catch (InvocationTargetException e) {
+ throw new RuntimeException("Class " + userClassName + " constructor throws exception", e);
+ }
+ return result;
+ }
+
+ public static ClassLoader loadJar(ClassLoader parent, File[] jars) throws MalformedURLException {
+ URL[] urls = new URL[jars.length];
+ for (int i = 0; i < jars.length; i++) {
+ urls[i] = jars[i].toURI().toURL();
+ }
+ return new URLClassLoader(urls, parent);
+ }
+
+ public static boolean isBlank(String str) {
+ int strLen;
+ if (str != null && (strLen = str.length()) != 0) {
+ for(int i = 0; i < strLen; ++i) {
+ if (!Character.isWhitespace(str.charAt(i))) {
+ return false;
+ }
+ }
+
+ return true;
+ } else {
+ return true;
+ }
+ }
+}
diff --git a/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml b/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml
new file mode 100644
index 0000000..843ad9e
--- /dev/null
+++ b/pulsar-functions/runtime-all/src/main/resources/java_instance_log4j2.xml
@@ -0,0 +1,130 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<Configuration>
+ <name>pulsar-functions-instance</name>
+ <monitorInterval>30</monitorInterval>
+ <Properties>
+ <Property>
+ <name>pulsar.log.appender</name>
+ <value>RollingFile</value>
+ </Property>
+ <Property>
+ <name>pulsar.log.level</name>
+ <value>info</value>
+ </Property>
+ <Property>
+ <name>bk.log.level</name>
+ <value>info</value>
+ </Property>
+ </Properties>
+ <Appenders>
+ <Console>
+ <name>Console</name>
+ <target>SYSTEM_OUT</target>
+ <PatternLayout>
+ <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
+ </PatternLayout>
+ </Console>
+ <RollingFile>
+ <name>RollingFile</name>
+ <fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log</fileName>
+ <filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz</filePattern>
+ <immediateFlush>true</immediateFlush>
+ <PatternLayout>
+ <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
+ </PatternLayout>
+ <Policies>
+ <TimeBasedTriggeringPolicy>
+ <interval>1</interval>
+ <modulate>true</modulate>
+ </TimeBasedTriggeringPolicy>
+ <SizeBasedTriggeringPolicy>
+ <size>1 GB</size>
+ </SizeBasedTriggeringPolicy>
+ <CronTriggeringPolicy>
+ <schedule>0 0 0 * * ?</schedule>
+ </CronTriggeringPolicy>
+ </Policies>
+ <DefaultRolloverStrategy>
+ <Delete>
+ <basePath>${sys:pulsar.function.log.dir}</basePath>
+ <maxDepth>2</maxDepth>
+ <IfFileName>
+ <glob>*/${sys:pulsar.function.log.file}*log.gz</glob>
+ </IfFileName>
+ <IfLastModified>
+ <age>30d</age>
+ </IfLastModified>
+ </Delete>
+ </DefaultRolloverStrategy>
+ </RollingFile>
+ <RollingRandomAccessFile>
+ <name>BkRollingFile</name>
+ <fileName>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk</fileName>
+ <filePattern>${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz</filePattern>
+ <immediateFlush>true</immediateFlush>
+ <PatternLayout>
+ <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
+ </PatternLayout>
+ <Policies>
+ <TimeBasedTriggeringPolicy>
+ <interval>1</interval>
+ <modulate>true</modulate>
+ </TimeBasedTriggeringPolicy>
+ <SizeBasedTriggeringPolicy>
+ <size>1 GB</size>
+ </SizeBasedTriggeringPolicy>
+ <CronTriggeringPolicy>
+ <schedule>0 0 0 * * ?</schedule>
+ </CronTriggeringPolicy>
+ </Policies>
+ <DefaultRolloverStrategy>
+ <Delete>
+ <basePath>${sys:pulsar.function.log.dir}</basePath>
+ <maxDepth>2</maxDepth>
+ <IfFileName>
+ <glob>*/${sys:pulsar.function.log.file}.bk*log.gz</glob>
+ </IfFileName>
+ <IfLastModified>
+ <age>30d</age>
+ </IfLastModified>
+ </Delete>
+ </DefaultRolloverStrategy>
+ </RollingRandomAccessFile>
+ </Appenders>
+ <Loggers>
+ <Logger>
+ <name>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</name>
+ <level>${sys:bk.log.level}</level>
+ <additivity>false</additivity>
+ <AppenderRef>
+ <ref>BkRollingFile</ref>
+ </AppenderRef>
+ </Logger>
+ <Root>
+ <level>info</level>
+ <AppenderRef>
+ <ref>${sys:pulsar.log.appender}</ref>
+ <level>${sys:pulsar.log.level}</level>
+ </AppenderRef>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml b/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml
new file mode 100644
index 0000000..8e17a63
--- /dev/null
+++ b/pulsar-functions/runtime-all/src/main/resources/kubernetes_instance_log4j2.xml
@@ -0,0 +1,60 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+<Configuration>
+ <name>pulsar-functions-kubernetes-instance</name>
+ <monitorInterval>30</monitorInterval>
+ <Properties>
+ <Property>
+ <name>pulsar.log.level</name>
+ <value>info</value>
+ </Property>
+ <Property>
+ <name>bk.log.level</name>
+ <value>info</value>
+ </Property>
+ </Properties>
+ <Appenders>
+ <Console>
+ <name>Console</name>
+ <target>SYSTEM_OUT</target>
+ <PatternLayout>
+ <Pattern>%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n</Pattern>
+ </PatternLayout>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Logger>
+ <name>org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper</name>
+ <level>${sys:bk.log.level}</level>
+ <additivity>false</additivity>
+ <AppenderRef>
+ <ref>Console</ref>
+ </AppenderRef>
+ </Logger>
+ <Root>
+ <level>info</level>
+ <AppenderRef>
+ <ref>Console</ref>
+ <level>${sys:pulsar.log.level}</level>
+ </AppenderRef>
+ </Root>
+ </Loggers>
+</Configuration>
\ No newline at end of file
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
similarity index 87%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index 15af660..a26f287 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -36,7 +36,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
-import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
@@ -50,72 +50,70 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-/**
- * A function container implemented using java thread.
- */
+
@Slf4j
-public class JavaInstanceMain implements AutoCloseable {
+public class JavaInstanceStarter implements AutoCloseable {
@Parameter(names = "--function_details", description = "Function details json\n", required = true)
- protected String functionDetailsJsonString;
+ public String functionDetailsJsonString;
@Parameter(
names = "--jar",
description = "Path to Jar\n",
listConverter = StringConverter.class)
- protected String jarFile;
+ public String jarFile;
@Parameter(names = "--instance_id", description = "Instance Id\n", required = true)
- protected int instanceId;
+ public int instanceId;
@Parameter(names = "--function_id", description = "Function Id\n", required = true)
- protected String functionId;
+ public String functionId;
@Parameter(names = "--function_version", description = "Function Version\n", required = true)
- protected String functionVersion;
+ public String functionVersion;
@Parameter(names = "--pulsar_serviceurl", description = "Pulsar Service Url\n", required = true)
- protected String pulsarServiceUrl;
+ public String pulsarServiceUrl;
@Parameter(names = "--client_auth_plugin", description = "Client auth plugin name\n")
- protected String clientAuthenticationPlugin;
+ public String clientAuthenticationPlugin;
@Parameter(names = "--client_auth_params", description = "Client auth param\n")
- protected String clientAuthenticationParameters;
+ public String clientAuthenticationParameters;
@Parameter(names = "--use_tls", description = "Use tls connection\n")
- protected String useTls = Boolean.FALSE.toString();
+ public String useTls = Boolean.FALSE.toString();
@Parameter(names = "--tls_allow_insecure", description = "Allow insecure tls connection\n")
- protected String tlsAllowInsecureConnection = Boolean.TRUE.toString();
+ public String tlsAllowInsecureConnection = Boolean.TRUE.toString();
@Parameter(names = "--hostname_verification_enabled", description = "Enable hostname verification")
- protected String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();
+ public String tlsHostNameVerificationEnabled = Boolean.FALSE.toString();
@Parameter(names = "--tls_trust_cert_path", description = "tls trust cert file path")
- protected String tlsTrustCertFilePath;
+ public String tlsTrustCertFilePath;
@Parameter(names = "--state_storage_serviceurl", description = "State Storage Service Url\n", required= false)
- protected String stateStorageServiceUrl;
+ public String stateStorageServiceUrl;
@Parameter(names = "--port", description = "Port to listen on\n", required = true)
- protected int port;
+ public int port;
@Parameter(names = "--metrics_port", description = "Port metrics will be exposed on\n", required = true)
- protected int metrics_port;
+ public int metrics_port;
@Parameter(names = "--max_buffered_tuples", description = "Maximum number of tuples to buffer\n", required = true)
- protected int maxBufferedTuples;
+ public int maxBufferedTuples;
@Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
- protected int expectedHealthCheckInterval;
+ public int expectedHealthCheckInterval;
@Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
- protected String secretsProviderClassName;
+ public String secretsProviderClassName;
@Parameter(names = "--secrets_provider_config", description = "The config that needs to be passed to secrets provider", required = false)
- protected String secretsProviderConfig;
+ public String secretsProviderConfig;
@Parameter(names = "--cluster_name", description = "The name of the cluster this instance is running on", required = true)
- protected String clusterName;
+ public String clusterName;
private Server server;
private RuntimeSpawner runtimeSpawner;
@@ -124,17 +122,22 @@ public class JavaInstanceMain implements AutoCloseable {
private HTTPServer metricsServer;
private ScheduledFuture healthCheckTimer;
- public JavaInstanceMain() { }
+ public JavaInstanceStarter() { }
+ public void start(String[] args, ClassLoader functionInstanceClassLoader, ClassLoader rootClassLoader) throws Exception {
+ Thread.currentThread().setContextClassLoader(functionInstanceClassLoader);
+
+ JCommander jcommander = new JCommander(this);
+ // parse args with JCommander
+ jcommander.parse(args);
- public void start() throws Exception {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionId(functionId);
instanceConfig.setFunctionVersion(functionVersion);
instanceConfig.setInstanceId(instanceId);
instanceConfig.setMaxBufferedTuples(maxBufferedTuples);
instanceConfig.setClusterName(clusterName);
- FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+ Function.FunctionDetails.Builder functionDetailsBuilder = Function.FunctionDetails.newBuilder();
if (functionDetailsJsonString.charAt(0) == '\'') {
functionDetailsJsonString = functionDetailsJsonString.substring(1);
}
@@ -142,7 +145,7 @@ public class JavaInstanceMain implements AutoCloseable {
functionDetailsJsonString = functionDetailsJsonString.substring(0, functionDetailsJsonString.length() - 1);
}
JsonFormat.parser().merge(functionDetailsJsonString, functionDetailsBuilder);
- FunctionDetails functionDetails = functionDetailsBuilder.build();
+ Function.FunctionDetails functionDetails = functionDetailsBuilder.build();
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
@@ -164,7 +167,7 @@ public class JavaInstanceMain implements AutoCloseable {
SecretsProvider secretsProvider;
try {
- secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
+ secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, functionInstanceClassLoader);
} catch (Exception e) {
throw new RuntimeException(e);
}
@@ -180,7 +183,7 @@ public class JavaInstanceMain implements AutoCloseable {
.tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
.tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
- secretsProvider, collectorRegistry);
+ secretsProvider, collectorRegistry, rootClassLoader);
runtimeSpawner = new RuntimeSpawner(
instanceConfig,
jarFile,
@@ -234,16 +237,6 @@ public class JavaInstanceMain implements AutoCloseable {
return Boolean.TRUE.toString().equals(param);
}
- public static void main(String[] args) throws Exception {
- JavaInstanceMain javaInstanceMain = new JavaInstanceMain();
- JCommander jcommander = new JCommander(javaInstanceMain);
- jcommander.setProgramName("JavaInstanceMain");
-
- // parse args by JCommander
- jcommander.parse(args);
- javaInstanceMain.start();
- }
-
@Override
public void close() {
try {
@@ -309,7 +302,7 @@ public class JavaInstanceMain implements AutoCloseable {
@Override
public void getMetrics(com.google.protobuf.Empty request,
- io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
+ io.grpc.stub.StreamObserver<org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData> responseObserver) {
Runtime runtime = runtimeSpawner.getRuntime();
if (runtime != null) {
try {
@@ -324,7 +317,7 @@ public class JavaInstanceMain implements AutoCloseable {
}
public void resetMetrics(com.google.protobuf.Empty request,
- io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
+ io.grpc.stub.StreamObserver<com.google.protobuf.Empty> responseObserver) {
Runtime runtime = runtimeSpawner.getRuntime();
if (runtime != null) {
try {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index e2862ec..b06e405 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -185,7 +185,7 @@ public class KubernetesRuntime implements Runtime {
}
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
- logConfigFile = "kubernetes_instance_log4j2.yml";
+ logConfigFile = "kubernetes_instance_log4j2.xml";
break;
case PYTHON:
logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 43e74e8..311934e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -97,7 +97,7 @@ class ProcessRuntime implements Runtime {
}
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
- logConfigFile = "java_instance_log4j2.yml";
+ logConfigFile = "java_instance_log4j2.xml";
break;
case PYTHON:
logConfigFile = System.getenv("PULSAR_HOME") + "/conf/functions-logging/logging_config.ini";
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index fca3e22..ede5c48 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -39,7 +39,6 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.instance.go.GoInstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.utils.FunctionCommon;
-import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import static org.apache.commons.lang3.StringUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -51,6 +50,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class RuntimeUtils {
private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir";
+ static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
public static List<String> composeCmd(InstanceConfig instanceConfig,
String instanceFile,
@@ -255,17 +255,25 @@ public class RuntimeUtils {
args.add("-cp");
String classpath = instanceFile;
+
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
classpath = classpath + ":" + extraDependenciesDir + "/*";
}
args.add(classpath);
- // Keep the same env property pointing to the Java instance file so that it can be picked up
- // by the child process and manually added to classpath
- args.add(String.format("-D%s=%s", FunctionCacheEntry.JAVA_INSTANCE_JAR_PROPERTY, instanceFile));
if (StringUtils.isNotEmpty(extraDependenciesDir)) {
args.add(String.format("-D%s=%s", FUNCTIONS_EXTRA_DEPS_PROPERTY, extraDependenciesDir));
}
+
+ // add complete classpath for broker/worker so that the function instance can load
+ // the functions instance dependencies separately from user code dependencies
+ String functionInstanceClasspath = System.getProperty(FUNCTIONS_INSTANCE_CLASSPATH);
+ if (functionInstanceClasspath == null) {
+ log.warn("Property {} is not set. Falling back to using classpath of current JVM", FUNCTIONS_INSTANCE_CLASSPATH);
+ functionInstanceClasspath = System.getProperty("java.class.path");
+ }
+ args.add(String.format("-D%s=%s", FUNCTIONS_INSTANCE_CLASSPATH, functionInstanceClasspath));
+
args.add("-Dlog4j.configurationFile=" + logConfigFile);
args.add("-Dpulsar.function.log.dir=" + genFunctionLogFolder(logDirectory, instanceConfig));
args.add("-Dpulsar.function.log.file=" + String.format(
@@ -283,7 +291,8 @@ public class RuntimeUtils {
args.add("-Xmx" + String.valueOf(resources.getRam()));
}
}
- args.add(JavaInstanceMain.class.getName());
+ args.add("org.apache.pulsar.functions.instance.JavaInstanceMain");
+
args.add("--jar");
args.add(originalCodeFileName);
} else if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.PYTHON) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index a3fd850..49bee87 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -20,12 +20,8 @@
package org.apache.pulsar.functions.runtime;
import com.google.common.annotations.VisibleForTesting;
-
import io.prometheus.client.CollectorRegistry;
import lombok.extern.slf4j.Slf4j;
-
-import static org.apache.commons.lang3.StringUtils.isNotBlank;
-
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
@@ -36,6 +32,13 @@ import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
+import java.io.File;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLClassLoader;
+
+import static org.apache.commons.lang3.StringUtils.isNotBlank;
+
/**
* Thread based function container factory implementation.
*/
@@ -51,21 +54,36 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private volatile boolean closed;
public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
- AuthenticationConfig authConfig, SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) throws Exception {
- this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl, secretsProvider, collectorRegistry);
+ AuthenticationConfig authConfig, SecretsProvider secretsProvider,
+ CollectorRegistry collectorRegistry, ClassLoader rootClassLoader) throws Exception {
+ this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
+ storageServiceUrl, secretsProvider, collectorRegistry, rootClassLoader);
}
@VisibleForTesting
public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
- SecretsProvider secretsProvider, CollectorRegistry collectorRegistry) {
+ SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
+ ClassLoader rootClassLoader) {
+ if (rootClassLoader == null) {
+ rootClassLoader = Thread.currentThread().getContextClassLoader();
+ }
+
this.secretsProvider = secretsProvider;
- this.fnCache = new FunctionCacheManagerImpl();
+ this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
this.threadGroup = new ThreadGroup(threadGroupName);
this.pulsarClient = pulsarClient;
this.storageServiceUrl = storageServiceUrl;
this.collectorRegistry = collectorRegistry;
}
+ public static ClassLoader loadJar(ClassLoader parent, File[] jars) throws MalformedURLException {
+ URL[] urls = new URL[jars.length];
+ for (int i = 0; i < jars.length; i++) {
+ urls[i] = jars[i].toURI().toURL();
+ }
+ return new URLClassLoader(urls, parent);
+ }
+
private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
throws PulsarClientException {
ClientBuilder clientBuilder = null;
diff --git a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml b/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
deleted file mode 100644
index 9846f05..0000000
--- a/pulsar-functions/runtime/src/main/resources/java_instance_log4j2.yml
+++ /dev/null
@@ -1,111 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-Configuration:
- name: pulsar-functions-instance
- monitorInterval: 30
-
- Properties:
- Property:
- - name: "pulsar.log.appender"
- value: "RollingFile"
- - name: "pulsar.log.level"
- value: "info"
- - name: "bk.log.level"
- value: "info"
-
- Appenders:
-
- # Console
- Console:
- name: Console
- target: SYSTEM_OUT
- PatternLayout:
- Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
-
- # Rolling file appender configuration
- RollingFile:
- name: RollingFile
- fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.log"
- filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}-%d{MM-dd-yyyy}-%i.log.gz"
- immediateFlush: true
- PatternLayout:
- Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
- Policies:
- TimeBasedTriggeringPolicy:
- interval: 1
- modulate: true
- SizeBasedTriggeringPolicy:
- size: 1 GB
- # Trigger every day at midnight that also scan
- # roll-over strategy that deletes older file
- CronTriggeringPolicy:
- schedule: "0 0 0 * * ?"
- # Delete file older than 30days
- DefaultRolloverStrategy:
- Delete:
- basePath: ${sys:pulsar.function.log.dir}
- maxDepth: 2
- IfFileName:
- glob: "*/${sys:pulsar.function.log.file}*log.gz"
- IfLastModified:
- age: 30d
-
- # Rolling file appender configuration for bk
- RollingRandomAccessFile:
- name: BkRollingFile
- fileName: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk"
- filePattern: "${sys:pulsar.function.log.dir}/${sys:pulsar.function.log.file}.bk-%d{MM-dd-yyyy}-%i.log.gz"
- immediateFlush: true
- PatternLayout:
- Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
- Policies:
- TimeBasedTriggeringPolicy:
- interval: 1
- modulate: true
- SizeBasedTriggeringPolicy:
- size: 1 GB
- # Trigger every day at midnight that also scan
- # roll-over strategy that deletes older file
- CronTriggeringPolicy:
- schedule: "0 0 0 * * ?"
- # Delete file older than 30days
- DefaultRolloverStrategy:
- Delete:
- basePath: ${sys:pulsar.function.log.dir}
- maxDepth: 2
- IfFileName:
- glob: "*/${sys:pulsar.function.log.file}.bk*log.gz"
- IfLastModified:
- age: 30d
-
- Loggers:
-
- Logger:
- name: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper
- level: "${sys:bk.log.level}"
- additivity: false
- AppenderRef:
- - ref: BkRollingFile
-
- Root:
- level: info
- AppenderRef:
- - ref: "${sys:pulsar.log.appender}"
- level: "${sys:pulsar.log.level}"
diff --git a/pulsar-functions/runtime/src/main/resources/kubernetes_instance_log4j2.yml b/pulsar-functions/runtime/src/main/resources/kubernetes_instance_log4j2.yml
deleted file mode 100644
index 98ce897..0000000
--- a/pulsar-functions/runtime/src/main/resources/kubernetes_instance_log4j2.yml
+++ /dev/null
@@ -1,53 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-Configuration:
- name: pulsar-functions-kubernetes-instance
- monitorInterval: 30
-
- Properties:
- Property:
- - name: "pulsar.log.level"
- value: "info"
- - name: "bk.log.level"
- value: "info"
-
- Appenders:
-
- # Console
- Console:
- name: Console
- target: SYSTEM_OUT
- PatternLayout:
- Pattern: "%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n"
-
- Loggers:
-
- Logger:
- name: org.apache.pulsar.functions.runtime.shaded.org.apache.bookkeeper
- level: "${sys:bk.log.level}"
- additivity: false
- AppenderRef:
- - ref: Console
-
- Root:
- level: info
- AppenderRef:
- - ref: Console
- level: "${sys:pulsar.log.level}"
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 5128ff3..12390dc 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderCo
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import java.lang.reflect.Type;
@@ -43,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
+import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.testng.Assert.assertEquals;
@@ -135,6 +137,11 @@ public class KubernetesRuntimeTest {
this.logDirectory = "logs/functions";
}
+ @BeforeClass
+ public void setup() {
+ System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
+ }
+
@AfterMethod
public void tearDown() {
if (null != this.factory) {
@@ -288,13 +295,13 @@ public class KubernetesRuntimeTest {
"Actual args : " + StringUtils.join(args, " "));
String expectedArgs = "exec java -cp " + classpath
- + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ extraDepsEnv
- + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
+ + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+ " -Xmx" + String.valueOf(RESOURCES.getRam())
- + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
+ + " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + pulsarRootDir + "/" + userJarFile + " --instance_id "
+ "$SHARD_ID" + " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index 842b0a3..bdb2976 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.functions.runtime;
+import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
@@ -46,6 +47,7 @@ import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
/**
@@ -123,6 +125,11 @@ public class ProcessRuntimeTest {
this.logDirectory = "Users/user/logs";
}
+ @BeforeClass
+ public void setup() {
+ System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
+ }
+
@AfterMethod
public void tearDown() {
if (null != this.factory) {
@@ -266,12 +273,12 @@ public class ProcessRuntimeTest {
}
String expectedArgs = "java -cp " + classpath
- + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ extraDepsEnv
- + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
+ + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ + " -Dlog4j.configurationFile=java_instance_log4j2.xml "
+ "-Dpulsar.function.log.dir=" + logDirectory + "/functions/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-" + config.getInstanceId()
- + " org.apache.pulsar.functions.runtime.JavaInstanceMain"
+ + " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ " --jar " + userJarFile + " --instance_id "
+ config.getInstanceId() + " --function_id " + config.getFunctionId()
+ " --function_version " + config.getFunctionVersion()
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
index c55111f..5dbb804 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheEntry.java
@@ -61,8 +61,9 @@ public class FunctionCacheEntry implements AutoCloseable {
FunctionCacheEntry(Collection<String> requiredJarFiles,
Collection<URL> requiredClasspaths,
URL[] libraryURLs,
- String initialInstanceId) {
- this.classLoader = FunctionClassLoaders.create(libraryURLs, FunctionClassLoaders.class.getClassLoader());
+ String initialInstanceId, ClassLoader rootClassLoader) {
+ this.classLoader = FunctionClassLoaders.create(libraryURLs, rootClassLoader);
+
this.classpaths = requiredClasspaths.stream()
.map(URL::toString)
.collect(Collectors.toSet());
@@ -70,17 +71,8 @@ public class FunctionCacheEntry implements AutoCloseable {
this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
}
- private static final Set<String> JAVA_INSTANCE_ADDITIONAL_JARS = isNoneBlank(
- System.getProperty(JAVA_INSTANCE_JAR_PROPERTY))
- ? Collections.singleton(System.getProperty(JAVA_INSTANCE_JAR_PROPERTY))
- : Collections.emptySet();
-
- FunctionCacheEntry(String narArchive, String initialInstanceId) throws IOException {
- if (JAVA_INSTANCE_ADDITIONAL_JARS.isEmpty()) {
- log.warn("java-instance jar path not set in system-property= {} ", JAVA_INSTANCE_JAR_PROPERTY);
- throw new IllegalStateException(JAVA_INSTANCE_JAR_PROPERTY + " system property not set");
- }
- this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), JAVA_INSTANCE_ADDITIONAL_JARS);
+ FunctionCacheEntry(String narArchive, String initialInstanceId, ClassLoader rootClassLoader) throws IOException {
+ this.classLoader = NarClassLoader.getFromArchive(new File(narArchive), Collections.emptySet(), rootClassLoader);
this.classpaths = Collections.emptySet();
this.jarFiles = Collections.singleton(narArchive);
this.executionHolders = new HashSet<>(Collections.singleton(initialInstanceId));
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
index f2dcbe6..b5f7e42 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/functioncache/FunctionCacheManagerImpl.java
@@ -37,8 +37,11 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
/** Registered Functions **/
private final Map<String, FunctionCacheEntry> cacheFunctions;
- public FunctionCacheManagerImpl() {
+ private ClassLoader rootClassLoader;
+
+ public FunctionCacheManagerImpl(ClassLoader rootClassLoader) {
this.cacheFunctions = new ConcurrentHashMap<>();
+ this.rootClassLoader = rootClassLoader;
}
Map<String, FunctionCacheEntry> getCacheFunctions() {
@@ -93,7 +96,7 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
requiredJarFiles,
requiredClasspaths,
urls,
- eid));
+ eid, rootClassLoader));
} catch (Throwable cause) {
Exceptions.rethrowIOException(cause);
}
@@ -122,7 +125,7 @@ public class FunctionCacheManagerImpl implements FunctionCacheManager {
// Create new cache entry
try {
- cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid));
+ cacheFunctions.put(fid, new FunctionCacheEntry(narArchive, eid, rootClassLoader));
} catch (Throwable cause) {
Exceptions.rethrowIOException(cause);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 534710d..eb7272e 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -140,7 +140,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
workerConfig.getStateStorageServiceUrl(),
authConfig,
new ClearTextSecretsProvider(),
- null);
+ null, null);
} else if (workerConfig.getProcessContainerFactory() != null) {
this.runtimeFactory = new ProcessRuntimeFactory(
workerConfig.getPulsarServiceUrl(),
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 1511780..c0444e3 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -143,7 +143,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -189,7 +189,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -236,7 +236,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -296,7 +296,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -362,7 +362,7 @@ public class SchedulerManagerTest {
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider
- (), new CollectorRegistry());
+ (), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -473,7 +473,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -600,7 +600,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
@@ -654,7 +654,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -787,7 +787,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry());
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider(), new CollectorRegistry(), null);
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments