You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by aa...@apache.org on 2019/11/02 03:12:32 UTC
[pulsar] branch master updated: Function runtime pluggable (#5463)
This is an automated email from the ASF dual-hosted git repository.
aahmed pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 9728d54 Function runtime pluggable (#5463)
9728d54 is described below
commit 9728d5429bfe359dfc3de4b277b9fbe108e0bd8e
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Fri Nov 1 20:12:24 2019 -0700
Function runtime pluggable (#5463)
* Allow Pulsar Function Runtimes to be pluggable
* cleaning up
---
conf/functions_worker.yml | 38 ++-
.../worker/PulsarFunctionE2ESecurityTest.java | 8 +-
.../worker/PulsarFunctionLocalRunTest.java | 7 +-
.../worker/PulsarFunctionPublishTest.java | 10 +-
.../worker/PulsarWorkerAssignmentTest.java | 7 +-
.../apache/pulsar/io/PulsarFunctionAdminTest.java | 7 +-
.../apache/pulsar/io/PulsarFunctionE2ETest.java | 11 +-
.../apache/pulsar/io/PulsarFunctionTlsTest.java | 7 +-
.../org/apache/pulsar/functions/LocalRunner.java | 4 +-
.../functions/runtime/JavaInstanceStarter.java | 1 +
.../pulsar/functions/runtime/RuntimeFactory.java | 13 +
.../pulsar/functions/runtime/RuntimeUtils.java | 7 +-
.../{ => kubernetes}/KubernetesRuntime.java | 4 +-
.../{ => kubernetes}/KubernetesRuntimeFactory.java | 277 ++++++++++-----------
.../kubernetes/KubernetesRuntimeFactoryConfig.java | 128 ++++++++++
.../runtime/{ => process}/ProcessRuntime.java | 4 +-
.../{ => process}/ProcessRuntimeFactory.java | 68 ++++-
.../process/ProcessRuntimeFactoryConfig.java | 46 ++++
.../runtime/{ => thread}/ThreadRuntime.java | 5 +-
.../runtime/{ => thread}/ThreadRuntimeFactory.java | 80 +++---
.../runtime/thread/ThreadRuntimeFactoryConfig.java | 34 +++
.../pulsar/functions/worker/WorkerConfig.java | 212 +++++-----------
.../KubernetesRuntimeFactoryTest.java | 109 ++++++--
.../{ => kubernetes}/KubernetesRuntimeTest.java | 62 ++---
.../runtime/{ => process}/ProcessRuntimeTest.java | 33 ++-
.../apache/pulsar/functions/utils/Reflections.java | 2 +-
.../pulsar/functions/utils/ReflectionsTest.java | 2 +-
.../functions/worker/FunctionRuntimeManager.java | 80 ++----
.../functions/worker/FunctionsStatsGenerator.java | 2 +-
.../functions/worker/FunctionActionerTest.java | 21 +-
.../worker/FunctionRuntimeManagerTest.java | 216 +++++++++++++++-
.../functions/worker/MembershipManagerTest.java | 19 +-
.../functions/worker/SchedulerManagerTest.java | 9 +-
33 files changed, 1029 insertions(+), 504 deletions(-)
diff --git a/conf/functions_worker.yml b/conf/functions_worker.yml
index ed8e18e..88aeaa7 100644
--- a/conf/functions_worker.yml
+++ b/conf/functions_worker.yml
@@ -92,17 +92,33 @@ topicCompactionFrequencySec: 1800
# Function Runtime Management
###############################
-#threadContainerFactory:
+#### Process Runtime ####
+# Pulsar function instances are launched as processes
+
+functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory
+functionRuntimeFactoryConfigs:
+ # location of log files for functions
+ logDirectory: /tmp
+ # change the jar location only when you put the java instance jar in a different location
+ javaInstanceJarLocation:
+ # change the python instance location only when you put the python instance jar in a different location
+ pythonInstanceLocation:
+ # change the extra dependencies location:
+ extraFunctionDependenciesDir:
+
+#### Thread Runtime ####
+# Pulsar function instances are run as threads
+
+#functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory
+#functionRuntimeFactoryConfigs:
+# # thread group name
# threadGroupName: "Thread Function Container Group"
-processContainerFactory:
- logDirectory:
- # change the jar location only when you put the java instance jar in a different location
- javaInstanceJarLocation:
- # change the python instance location only when you put the python instance jar in a different location
- pythonInstanceLocation:
- # change the extra dependencies location:
- extraFunctionDependenciesDir:
-#kubernetesContainerFactory:
+
+#### Kubernetes Runtime ####
+# Pulsar function are deployed to Kubernetes
+
+#functionRuntimeFactoryClassName: org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory
+#functionRuntimeFactoryConfigs:
# # uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in function worker
# k8Uri:
# # the kubernetes namespace to run the function instances. it is `default`, if this setting is left to be empty
@@ -116,7 +132,7 @@ processContainerFactory:
# # setting this to true is let function worker to submit functions to the same k8s cluster as function worker
# # is running. setting this to false if your function worker is not running as a k8 pod.
# submittingInsidePod: false
-# # setting the pulsar service url that pulsar function should use to connect to pulsar
+# # setting the pulsar service url that pulsar function should use to connect to pulsar
# # if it is not set, it will use the pulsar service url configured in worker service
# pulsarServiceUrl:
# # setting the pulsar admin url that pulsar function should use to connect to pulsar
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
index 5bafb77..3431793 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionE2ESecurityTest.java
@@ -47,6 +47,9 @@ import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.io.PulsarFunctionE2ETest;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
@@ -61,6 +64,7 @@ import java.lang.reflect.Method;
import java.net.URL;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
@@ -220,7 +224,9 @@ public class PulsarFunctionE2ESecurityTest {
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
index a60b0ff..e57d5b3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionLocalRunTest.java
@@ -51,7 +51,10 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.LocalRunner;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.io.datagenerator.DataGeneratorPrintSink;
import org.apache.pulsar.io.datagenerator.DataGeneratorSource;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -303,7 +306,9 @@ public class PulsarFunctionLocalRunTest {
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName(CLUSTER));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName(CLUSTER), Map.class));
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
index 3301fdb..e68f40d 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarFunctionPublishTest.java
@@ -46,6 +46,9 @@ import org.apache.pulsar.common.policies.data.FunctionStats;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -235,10 +238,9 @@ public class PulsarFunctionPublishTest {
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
-// workerConfig.setProcessContainerFactory(new WorkerConfig.ProcessContainerFactory()
-// .setJavaInstanceJarLocation("/Users/jerrypeng/workspace/incubator-pulsar/pulsar-functions/runtime-all/target/java-instance.jar")
-// .setPythonInstanceLocation(""));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
index dc0da52..a39808b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/functions/worker/PulsarWorkerAssignmentTest.java
@@ -38,7 +38,10 @@ import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Utils;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function.Assignment;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -149,7 +152,9 @@ public class PulsarWorkerAssignmentTest {
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
workerConfig.setPulsarWebServiceUrl("http://127.0.0.1:" + config.getWebServicePort().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
index f8358a1..134aa96 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionAdminTest.java
@@ -47,6 +47,9 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
@@ -178,7 +181,9 @@ public class PulsarFunctionAdminTest {
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
index d42ce92..422f712 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionE2ETest.java
@@ -54,9 +54,12 @@ import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.util.FutureUtil;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.InstanceUtils;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionRuntimeManager;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@@ -72,7 +75,6 @@ import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
@@ -197,8 +199,6 @@ public class PulsarFunctionE2ETest {
config.setBrokerClientTlsEnabled(true);
config.setAllowAutoTopicCreationType("non-partitioned");
-
-
functionsWorkerService = createPulsarFunctionWorker(config);
urlTls = new URL(brokerServiceUrl);
Optional<WorkerService> functionWorkerService = Optional.of(functionsWorkerService);
@@ -319,8 +319,9 @@ public class PulsarFunctionE2ETest {
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
- // worker talks to local broker
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class)); // worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePortTls().get());
workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePortTls().get());
workerConfig.setFailureCheckFreqMs(100);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
index 41782a4..3ae6daf 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/io/PulsarFunctionTlsTest.java
@@ -42,11 +42,14 @@ import org.apache.pulsar.client.admin.Tenants;
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.api.utils.IdentityFunction;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.WorkerServer;
@@ -163,7 +166,9 @@ public class PulsarFunctionTlsTest {
workerConfig.setPulsarFunctionsNamespace(pulsarFunctionsNamespace);
workerConfig.setSchedulerClassName(
org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler.class.getName());
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("use"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new ThreadRuntimeFactoryConfig().setThreadGroupName("use"), Map.class));
// worker talks to local broker
workerConfig.setPulsarServiceUrl("pulsar://127.0.0.1:" + config.getBrokerServicePort().get());
workerConfig.setPulsarWebServiceUrl("https://127.0.0.1:" + config.getWebServicePort().get());
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index ad67a7a..996cd5e 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -33,9 +33,9 @@ import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
index a26f287..81a871e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceStarter.java
@@ -39,6 +39,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.Reflections;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
index 8cd08f8..c26ed60 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeFactory.java
@@ -20,8 +20,12 @@
package org.apache.pulsar.functions.runtime;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
+import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Reflections;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import java.util.Optional;
@@ -30,6 +34,11 @@ import java.util.Optional;
*/
public interface RuntimeFactory extends AutoCloseable {
+ void initialize(WorkerConfig workerConfig,
+ AuthenticationConfig authenticationConfig,
+ SecretsProviderConfigurator secretsProviderConfigurator,
+ Optional<FunctionAuthProvider> functionAuthProvider) throws Exception;
+
/**
* Create a function container to execute a java instance.
*
@@ -53,5 +62,9 @@ public interface RuntimeFactory extends AutoCloseable {
@Override
void close();
+ static RuntimeFactory getFuntionRuntimeFactory(String className) {
+ return Reflections.createInstance(className, RuntimeFactory.class, Thread.currentThread().getContextClassLoader());
+ }
+
}
\ No newline at end of file
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index ede5c48..351fb67 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -30,6 +30,7 @@ import java.net.InetAddress;
import java.net.URL;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
@@ -50,7 +51,7 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
public class RuntimeUtils {
private static final String FUNCTIONS_EXTRA_DEPS_PROPERTY = "pulsar.functions.extra.dependencies.dir";
- static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
+ public static final String FUNCTIONS_INSTANCE_CLASSPATH = "pulsar.functions.instance.classpath";
public static List<String> composeCmd(InstanceConfig instanceConfig,
String instanceFile,
@@ -416,4 +417,8 @@ public class RuntimeUtils {
return input.split("\\s(?=([^\"]*\"[^\"]*\")*[^\"]*$)");
}
+ public static <T> T getRuntimeFunctionConfig(Map<String, Object> configMap, Class<T> functionRuntimeConfigClass) {
+ return ObjectMapperFactory.getThreadLocal().convertValue(configMap, functionRuntimeConfigClass);
+ }
+
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
similarity index 99%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 01b40b6..217c0db 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.kubernetes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.util.concurrent.FutureCallback;
@@ -62,6 +62,8 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
similarity index 54%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 07a1853..c3fe560 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.kubernetes;
import com.google.common.annotations.VisibleForTesting;
import io.kubernetes.client.ApiClient;
@@ -28,9 +28,9 @@ import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.util.Config;
import java.nio.file.Paths;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.common.functions.Resources;
@@ -39,7 +39,10 @@ import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import java.lang.reflect.Field;
import java.util.Map;
@@ -54,132 +57,130 @@ import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuth
* Kubernetes based function container factory implementation.
*/
@Slf4j
+@Data
public class KubernetesRuntimeFactory implements RuntimeFactory {
static int NUM_RETRIES = 5;
static long SLEEP_BETWEEN_RETRIES_MS = 500;
- @Getter
- @Setter
- @NoArgsConstructor
- class KubernetesInfo {
- private String k8Uri;
- private String jobNamespace;
- private String pulsarDockerImageName;
- private String imagePullPolicy;
- private String pulsarRootDir;
- private String pulsarAdminUrl;
- private String pulsarServiceUrl;
- private String pythonDependencyRepository;
- private String pythonExtraDependencyRepository;
- private String extraDependenciesDir;
- private String changeConfigMap;
- private String changeConfigMapNamespace;
- private int percentMemoryPadding;
- private double cpuOverCommitRatio;
- private double memoryOverCommitRatio;
- }
- private final KubernetesInfo kubernetesInfo;
- private final Boolean submittingInsidePod;
- private final Boolean installUserCodeDependencies;
- private final Map<String, String> customLabels;
- private final Integer expectedMetricsCollectionInterval;
- private final String stateStorageServiceUri;
- private final AuthenticationConfig authConfig;
- private final String javaInstanceJarFile;
- private final String pythonInstanceFile;
- private final String extraDependenciesDir;
- private final SecretsProviderConfigurator secretsProviderConfigurator;
+ private String k8Uri;
+ private String jobNamespace;
+ private String pulsarDockerImageName;
+ private String imagePullPolicy;
+ private String pulsarRootDir;
+ private String pulsarAdminUrl;
+ private String pulsarServiceUrl;
+ private String pythonDependencyRepository;
+ private String pythonExtraDependencyRepository;
+ private String extraDependenciesDir;
+ private String changeConfigMap;
+ private String changeConfigMapNamespace;
+ private int percentMemoryPadding;
+ private double cpuOverCommitRatio;
+ private double memoryOverCommitRatio;
+ private Boolean submittingInsidePod;
+ private Boolean installUserCodeDependencies;
+ private Map<String, String> customLabels;
+ private Integer expectedMetricsCollectionInterval;
+ private String stateStorageServiceUri;
+ private AuthenticationConfig authConfig;
+ private String javaInstanceJarFile;
+ private String pythonInstanceFile;
private final String logDirectory = "logs/functions";
+ private Resources functionInstanceMinResources;
+ private boolean authenticationEnabled;
+
+ @ToString.Exclude
+ @EqualsAndHashCode.Exclude
private Timer changeConfigMapTimer;
+ @ToString.Exclude
+ @EqualsAndHashCode.Exclude
private AppsV1Api appsClient;
+ @ToString.Exclude
+ @EqualsAndHashCode.Exclude
private CoreV1Api coreClient;
- private Resources functionInstanceMinResources;
- private final boolean authenticationEnabled;
+ @ToString.Exclude
+ @EqualsAndHashCode.Exclude
+ private SecretsProviderConfigurator secretsProviderConfigurator;
+ @ToString.Exclude
+ @EqualsAndHashCode.Exclude
private Optional<KubernetesFunctionAuthProvider> authProvider;
- private final byte[] serverCaBytes;
- @VisibleForTesting
- public KubernetesRuntimeFactory(String k8Uri,
- String jobNamespace,
- String pulsarDockerImageName,
- String imagePullPolicy,
- String pulsarRootDir,
- Boolean submittingInsidePod,
- Boolean installUserCodeDependencies,
- String pythonDependencyRepository,
- String pythonExtraDependencyRepository,
- String extraDependenciesDir,
- Map<String, String> customLabels,
- int percentMemoryPadding,
- double cpuOverCommitRatio,
- double memoryOverCommitRatio,
- String pulsarServiceUri,
- String pulsarAdminUri,
- String stateStorageServiceUri,
- AuthenticationConfig authConfig,
- byte[] serverCaBytes,
- Integer expectedMetricsCollectionInterval,
- String changeConfigMap,
- String changeConfigMapNamespace,
- Resources functionInstanceMinResources,
- SecretsProviderConfigurator secretsProviderConfigurator,
- boolean authenticationEnabled,
- Optional<FunctionAuthProvider> functionAuthProvider) {
- this.kubernetesInfo = new KubernetesInfo();
- this.kubernetesInfo.setK8Uri(k8Uri);
- if (!isEmpty(jobNamespace)) {
- this.kubernetesInfo.setJobNamespace(jobNamespace);
+ @ToString.Exclude
+ @EqualsAndHashCode.Exclude
+ private byte[] serverCaBytes;
+
+ @Override
+ public boolean externallyManaged() {
+ return true;
+ }
+
+ @Override
+ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig,
+ SecretsProviderConfigurator secretsProviderConfigurator,
+ Optional<FunctionAuthProvider> functionAuthProvider) {
+
+ KubernetesRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig(
+ workerConfig.getFunctionRuntimeFactoryConfigs(), KubernetesRuntimeFactoryConfig.class);
+
+ this.k8Uri = factoryConfig.getK8Uri();
+ if (!isEmpty(factoryConfig.getJobNamespace())) {
+ this.jobNamespace = factoryConfig.getJobNamespace();
} else {
- this.kubernetesInfo.setJobNamespace("default");
+ this.jobNamespace = "default";
}
- if (!isEmpty(pulsarDockerImageName)) {
- this.kubernetesInfo.setPulsarDockerImageName(pulsarDockerImageName);
+ if (!isEmpty(factoryConfig.getPulsarDockerImageName())) {
+ this.pulsarDockerImageName = factoryConfig.getPulsarDockerImageName();
} else {
- this.kubernetesInfo.setPulsarDockerImageName("apachepulsar/pulsar");
+ this.pulsarDockerImageName = "apachepulsar/pulsar";
}
- if (!isEmpty(imagePullPolicy)) {
- this.kubernetesInfo.setImagePullPolicy(imagePullPolicy);
+ if (!isEmpty(factoryConfig.getImagePullPolicy())) {
+ this.imagePullPolicy = factoryConfig.getImagePullPolicy();
} else {
- this.kubernetesInfo.setImagePullPolicy("IfNotPresent");
+ this.imagePullPolicy = "IfNotPresent";
}
- if (!isEmpty(pulsarRootDir)) {
- this.kubernetesInfo.setPulsarRootDir(pulsarRootDir);
+ if (!isEmpty(factoryConfig.getPulsarRootDir())) {
+ this.pulsarRootDir = factoryConfig.getPulsarRootDir();
} else {
- this.kubernetesInfo.setPulsarRootDir("/pulsar");
+ this.pulsarRootDir = "/pulsar";
}
- if (StringUtils.isNotEmpty(extraDependenciesDir)) {
- if (Paths.get(extraDependenciesDir).isAbsolute()) {
- this.extraDependenciesDir = extraDependenciesDir;
+
+ this.submittingInsidePod = factoryConfig.getSubmittingInsidePod();
+ this.installUserCodeDependencies = factoryConfig.getInstallUserCodeDependencies();
+ this.pythonDependencyRepository = factoryConfig.getPythonDependencyRepository();
+ this.pythonExtraDependencyRepository = factoryConfig.getPythonExtraDependencyRepository();
+
+ if (StringUtils.isNotEmpty(factoryConfig.getExtraFunctionDependenciesDir())) {
+ if (Paths.get(factoryConfig.getExtraFunctionDependenciesDir()).isAbsolute()) {
+ this.extraDependenciesDir = factoryConfig.getExtraFunctionDependenciesDir();
} else {
- this.extraDependenciesDir = this.kubernetesInfo.getPulsarRootDir()
- + "/" + extraDependenciesDir;
+ this.extraDependenciesDir = this.pulsarRootDir
+ + "/" + factoryConfig.getExtraFunctionDependenciesDir();
}
} else {
- this.extraDependenciesDir = this.kubernetesInfo.getPulsarRootDir() + "/instances/deps";
+ this.extraDependenciesDir = this.pulsarRootDir + "/instances/deps";
}
- this.kubernetesInfo.setExtraDependenciesDir(extraDependenciesDir);
- this.kubernetesInfo.setPythonDependencyRepository(pythonDependencyRepository);
- this.kubernetesInfo.setPythonExtraDependencyRepository(pythonExtraDependencyRepository);
- this.kubernetesInfo.setPulsarServiceUrl(pulsarServiceUri);
- this.kubernetesInfo.setPulsarAdminUrl(pulsarAdminUri);
- this.kubernetesInfo.setChangeConfigMap(changeConfigMap);
- this.kubernetesInfo.setChangeConfigMapNamespace(changeConfigMapNamespace);
- this.kubernetesInfo.setPercentMemoryPadding(percentMemoryPadding);
- this.kubernetesInfo.setCpuOverCommitRatio(cpuOverCommitRatio);
- this.kubernetesInfo.setMemoryOverCommitRatio(memoryOverCommitRatio);
- this.submittingInsidePod = submittingInsidePod;
- this.installUserCodeDependencies = installUserCodeDependencies;
- this.customLabels = customLabels;
- this.stateStorageServiceUri = stateStorageServiceUri;
- this.authConfig = authConfig;
- this.serverCaBytes = serverCaBytes;
- this.javaInstanceJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/java-instance.jar";
- this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
- this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
+
+ this.customLabels = factoryConfig.getCustomLabels();
+ this.percentMemoryPadding = factoryConfig.getPercentMemoryPadding();
+ this.cpuOverCommitRatio = factoryConfig.getCpuOverCommitRatio();
+ this.memoryOverCommitRatio = factoryConfig.getMemoryOverCommitRatio();
+ this.pulsarServiceUrl = StringUtils.isEmpty(factoryConfig.getPulsarServiceUrl())
+ ? workerConfig.getPulsarServiceUrl() : factoryConfig.getPulsarServiceUrl();
+ this.pulsarAdminUrl = StringUtils.isEmpty(factoryConfig.getPulsarAdminUrl())
+ ? workerConfig.getPulsarWebServiceUrl() : factoryConfig.getPulsarAdminUrl();
+ this.stateStorageServiceUri = workerConfig.getStateStorageServiceUrl();
+ this.authConfig = authenticationConfig;
+ this.expectedMetricsCollectionInterval = factoryConfig.getExpectedMetricsCollectionInterval() == null
+ ? -1 : factoryConfig.getExpectedMetricsCollectionInterval();
+ this.changeConfigMap = factoryConfig.getChangeConfigMap();
+ this.changeConfigMapNamespace = factoryConfig.getChangeConfigMapNamespace();
+ this.functionInstanceMinResources = workerConfig.getFunctionInstanceMinResources();
this.secretsProviderConfigurator = secretsProviderConfigurator;
- this.functionInstanceMinResources = functionInstanceMinResources;
- this.authenticationEnabled = authenticationEnabled;
+ this.authenticationEnabled = workerConfig.isAuthenticationEnabled();
+ this.javaInstanceJarFile = this.pulsarRootDir + "/instances/java-instance.jar";
+ this.pythonInstanceFile = this.pulsarRootDir + "/instances/python-instance/python_instance_main.py";
+ this.serverCaBytes = workerConfig.getTlsTrustChainBytes();
try {
setupClient();
} catch (Exception e) {
@@ -193,19 +194,13 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
+ functionAuthProvider.get().getClass().getName() + " must implement KubernetesFunctionAuthProvider");
} else {
KubernetesFunctionAuthProvider kubernetesFunctionAuthProvider = (KubernetesFunctionAuthProvider) functionAuthProvider.get();
- kubernetesFunctionAuthProvider.initialize(coreClient, jobNamespace, serverCaBytes);
+ kubernetesFunctionAuthProvider.initialize(coreClient, factoryConfig.getJobNamespace(), serverCaBytes);
this.authProvider = Optional.of(kubernetesFunctionAuthProvider);
}
} else {
this.authProvider = Optional.empty();
}
-
- }
-
- @Override
- public boolean externallyManaged() {
- return true;
}
@Override
@@ -237,29 +232,29 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
return new KubernetesRuntime(
appsClient,
coreClient,
- this.kubernetesInfo.getJobNamespace(),
+ jobNamespace,
customLabels,
installUserCodeDependencies,
- this.kubernetesInfo.getPythonDependencyRepository(),
- this.kubernetesInfo.getPythonExtraDependencyRepository(),
- this.kubernetesInfo.getPulsarDockerImageName(),
- this.kubernetesInfo.imagePullPolicy,
- this.kubernetesInfo.getPulsarRootDir(),
+ pythonDependencyRepository,
+ pythonExtraDependencyRepository,
+ pulsarDockerImageName,
+ imagePullPolicy,
+ pulsarRootDir,
instanceConfig,
instanceFile,
extraDependenciesDir,
logDirectory,
codePkgUrl,
originalCodeFileName,
- this.kubernetesInfo.getPulsarServiceUrl(),
- this.kubernetesInfo.getPulsarAdminUrl(),
+ pulsarServiceUrl,
+ pulsarAdminUrl,
stateStorageServiceUri,
authConfig,
secretsProviderConfigurator,
expectedMetricsCollectionInterval,
- this.kubernetesInfo.getPercentMemoryPadding(),
- this.kubernetesInfo.getCpuOverCommitRatio(),
- this.kubernetesInfo.getMemoryOverCommitRatio(),
+ percentMemoryPadding,
+ cpuOverCommitRatio,
+ memoryOverCommitRatio,
authProvider,
authenticationEnabled);
}
@@ -272,13 +267,13 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
KubernetesRuntime.doChecks(functionDetails);
validateMinResourcesRequired(functionDetails);
- secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, kubernetesInfo.getJobNamespace(), functionDetails);
+ secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, jobNamespace, functionDetails);
}
@VisibleForTesting
public void setupClient() throws Exception {
if (appsClient == null) {
- if (this.kubernetesInfo.getK8Uri() == null) {
+ if (k8Uri == null) {
log.info("k8Uri is null thus going by defaults");
ApiClient cli;
if (submittingInsidePod) {
@@ -292,43 +287,47 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
appsClient = new AppsV1Api();
coreClient = new CoreV1Api();
} else {
- log.info("Setting up k8Client using uri " + this.kubernetesInfo.getK8Uri());
- final ApiClient apiClient = new ApiClient().setBasePath(this.kubernetesInfo.getK8Uri());
+ log.info("Setting up k8Client using uri " + k8Uri);
+ final ApiClient apiClient = new ApiClient().setBasePath(k8Uri);
appsClient = new AppsV1Api(apiClient);
coreClient = new CoreV1Api(apiClient);
}
// Setup a timer to change stuff.
- if (!isEmpty(this.kubernetesInfo.getChangeConfigMap())) {
+ if (!isEmpty(changeConfigMap)) {
changeConfigMapTimer = new Timer();
+ final KubernetesRuntimeFactory THIS = this;
changeConfigMapTimer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
- fetchConfigMap();
+ fetchConfigMap(coreClient, changeConfigMap, changeConfigMapNamespace, THIS);
}
}, 300000, 300000);
}
}
}
- void fetchConfigMap() {
+ static void fetchConfigMap(CoreV1Api coreClient, String changeConfigMap,
+ String changeConfigMapNamespace,
+ KubernetesRuntimeFactory kubernetesRuntimeFactory) {
try {
- V1ConfigMap v1ConfigMap = coreClient.readNamespacedConfigMap(kubernetesInfo.getChangeConfigMap(), kubernetesInfo.getChangeConfigMapNamespace(), null, true, false);
+ V1ConfigMap v1ConfigMap = coreClient.readNamespacedConfigMap(changeConfigMap, changeConfigMapNamespace, null, true, false);
Map<String, String> data = v1ConfigMap.getData();
if (data != null) {
- overRideKubernetesConfig(data);
+ overRideKubernetesConfig(data, kubernetesRuntimeFactory);
}
} catch (Exception e) {
- log.error("Error while trying to fetch configmap {} at namespace {}", kubernetesInfo.getChangeConfigMap(), kubernetesInfo.getChangeConfigMapNamespace(), e);
+ log.error("Error while trying to fetch configmap {} at namespace {}", changeConfigMap, changeConfigMapNamespace, e);
}
}
- void overRideKubernetesConfig(Map<String, String> data) throws Exception {
- for (Field field : KubernetesInfo.class.getDeclaredFields()) {
+ static void overRideKubernetesConfig(Map<String, String> data,
+ KubernetesRuntimeFactory kubernetesRuntimeFactory) throws Exception {
+ for (Field field : KubernetesRuntimeFactory.class.getDeclaredFields()) {
field.setAccessible(true);
- if (data.containsKey(field.getName()) && !data.get(field.getName()).equals(field.get(kubernetesInfo))) {
- log.info("Kubernetes Config {} changed from {} to {}", field.getName(), field.get(kubernetesInfo), data.get(field.getName()));
- field.set(kubernetesInfo, data.get(field.getName()));
+ if (data.containsKey(field.getName()) && !data.get(field.getName()).equals(field.get(kubernetesRuntimeFactory))) {
+ log.info("Kubernetes Config {} changed from {} to {}", field.getName(), field.get(kubernetesRuntimeFactory), data.get(field.getName()));
+ field.set(kubernetesRuntimeFactory, data.get(field.getName()));
}
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
new file mode 100644
index 0000000..a527baf
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime.kubernetes;
+
+import lombok.Data;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.common.configuration.FieldContext;
+
+import java.util.Map;
+
+@Data
+@ToString
+@Accessors(chain = true)
+public class KubernetesRuntimeFactoryConfig {
+ @FieldContext(
+ doc = "Uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in"
+ + " function worker machine"
+ )
+ protected String k8Uri;
+ @FieldContext(
+ doc = "The Kubernetes namespace to run the function instances. It is `default`,"
+ + " if this setting is left to be empty"
+ )
+ protected String jobNamespace;
+ @FieldContext(
+ doc = "The docker image used to run function instance. By default it is `apachepulsar/pulsar`"
+ )
+ protected String pulsarDockerImageName;
+
+ @FieldContext(
+ doc = "The image pull policy for image used to run function instance. By default it is `IfNotPresent`"
+ )
+ protected String imagePullPolicy;
+ @FieldContext(
+ doc = "The root directory of pulsar home directory in the pulsar docker image specified"
+ + " `pulsarDockerImageName`. By default it is under `/pulsar`. If you are using your own"
+ + " customized image in `pulsarDockerImageName`, you need to set this setting accordingly"
+ )
+ protected String pulsarRootDir;
+ @FieldContext(
+ doc = "This setting only takes effects if `k8Uri` is set to null. If your function worker is"
+ + " also running as a k8s pod, set this to `true` is let function worker to submit functions to"
+ + " the same k8s cluster as function worker is running. Set this to `false` if your function worker"
+ + " is not running as a k8s pod"
+ )
+ protected Boolean submittingInsidePod;
+ @FieldContext(
+ doc = "The pulsar service url that pulsar functions should use to connect to pulsar."
+ + " If it is not set, it will use the pulsar service url configured in function worker."
+ )
+ protected String pulsarServiceUrl;
+ @FieldContext(
+ doc = "The pulsar admin url that pulsar functions should use to connect to pulsar."
+ + " If it is not set, it will use the pulsar admin url configured in function worker."
+ )
+ protected String pulsarAdminUrl;
+ @FieldContext(
+ doc = "The flag indicates to install user code dependencies. (applied to python package)"
+ )
+ protected Boolean installUserCodeDependencies;
+ @FieldContext(
+ doc = "The repository that pulsar functions use to download python dependencies"
+ )
+ protected String pythonDependencyRepository;
+ @FieldContext(
+ doc = "The repository that pulsar functions use to download extra python dependencies"
+ )
+ protected String pythonExtraDependencyRepository;
+
+ @FieldContext(
+ doc = "the directory for dropping extra function dependencies. "
+ + "If it is not absolute path, it is relative to `pulsarRootDir`"
+ )
+ protected String extraFunctionDependenciesDir;
+ @FieldContext(
+ doc = "The custom labels that function worker uses to select the nodes for pods"
+ )
+ protected Map<String, String> customLabels;
+
+ @FieldContext(
+ doc = "The expected metrics collection interval, in seconds"
+ )
+ protected Integer expectedMetricsCollectionInterval = 30;
+ @FieldContext(
+ doc = "Kubernetes Runtime will periodically checkback on"
+ + " this configMap if defined and if there are any changes"
+ + " to the kubernetes specific stuff, we apply those changes"
+ )
+ protected String changeConfigMap;
+ @FieldContext(
+ doc = "The namespace for storing change config map"
+ )
+ protected String changeConfigMapNamespace;
+
+ @FieldContext(
+ doc = "Additional memory padding added on top of the memory requested by the function per on a per instance basis"
+ )
+ protected int percentMemoryPadding;
+
+ @FieldContext(
+ doc = "The ratio cpu request and cpu limit to be set for a function/source/sink." +
+ " The formula for cpu request is cpuRequest = userRequestCpu / cpuOverCommitRatio"
+ )
+ protected double cpuOverCommitRatio = 1.0;
+
+ @FieldContext(
+ doc = "The ratio memory request and memory limit to be set for a function/source/sink." +
+ " The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio"
+ )
+ protected double memoryOverCommitRatio = 1.0;
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
similarity index 98%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
index 4334123..3c37372 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntime.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.process;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
@@ -37,6 +37,8 @@ import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
similarity index 71%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
index 87fd7d8..4e2fb8f 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactory.java
@@ -17,21 +17,26 @@
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.process;
import com.google.common.annotations.VisibleForTesting;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+import lombok.NoArgsConstructor;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
-import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import java.nio.file.Paths;
import java.util.Optional;
-import java.util.function.Consumer;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
@@ -39,17 +44,25 @@ import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuth
* Thread based function container factory implementation.
*/
@Slf4j
+@NoArgsConstructor
+@Data
public class ProcessRuntimeFactory implements RuntimeFactory {
- private final String pulsarServiceUrl;
- private final String stateStorageServiceUrl;
- private final boolean authenticationEnabled;
+ private String pulsarServiceUrl;
+ private String stateStorageServiceUrl;
+ private boolean authenticationEnabled;
private AuthenticationConfig authConfig;
- private SecretsProviderConfigurator secretsProviderConfigurator;
private String javaInstanceJarFile;
private String pythonInstanceFile;
private String logDirectory;
private String extraDependenciesDir;
+
+ @ToString.Exclude
+ @EqualsAndHashCode.Exclude
+ private SecretsProviderConfigurator secretsProviderConfigurator;
+
+ @ToString.Exclude
+ @EqualsAndHashCode.Exclude
private Optional<FunctionAuthProvider> authProvider;
@VisibleForTesting
@@ -63,6 +76,41 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
SecretsProviderConfigurator secretsProviderConfigurator,
boolean authenticationEnabled,
Optional<FunctionAuthProvider> functionAuthProvider) {
+
+ initialize(pulsarServiceUrl, stateStorageServiceUrl, authConfig, javaInstanceJarFile,
+ pythonInstanceFile, logDirectory, extraDependenciesDir,
+ secretsProviderConfigurator, authenticationEnabled, functionAuthProvider);
+ }
+
+ @Override
+ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig,
+ SecretsProviderConfigurator secretsProviderConfigurator,
+ Optional<FunctionAuthProvider> functionAuthProvider) {
+ ProcessRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig(
+ workerConfig.getFunctionRuntimeFactoryConfigs(), ProcessRuntimeFactoryConfig.class);
+
+ initialize(workerConfig.getPulsarServiceUrl(),
+ workerConfig.getStateStorageServiceUrl(),
+ authenticationConfig,
+ factoryConfig.getJavaInstanceJarLocation(),
+ factoryConfig.getPythonInstanceLocation(),
+ factoryConfig.getLogDirectory(),
+ factoryConfig.getExtraFunctionDependenciesDir(),
+ secretsProviderConfigurator,
+ workerConfig.isAuthenticationEnabled(),
+ functionAuthProvider);
+ }
+
+ private void initialize(String pulsarServiceUrl,
+ String stateStorageServiceUrl,
+ AuthenticationConfig authConfig,
+ String javaInstanceJarFile,
+ String pythonInstanceFile,
+ String logDirectory,
+ String extraDependenciesDir,
+ SecretsProviderConfigurator secretsProviderConfigurator,
+ boolean authenticationEnabled,
+ Optional<FunctionAuthProvider> functionAuthProvider) {
this.pulsarServiceUrl = pulsarServiceUrl;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.authConfig = authConfig;
@@ -109,14 +157,14 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
if (this.extraDependenciesDir == null) {
String envProcessContainerExtraDependenciesDir =
- System.getProperty("pulsar.functions.extra.dependencies.dir");
+ System.getProperty("pulsar.functions.extra.dependencies.dir");
if (null != envProcessContainerExtraDependenciesDir) {
log.info("Extra dependencies location is not defined using"
- + " the location defined in system environment : {}", envProcessContainerExtraDependenciesDir);
+ + " the location defined in system environment : {}", envProcessContainerExtraDependenciesDir);
this.extraDependenciesDir = envProcessContainerExtraDependenciesDir;
} else {
log.info("No extra dependencies location is defined in either"
- + " function worker config or system environment");
+ + " function worker config or system environment");
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactoryConfig.java
new file mode 100644
index 0000000..e17abd9
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeFactoryConfig.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime.process;
+
+import lombok.Data;
+import lombok.ToString;
+import org.apache.pulsar.common.configuration.FieldContext;
+
+@Data
+@ToString
+public class ProcessRuntimeFactoryConfig {
+ @FieldContext(
+ doc = "The path to the java instance. Change the jar location only when you put"
+ + " the java instance jar in a different location"
+ )
+ protected String javaInstanceJarLocation;
+ @FieldContext(
+ doc = "The path to the python instance. Change the python instance location only"
+ + " when you put the python instance in a different location"
+ )
+ protected String pythonInstanceLocation;
+ @FieldContext(
+ doc = "The path to the log directory"
+ )
+ protected String logDirectory;
+ @FieldContext(
+ doc = "the directory for dropping extra function dependencies"
+ )
+ protected String extraFunctionDependenciesDir;
+}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
similarity index 97%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
index 83e3782..3dea623 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntime.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.thread;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -30,6 +30,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
@@ -39,7 +40,7 @@ import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
* A function container implemented using java thread.
*/
@Slf4j
-class ThreadRuntime implements Runtime {
+public class ThreadRuntime implements Runtime {
// The thread that invokes the function
private Thread fnThread;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
similarity index 71%
rename from pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index 49bee87..5e65989 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -17,25 +17,30 @@
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.thread;
import com.google.common.annotations.VisibleForTesting;
import io.prometheus.client.CollectorRegistry;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceCache;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.runtime.RuntimeFactory;
+import org.apache.pulsar.functions.runtime.RuntimeUtils;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
+import org.apache.pulsar.functions.worker.WorkerConfig;
-import java.io.File;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.net.URLClassLoader;
+import java.util.Optional;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
@@ -43,20 +48,22 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
* Thread based function container factory implementation.
*/
@Slf4j
+@NoArgsConstructor
public class ThreadRuntimeFactory implements RuntimeFactory {
- private final ThreadGroup threadGroup;
- private final FunctionCacheManager fnCache;
- private final PulsarClient pulsarClient;
- private final String storageServiceUrl;
- private final SecretsProvider secretsProvider;
- private final CollectorRegistry collectorRegistry;
+ @Getter
+ private ThreadGroup threadGroup;
+ private FunctionCacheManager fnCache;
+ private PulsarClient pulsarClient;
+ private String storageServiceUrl;
+ private SecretsProvider secretsProvider;
+ private CollectorRegistry collectorRegistry;
private volatile boolean closed;
public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
AuthenticationConfig authConfig, SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry, ClassLoader rootClassLoader) throws Exception {
- this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
+ initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
storageServiceUrl, secretsProvider, collectorRegistry, rootClassLoader);
}
@@ -64,24 +71,9 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
ClassLoader rootClassLoader) {
- if (rootClassLoader == null) {
- rootClassLoader = Thread.currentThread().getContextClassLoader();
- }
- this.secretsProvider = secretsProvider;
- this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
- this.threadGroup = new ThreadGroup(threadGroupName);
- this.pulsarClient = pulsarClient;
- this.storageServiceUrl = storageServiceUrl;
- this.collectorRegistry = collectorRegistry;
- }
-
- public static ClassLoader loadJar(ClassLoader parent, File[] jars) throws MalformedURLException {
- URL[] urls = new URL[jars.length];
- for (int i = 0; i < jars.length; i++) {
- urls[i] = jars[i].toURI().toURL();
- }
- return new URLClassLoader(urls, parent);
+ initialize(threadGroupName, pulsarClient, storageServiceUrl,
+ secretsProvider, collectorRegistry, rootClassLoader);
}
private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
@@ -104,7 +96,35 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
}
return null;
}
-
+
+ private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
+ SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
+ ClassLoader rootClassLoader) {
+ if (rootClassLoader == null) {
+ rootClassLoader = Thread.currentThread().getContextClassLoader();
+ }
+
+ this.secretsProvider = secretsProvider;
+ this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
+ this.threadGroup = new ThreadGroup(threadGroupName);
+ this.pulsarClient = pulsarClient;
+ this.storageServiceUrl = storageServiceUrl;
+ this.collectorRegistry = collectorRegistry;
+ }
+
+ @Override
+ public void initialize(WorkerConfig workerConfig, AuthenticationConfig authenticationConfig,
+ SecretsProviderConfigurator secretsProviderConfigurator,
+ Optional<FunctionAuthProvider> functionAuthProvider) throws Exception {
+ ThreadRuntimeFactoryConfig factoryConfig = RuntimeUtils.getRuntimeFunctionConfig(
+ workerConfig.getFunctionRuntimeFactoryConfigs(), ThreadRuntimeFactoryConfig.class);
+
+ initialize(factoryConfig.getThreadGroupName(),
+ createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
+ workerConfig.getStateStorageServiceUrl(), new ClearTextSecretsProvider(),
+ null, null);
+ }
+
@Override
public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
String originalCodeFileName,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java
new file mode 100644
index 0000000..c078e8d
--- /dev/null
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactoryConfig.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.runtime.thread;
+
+import lombok.Data;
+import lombok.ToString;
+import lombok.experimental.Accessors;
+import org.apache.pulsar.common.configuration.FieldContext;
+
+@Data
+@ToString
+@Accessors(chain = true)
+public class ThreadRuntimeFactoryConfig {
+ @FieldContext(
+ doc = "The name of thread group running function threads"
+ )
+ protected String threadGroupName;
+}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
similarity index 72%
rename from pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
rename to pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 7f00a61..7a25629 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -47,6 +47,10 @@ import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.experimental.Accessors;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
@Data
@Setter
@@ -328,162 +332,20 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
}
- @Data
- @Setter
- @Getter
- @EqualsAndHashCode
- @ToString
- public static class ThreadContainerFactory {
- @FieldContext(
- doc = "The name of thread group running function threads"
- )
- private String threadGroupName;
- }
- @FieldContext(
- category = CATEGORY_FUNC_RUNTIME_MNG,
- doc = "Thread based runtime settings"
- )
- private ThreadContainerFactory threadContainerFactory;
+ /******** Function Runtime configurations **********/
+
- @Data
- @Setter
- @Getter
- @EqualsAndHashCode
- @ToString
- public static class ProcessContainerFactory {
- @FieldContext(
- doc = "The path to the java instance. Change the jar location only when you put"
- + " the java instance jar in a different location"
- )
- private String javaInstanceJarLocation;
- @FieldContext(
- doc = "The path to the python instance. Change the python instance location only"
- + " when you put the python instance in a different location"
- )
- private String pythonInstanceLocation;
- @FieldContext(
- doc = "The path to the log directory"
- )
- private String logDirectory;
- @FieldContext(
- doc = "the directory for dropping extra function dependencies"
- )
- private String extraFunctionDependenciesDir;
- }
@FieldContext(
- category = CATEGORY_FUNC_RUNTIME_MNG,
- doc = "Process based runtime settings"
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "The classname of the function runtime factory."
)
- private ProcessContainerFactory processContainerFactory;
-
- @Data
- @Setter
- @Getter
- @EqualsAndHashCode
- @ToString
- public static class KubernetesContainerFactory {
- @FieldContext(
- doc = "Uri to kubernetes cluster, leave it to empty and it will use the kubernetes settings in"
- + " function worker machine"
- )
- private String k8Uri;
- @FieldContext(
- doc = "The Kubernetes namespace to run the function instances. It is `default`,"
- + " if this setting is left to be empty"
- )
- private String jobNamespace;
- @FieldContext(
- doc = "The docker image used to run function instance. By default it is `apachepulsar/pulsar`"
- )
- private String pulsarDockerImageName;
-
- @FieldContext(
- doc = "The image pull policy for image used to run function instance. By default it is `IfNotPresent`"
- )
- private String imagePullPolicy;
- @FieldContext(
- doc = "The root directory of pulsar home directory in the pulsar docker image specified"
- + " `pulsarDockerImageName`. By default it is under `/pulsar`. If you are using your own"
- + " customized image in `pulsarDockerImageName`, you need to set this setting accordingly"
- )
- private String pulsarRootDir;
- @FieldContext(
- doc = "This setting only takes effects if `k8Uri` is set to null. If your function worker is"
- + " also running as a k8s pod, set this to `true` is let function worker to submit functions to"
- + " the same k8s cluster as function worker is running. Set this to `false` if your function worker"
- + " is not running as a k8s pod"
- )
- private Boolean submittingInsidePod;
- @FieldContext(
- doc = "The pulsar service url that pulsar functions should use to connect to pulsar."
- + " If it is not set, it will use the pulsar service url configured in function worker."
- )
- private String pulsarServiceUrl;
- @FieldContext(
- doc = "The pulsar admin url that pulsar functions should use to connect to pulsar."
- + " If it is not set, it will use the pulsar admin url configured in function worker."
- )
- private String pulsarAdminUrl;
- @FieldContext(
- doc = "The flag indicates to install user code dependencies. (applied to python package)"
- )
- private Boolean installUserCodeDependencies;
- @FieldContext(
- doc = "The repository that pulsar functions use to download python dependencies"
- )
- private String pythonDependencyRepository;
- @FieldContext(
- doc = "The repository that pulsar functions use to download extra python dependencies"
- )
- private String pythonExtraDependencyRepository;
-
- @FieldContext(
- doc = "the directory for dropping extra function dependencies. "
- + "If it is not absolute path, it is relative to `pulsarRootDir`"
- )
- private String extraFunctionDependenciesDir;
- @FieldContext(
- doc = "The custom labels that function worker uses to select the nodes for pods"
- )
- private Map<String, String> customLabels;
-
- @FieldContext(
- doc = "The expected metrics collection interval, in seconds"
- )
- private Integer expectedMetricsCollectionInterval = 30;
- @FieldContext(
- doc = "Kubernetes Runtime will periodically checkback on"
- + " this configMap if defined and if there are any changes"
- + " to the kubernetes specific stuff, we apply those changes"
- )
- private String changeConfigMap;
- @FieldContext(
- doc = "The namespace for storing change config map"
- )
- private String changeConfigMapNamespace;
-
- @FieldContext(
- doc = "Additional memory padding added on top of the memory requested by the function per on a per instance basis"
- )
- private int percentMemoryPadding;
-
- @FieldContext(
- doc = "The ratio cpu request and cpu limit to be set for a function/source/sink." +
- " The formula for cpu request is cpuRequest = userRequestCpu / cpuOverCommitRatio"
- )
- private double cpuOverCommitRatio = 1.0;
+ private String functionRuntimeFactoryClassName;
- @FieldContext(
- doc = "The ratio memory request and memory limit to be set for a function/source/sink." +
- " The formula for memory request is memoryRequest = userRequestMemory / memoryOverCommitRatio"
- )
- private double memoryOverCommitRatio = 1.0;
- }
@FieldContext(
- category = CATEGORY_FUNC_RUNTIME_MNG,
- doc = "Kubernetes based runtime settings"
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "A map of configs for function runtime factory."
)
- private KubernetesContainerFactory kubernetesContainerFactory;
+ private Map<String, Object> functionRuntimeFactoryConfigs;
@FieldContext(
category = CATEGORY_FUNC_RUNTIME_MNG,
@@ -569,4 +431,54 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
public void setProperties(Properties properties) {
this.properties = properties;
}
+
+ /********* DEPRECATED CONFIGS *********/
+
+ @Deprecated
+ @Data
+ /**
+ * @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs
+ * for specifying the function runtime and configs to use
+ */
+ public static class ThreadContainerFactory extends ThreadRuntimeFactoryConfig {
+
+ }
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "Thread based runtime settings"
+ )
+ @Deprecated
+ private ThreadContainerFactory threadContainerFactory;
+
+ @Deprecated
+ @Data
+ /**
+ * @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs
+ * for specifying the function runtime and configs to use
+ */
+ public static class ProcessContainerFactory extends ProcessRuntimeFactoryConfig {
+
+ }
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "Process based runtime settings"
+ )
+ @Deprecated
+ private ProcessContainerFactory processContainerFactory;
+
+ @Deprecated
+ @Data
+ /**
+ * @Deprecated in favor for using functionRuntimeFactoryClassName and functionRuntimeFactoryConfigs
+ * for specifying the function runtime and configs to use
+ */
+ public static class KubernetesContainerFactory extends KubernetesRuntimeFactoryConfig {
+
+ }
+ @FieldContext(
+ category = CATEGORY_FUNC_RUNTIME_MNG,
+ doc = "Kubernetes based runtime settings"
+ )
+ @Deprecated
+ private KubernetesContainerFactory kubernetesContainerFactory;
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
similarity index 69%
rename from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
rename to pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index ef21e14..1bbb27d 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -17,15 +17,17 @@
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.kubernetes;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
+import io.kubernetes.client.models.V1ConfigMap;
import io.kubernetes.client.models.V1PodSpec;
import io.kubernetes.client.models.V1StatefulSet;
import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
import org.apache.pulsar.common.functions.Resources;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.auth.FunctionAuthData;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
@@ -33,8 +35,12 @@ import org.apache.pulsar.functions.auth.KubernetesSecretsTokenAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.worker.WorkerConfig;
+import org.mockito.Mockito;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -43,6 +49,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.testng.Assert.assertEquals;
@@ -136,31 +144,38 @@ public class KubernetesRuntimeFactoryTest {
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir,
Resources minResources,
Optional<FunctionAuthProvider> functionAuthProvider) throws Exception {
- KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory(
- null,
- null,
- null,
- null,
- pulsarRootDir,
- false,
- true,
- "myrepo",
- "anotherrepo",
- extraDepsDir,
- null,
- 0,
- 1.0,
- 1.0,
- pulsarServiceUrl,
- pulsarAdminUrl,
- stateStorageServiceUrl,
- null,
- null,
- null,
- null,
- null,
- minResources,
- new TestSecretProviderConfigurator(), false, functionAuthProvider));
+ KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
+ kubernetesRuntimeFactoryConfig.setK8Uri(null);
+ kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+ kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
+ kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
+ kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
+ kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
+ kubernetesRuntimeFactoryConfig.setInstallUserCodeDependencies(true);
+ kubernetesRuntimeFactoryConfig.setPythonDependencyRepository("myrepo");
+ kubernetesRuntimeFactoryConfig.setPythonExtraDependencyRepository("anotherrepo");
+ kubernetesRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDepsDir);
+ kubernetesRuntimeFactoryConfig.setCustomLabels(null);
+ kubernetesRuntimeFactoryConfig.setPercentMemoryPadding(0);
+ kubernetesRuntimeFactoryConfig.setCpuOverCommitRatio(1.0);
+ kubernetesRuntimeFactoryConfig.setMemoryOverCommitRatio(1.0);
+ kubernetesRuntimeFactoryConfig.setPulsarServiceUrl(pulsarServiceUrl);
+ kubernetesRuntimeFactoryConfig.setPulsarAdminUrl(pulsarAdminUrl);
+ kubernetesRuntimeFactoryConfig.setChangeConfigMapNamespace(null);
+ kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
+
+ workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
+
+ workerConfig.setFunctionInstanceMinResources(minResources);
+ workerConfig.setStateStorageServiceUrl(null);
+ workerConfig.setAuthenticationEnabled(false);
+
+ factory.initialize(workerConfig,null, new TestSecretProviderConfigurator(), functionAuthProvider);
doNothing().when(factory).setupClient();
return factory;
}
@@ -317,4 +332,46 @@ public class KubernetesRuntimeFactoryTest {
}
}
}
+
+ @Test
+ public void testDynamicConfigMapLoading() throws Exception {
+
+ String changeConfigMap = "changeMap";
+ String changeConfigNamespace = "changeConfigNamespace";
+
+ KubernetesRuntimeFactory kubernetesRuntimeFactory = getKuberentesRuntimeFactory();
+ CoreV1Api coreV1Api = Mockito.mock(CoreV1Api.class);
+ V1ConfigMap v1ConfigMap = new V1ConfigMap();
+ Mockito.doReturn(v1ConfigMap).when(coreV1Api).readNamespacedConfigMap(any(), any(), any(), any(), any());
+ KubernetesRuntimeFactory.fetchConfigMap(coreV1Api, changeConfigMap, changeConfigNamespace, kubernetesRuntimeFactory);
+ Mockito.verify(coreV1Api, Mockito.times(1)).readNamespacedConfigMap(eq(changeConfigMap), eq(changeConfigNamespace), eq(null), eq(true), eq(false));
+ KubernetesRuntimeFactory expected = getKuberentesRuntimeFactory();
+ assertEquals(kubernetesRuntimeFactory, expected);
+
+ HashMap<String, String> configs = new HashMap<>();
+ configs.put("pulsarDockerImageName", "test_dockerImage2");
+ configs.put("imagePullPolicy", "test_imagePullPolicy2");
+ v1ConfigMap.setData(configs);
+ KubernetesRuntimeFactory.fetchConfigMap(coreV1Api, changeConfigMap, changeConfigNamespace, kubernetesRuntimeFactory);
+ Mockito.verify(coreV1Api, Mockito.times(2)).readNamespacedConfigMap(eq(changeConfigMap), eq(changeConfigNamespace), eq(null), eq(true), eq(false));
+
+ assertEquals(kubernetesRuntimeFactory.getPulsarDockerImageName(), "test_dockerImage2");
+ assertEquals(kubernetesRuntimeFactory.getImagePullPolicy(), "test_imagePullPolicy2");
+ }
+
+ private KubernetesRuntimeFactory getKuberentesRuntimeFactory() {
+ KubernetesRuntimeFactory kubernetesRuntimeFactory = new KubernetesRuntimeFactory();
+ WorkerConfig workerConfig = new WorkerConfig();
+ KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
+ kubernetesRuntimeFactoryConfig.setK8Uri("test_k8uri");
+ kubernetesRuntimeFactoryConfig.setJobNamespace("test_jobNamespace");
+ kubernetesRuntimeFactoryConfig.setPulsarDockerImageName("test_dockerImage");
+ kubernetesRuntimeFactoryConfig.setImagePullPolicy("test_imagePullPolicy");
+ workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
+ AuthenticationConfig authenticationConfig = AuthenticationConfig.builder().build();
+ kubernetesRuntimeFactory.initialize(workerConfig, authenticationConfig, new DefaultSecretsProviderConfigurator(), Optional.empty());
+ return kubernetesRuntimeFactory;
+ }
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
similarity index 91%
rename from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
rename to pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 00d8930..eb47caf 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.kubernetes;
import com.google.protobuf.util.JsonFormat;
import io.kubernetes.client.apis.AppsV1Api;
@@ -25,13 +25,16 @@ import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1Container;
import io.kubernetes.client.models.V1PodSpec;
import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
@@ -154,31 +157,36 @@ public class KubernetesRuntimeTest {
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
- KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory(
- null,
- null,
- null,
- null,
- pulsarRootDir,
- false,
- true,
- "myrepo",
- "anotherrepo",
- extraDepsDir,
- null,
- percentMemoryPadding,
- cpuOverCommitRatio,
- memoryOverCommitRatio,
- pulsarServiceUrl,
- pulsarAdminUrl,
- stateStorageServiceUrl,
- null,
- null,
- null,
- null,
-null,
- null, new TestSecretProviderConfigurator(), false,
- Optional.empty()));
+ KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
+ kubernetesRuntimeFactoryConfig.setK8Uri(null);
+ kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+ kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
+ kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
+ kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
+ kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
+ kubernetesRuntimeFactoryConfig.setInstallUserCodeDependencies(true);
+ kubernetesRuntimeFactoryConfig.setPythonDependencyRepository("myrepo");
+ kubernetesRuntimeFactoryConfig.setPythonExtraDependencyRepository("anotherrepo");
+ kubernetesRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDepsDir);
+ kubernetesRuntimeFactoryConfig.setCustomLabels(null);
+ kubernetesRuntimeFactoryConfig.setPercentMemoryPadding(percentMemoryPadding);
+ kubernetesRuntimeFactoryConfig.setCpuOverCommitRatio(cpuOverCommitRatio);
+ kubernetesRuntimeFactoryConfig.setMemoryOverCommitRatio(memoryOverCommitRatio);
+ kubernetesRuntimeFactoryConfig.setPulsarServiceUrl(pulsarServiceUrl);
+ kubernetesRuntimeFactoryConfig.setPulsarAdminUrl(pulsarAdminUrl);
+ kubernetesRuntimeFactoryConfig.setChangeConfigMapNamespace(null);
+ kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
+ workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
+ workerConfig.setFunctionInstanceMinResources(null);
+ workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+ workerConfig.setAuthenticationEnabled(false);
+
+ factory.initialize(workerConfig,null, new TestSecretProviderConfigurator(), Optional.empty());
doNothing().when(factory).setupClient();
return factory;
}
@@ -242,8 +250,6 @@ null,
Assert.assertEquals(containerSpec.getResources().getRequests().get("memory").getNumber().longValue(), expectedRamWithPadding);
}
-
-
@Test
public void testJavaConstructor() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
similarity index 90%
rename from pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
rename to pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 0329ae8..84c6160 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.pulsar.functions.runtime;
+package org.apache.pulsar.functions.runtime.process;
import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
import static org.testng.Assert.assertEquals;
@@ -40,13 +40,16 @@ import java.util.Optional;
import io.kubernetes.client.apis.AppsV1Api;
import io.kubernetes.client.apis.CoreV1Api;
import io.kubernetes.client.models.V1PodSpec;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntime;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
+import org.apache.pulsar.functions.worker.WorkerConfig;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -139,15 +142,25 @@ public class ProcessRuntimeTest {
}
private ProcessRuntimeFactory createProcessRuntimeFactory(String extraDependenciesDir) {
- return new ProcessRuntimeFactory(
- pulsarServiceUrl,
- stateStorageServiceUrl,
- null, /* auth config */
- javaInstanceJarFile,
- pythonInstanceFile,
- logDirectory,
- extraDependenciesDir, /* extra dependencies dir */
- new TestSecretsProviderConfigurator(), false, Optional.empty());
+ ProcessRuntimeFactory processRuntimeFactory = new ProcessRuntimeFactory();
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setPulsarServiceUrl(pulsarServiceUrl);
+ workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+ workerConfig.setAuthenticationEnabled(false);
+
+ ProcessRuntimeFactoryConfig processRuntimeFactoryConfig = new ProcessRuntimeFactoryConfig();
+ processRuntimeFactoryConfig.setJavaInstanceJarLocation(javaInstanceJarFile);
+ processRuntimeFactoryConfig.setPythonInstanceLocation(pythonInstanceFile);
+ processRuntimeFactoryConfig.setLogDirectory(logDirectory);
+ processRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDependenciesDir);
+
+ workerConfig.setFunctionRuntimeFactoryClassName(ProcessRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(processRuntimeFactoryConfig, Map.class));
+ processRuntimeFactory.initialize(workerConfig, null, new TestSecretsProviderConfigurator(), Optional.empty());
+
+ return processRuntimeFactory;
}
FunctionDetails createFunctionDetails(FunctionDetails.Runtime runtime) {
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
index 1d89738..48644f0 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/Reflections.java
@@ -67,7 +67,7 @@ public class Reflections {
throw new RuntimeException("User class must be in class path", cnfe);
}
if (!xface.isAssignableFrom(theCls)) {
- throw new RuntimeException(userClassName + " not " + xface.getName());
+ throw new RuntimeException(userClassName + " does not implement " + xface.getName());
}
Class<T> tCls = (Class<T>) theCls.asSubclass(xface);
T result;
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
index da2a540..1ad03b1 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/ReflectionsTest.java
@@ -149,7 +149,7 @@ public class ReflectionsTest {
fail("Should fail to load a class that isn't assignable");
} catch (RuntimeException re) {
assertEquals(
- aImplementation.class.getName() + " not " + bInterface.class.getName(),
+ aImplementation.class.getName() + " does not implement " + bInterface.class.getName(),
re.getMessage());
}
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index 08d412b..b0f41b0 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -31,16 +31,17 @@ import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.common.functions.WorkerInfo;
import org.apache.pulsar.common.policies.data.ErrorData;
import org.apache.pulsar.common.policies.data.FunctionStats;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
-import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
-import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
@@ -150,7 +151,7 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
- Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();
+ Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();
AuthenticationConfig authConfig = null;
if (workerConfig.isAuthenticationEnabled()) {
authConfig = AuthenticationConfig.builder()
@@ -168,57 +169,28 @@ public class FunctionRuntimeManager implements AutoCloseable{
}
}
- if (workerConfig.getThreadContainerFactory() != null) {
- this.runtimeFactory = new ThreadRuntimeFactory(
- workerConfig.getThreadContainerFactory().getThreadGroupName(),
- workerConfig.getPulsarServiceUrl(),
- workerConfig.getStateStorageServiceUrl(),
- authConfig,
- new ClearTextSecretsProvider(),
- null, null);
- } else if (workerConfig.getProcessContainerFactory() != null) {
- this.runtimeFactory = new ProcessRuntimeFactory(
- workerConfig.getPulsarServiceUrl(),
- workerConfig.getStateStorageServiceUrl(),
- authConfig,
- workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
- workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
- workerConfig.getProcessContainerFactory().getLogDirectory(),
- workerConfig.getProcessContainerFactory().getExtraFunctionDependenciesDir(),
- secretsProviderConfigurator,
- workerConfig.isAuthenticationEnabled(),
- functionAuthProvider);
- } else if (workerConfig.getKubernetesContainerFactory() != null){
- this.runtimeFactory = new KubernetesRuntimeFactory(
- workerConfig.getKubernetesContainerFactory().getK8Uri(),
- workerConfig.getKubernetesContainerFactory().getJobNamespace(),
- workerConfig.getKubernetesContainerFactory().getPulsarDockerImageName(),
- workerConfig.getKubernetesContainerFactory().getImagePullPolicy(),
- workerConfig.getKubernetesContainerFactory().getPulsarRootDir(),
- workerConfig.getKubernetesContainerFactory().getSubmittingInsidePod(),
- workerConfig.getKubernetesContainerFactory().getInstallUserCodeDependencies(),
- workerConfig.getKubernetesContainerFactory().getPythonDependencyRepository(),
- workerConfig.getKubernetesContainerFactory().getPythonExtraDependencyRepository(),
- workerConfig.getKubernetesContainerFactory().getExtraFunctionDependenciesDir(),
- workerConfig.getKubernetesContainerFactory().getCustomLabels(),
- workerConfig.getKubernetesContainerFactory().getPercentMemoryPadding(),
- workerConfig.getKubernetesContainerFactory().getCpuOverCommitRatio(),
- workerConfig.getKubernetesContainerFactory().getMemoryOverCommitRatio(),
- StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl()) ? workerConfig.getPulsarServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarServiceUrl(),
- StringUtils.isEmpty(workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl()) ? workerConfig.getPulsarWebServiceUrl() : workerConfig.getKubernetesContainerFactory().getPulsarAdminUrl(),
- workerConfig.getStateStorageServiceUrl(),
- authConfig,
- workerConfig.getTlsTrustChainBytes(),
- workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval(),
- workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
- workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(),
- workerConfig.getFunctionInstanceMinResources(),
- secretsProviderConfigurator,
- workerConfig.isAuthenticationEnabled(),
- functionAuthProvider);
+ // initialize function runtime factory
+ if (!StringUtils.isEmpty(workerConfig.getFunctionRuntimeFactoryClassName())) {
+ this.runtimeFactory = RuntimeFactory.getFuntionRuntimeFactory(workerConfig.getFunctionRuntimeFactoryClassName());
} else {
- throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
+ if (workerConfig.getThreadContainerFactory() != null) {
+ this.runtimeFactory = new ThreadRuntimeFactory();
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(workerConfig.getThreadContainerFactory(), Map.class));
+ } else if (workerConfig.getProcessContainerFactory() != null) {
+ this.runtimeFactory = new ProcessRuntimeFactory();
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(workerConfig.getProcessContainerFactory(), Map.class));
+ } else if (workerConfig.getKubernetesContainerFactory() != null) {
+ this.runtimeFactory = new KubernetesRuntimeFactory();
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(workerConfig.getKubernetesContainerFactory(), Map.class));
+ } else {
+ throw new RuntimeException("A Function Runtime Factory needs to be set");
+ }
}
+ // initialize runtime
+ this.runtimeFactory.initialize(workerConfig, authConfig, secretsProviderConfigurator, functionAuthProvider);
this.functionActioner = new FunctionActioner(this.workerConfig, runtimeFactory,
dlogNamespace, connectorsManager, workerService.getBrokerAdmin());
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
index cd186cc..327d694 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionsStatsGenerator.java
@@ -19,7 +19,7 @@
package org.apache.pulsar.functions.worker;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
-import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.slf4j.Logger;
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
index 2374dce..b3ab7bb 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionActionerTest.java
@@ -29,15 +29,18 @@ import static org.mockito.Mockito.verify;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.auth.FunctionAuthProvider;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.PackageLocationMetaData;
import org.apache.pulsar.functions.runtime.Runtime;
import org.apache.pulsar.functions.runtime.RuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.testng.annotations.Test;
+import java.util.Map;
import java.util.Optional;
import static org.apache.pulsar.common.functions.Utils.FILE;
@@ -60,8 +63,10 @@ public class FunctionActionerTest {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
- workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class)); workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -99,7 +104,10 @@ public class FunctionActionerTest {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -158,7 +166,10 @@ public class FunctionActionerTest {
public void testFunctionAuthDisabled() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index 9845555..77407a1 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -28,9 +28,14 @@ import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
-import org.apache.pulsar.functions.runtime.KubernetesRuntime;
-import org.apache.pulsar.functions.runtime.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntime;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactory;
+import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.kubernetes.KubernetesRuntimeFactoryConfig;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -40,6 +45,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import static org.mockito.Mockito.doNothing;
@@ -58,6 +64,7 @@ import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
@Slf4j
public class FunctionRuntimeManagerTest {
@@ -67,7 +74,10 @@ public class FunctionRuntimeManagerTest {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -147,7 +157,10 @@ public class FunctionRuntimeManagerTest {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
@@ -229,7 +242,10 @@ public class FunctionRuntimeManagerTest {
public void testProcessAssignmentUpdateModifyFunctions() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
@@ -355,7 +371,10 @@ public class FunctionRuntimeManagerTest {
public void testReassignment() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
@@ -456,7 +475,10 @@ public class FunctionRuntimeManagerTest {
public void testRuntimeManagerInitialize() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");
@@ -524,7 +546,6 @@ public class FunctionRuntimeManagerTest {
}
});
-
when(reader.hasMessageAvailable()).thenAnswer(new Answer<Boolean>() {
@Override
public Boolean answer(InvocationOnMock invocationOnMock) throws Throwable {
@@ -532,7 +553,6 @@ public class FunctionRuntimeManagerTest {
}
});
-
ReaderBuilder readerBuilder = mock(ReaderBuilder.class);
doReturn(readerBuilder).when(pulsarClient).newReader();
doReturn(readerBuilder).when(readerBuilder).topic(anyString());
@@ -581,9 +601,11 @@ public class FunctionRuntimeManagerTest {
public void testExternallyManagedRuntimeUpdate() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setKubernetesContainerFactory(
- new WorkerConfig.KubernetesContainerFactory()
- .setSubmittingInsidePod(false));
+ workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal()
+ .convertValue(new KubernetesRuntimeFactoryConfig()
+ .setSubmittingInsidePod(false), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setPulsarFunctionsCluster("cluster");
@@ -703,4 +725,174 @@ public class FunctionRuntimeManagerTest {
verify(kubernetesRuntime, times(1)).reinitialize();
}
+
+ @Test
+ public void testFunctionRuntimeSetCorrectly() {
+
+ // Function runtime not set
+ try {
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+ workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setStateStorageServiceUrl("foo");
+ workerConfig.setFunctionAssignmentTopicName("assignments");
+ new FunctionRuntimeManager(
+ workerConfig,
+ mock(WorkerService.class),
+ mock(Namespace.class),
+ mock(MembershipManager.class),
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class));
+
+ fail();
+ } catch (Exception e) {
+ assertEquals(e.getMessage(), "A Function Runtime Factory needs to be set");
+ }
+
+ // Function runtime class not found
+ try {
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+ workerConfig.setFunctionRuntimeFactoryClassName("foo");
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
+ workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setStateStorageServiceUrl("foo");
+ workerConfig.setFunctionAssignmentTopicName("assignments");
+ new FunctionRuntimeManager(
+ workerConfig,
+ mock(WorkerService.class),
+ mock(Namespace.class),
+ mock(MembershipManager.class),
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class));
+
+ fail();
+ } catch (Exception e) {
+ assertEquals(e.getCause().getClass(), ClassNotFoundException.class);
+ }
+
+ // Function runtime class does not implement correct interface
+ try {
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+ workerConfig.setFunctionRuntimeFactoryClassName(FunctionRuntimeManagerTest.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
+ workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setStateStorageServiceUrl("foo");
+ workerConfig.setFunctionAssignmentTopicName("assignments");
+ new FunctionRuntimeManager(
+ workerConfig,
+ mock(WorkerService.class),
+ mock(Namespace.class),
+ mock(MembershipManager.class),
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class));
+
+ fail();
+ } catch (Exception e) {
+ assertEquals(e.getMessage(), "org.apache.pulsar.functions.worker.FunctionRuntimeManagerTest does not implement org.apache.pulsar.functions.runtime.RuntimeFactory");
+ }
+
+ // Correct runtime class
+ try {
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setWorkerId("worker-1");
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(new KubernetesRuntimeFactoryConfig(), Map.class));
+ workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
+ workerConfig.setStateStorageServiceUrl("foo");
+ workerConfig.setFunctionAssignmentTopicName("assignments");
+ FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
+ workerConfig,
+ mock(WorkerService.class),
+ mock(Namespace.class),
+ mock(MembershipManager.class),
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class));
+
+ assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
+ } catch (Exception e) {
+ fail();
+ }
+ }
+
+ @Test
+ public void testFunctionRuntimeFactoryConfigsBackwardsCompatibility() throws Exception {
+
+ // Test kubernetes runtime
+ WorkerConfig.KubernetesContainerFactory kubernetesContainerFactory
+ = new WorkerConfig.KubernetesContainerFactory();
+ kubernetesContainerFactory.setK8Uri("k8Uri");
+ kubernetesContainerFactory.setJobNamespace("jobNamespace");
+ kubernetesContainerFactory.setPulsarDockerImageName("pulsarDockerImageName");
+ kubernetesContainerFactory.setImagePullPolicy("imagePullPolicy");
+ kubernetesContainerFactory.setPulsarRootDir("pulsarRootDir");
+ WorkerConfig workerConfig = new WorkerConfig();
+ workerConfig.setKubernetesContainerFactory(kubernetesContainerFactory);
+
+ FunctionRuntimeManager functionRuntimeManager = new FunctionRuntimeManager(
+ workerConfig,
+ mock(WorkerService.class),
+ mock(Namespace.class),
+ mock(MembershipManager.class),
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class));
+
+ assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), KubernetesRuntimeFactory.class);
+ KubernetesRuntimeFactory kubernetesRuntimeFactory = (KubernetesRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
+ assertEquals(kubernetesRuntimeFactory.getK8Uri(), "k8Uri");
+ assertEquals(kubernetesRuntimeFactory.getJobNamespace(), "jobNamespace");
+ assertEquals(kubernetesRuntimeFactory.getPulsarDockerImageName(), "pulsarDockerImageName");
+ assertEquals(kubernetesRuntimeFactory.getImagePullPolicy(), "imagePullPolicy");
+ assertEquals(kubernetesRuntimeFactory.getPulsarRootDir(), "pulsarRootDir");
+
+ // Test process runtime
+
+ WorkerConfig.ProcessContainerFactory processContainerFactory
+ = new WorkerConfig.ProcessContainerFactory();
+ processContainerFactory.setExtraFunctionDependenciesDir("extraDependenciesDir");
+ processContainerFactory.setLogDirectory("logDirectory");
+ processContainerFactory.setPythonInstanceLocation("pythonInstanceLocation");
+ processContainerFactory.setJavaInstanceJarLocation("javaInstanceJarLocation");
+ workerConfig = new WorkerConfig();
+ workerConfig.setProcessContainerFactory(processContainerFactory);
+
+ functionRuntimeManager = new FunctionRuntimeManager(
+ workerConfig,
+ mock(WorkerService.class),
+ mock(Namespace.class),
+ mock(MembershipManager.class),
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class));
+
+ assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ProcessRuntimeFactory.class);
+ ProcessRuntimeFactory processRuntimeFactory = (ProcessRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
+ assertEquals(processRuntimeFactory.getExtraDependenciesDir(), "extraDependenciesDir");
+ assertEquals(processRuntimeFactory.getLogDirectory(), "logDirectory/functions");
+ assertEquals(processRuntimeFactory.getPythonInstanceFile(), "pythonInstanceLocation");
+ assertEquals(processRuntimeFactory.getJavaInstanceJarFile(), "javaInstanceJarLocation");
+
+ // Test thread runtime
+
+ WorkerConfig.ThreadContainerFactory threadContainerFactory
+ = new WorkerConfig.ThreadContainerFactory();
+ threadContainerFactory.setThreadGroupName("threadGroupName");
+ workerConfig = new WorkerConfig();
+ workerConfig.setThreadContainerFactory(threadContainerFactory);
+
+ functionRuntimeManager = new FunctionRuntimeManager(
+ workerConfig,
+ mock(WorkerService.class),
+ mock(Namespace.class),
+ mock(MembershipManager.class),
+ mock(ConnectorsManager.class),
+ mock(FunctionMetaDataManager.class));
+
+ assertEquals(functionRuntimeManager.getRuntimeFactory().getClass(), ThreadRuntimeFactory.class);
+ ThreadRuntimeFactory threadRuntimeFactory = (ThreadRuntimeFactory) functionRuntimeManager.getRuntimeFactory();
+ assertEquals(threadRuntimeFactory.getThreadGroup().getName(), "threadGroupName");
+ }
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
index 57eed32..2d0460b 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/MembershipManagerTest.java
@@ -34,6 +34,7 @@ import static org.testng.Assert.assertTrue;
import java.util.LinkedList;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.distributedlog.api.namespace.Namespace;
@@ -48,7 +49,10 @@ import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.mockito.Mockito;
import org.testng.annotations.Test;
@@ -59,7 +63,10 @@ public class MembershipManagerTest {
public MembershipManagerTest() {
this.workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
}
@@ -278,7 +285,10 @@ public class MembershipManagerTest {
public void testCheckFailuresSomeUnassigned() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setRescheduleTimeoutMs(30000);
@@ -354,7 +364,10 @@ public class MembershipManagerTest {
public void testHeartBeatFunctionWorkerDown() throws Exception {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setRescheduleTimeoutMs(30000);
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index c0444e3..6ee14dc 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -30,9 +30,11 @@ import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.common.functions.WorkerInfo;
+import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
-import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactoryConfig;
+import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
@@ -86,7 +88,10 @@ public class SchedulerManagerTest {
public void setup() {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
- workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
+ workerConfig.setFunctionRuntimeFactoryClassName(ThreadRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(
+ new ThreadRuntimeFactoryConfig().setThreadGroupName("test"), Map.class));
workerConfig.setPulsarServiceUrl("pulsar://localhost:6650");
workerConfig.setStateStorageServiceUrl("foo");
workerConfig.setFunctionAssignmentTopicName("assignments");