You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/31 00:35:48 UTC
[pulsar] branch master updated: Hooked up secrets function api with
secret function implementations (#2875)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 649c6ca Hooked up secrets function api with secret function implementations (#2875)
649c6ca is described below
commit 649c6ca4eb83d340edfd4b431e0e4e68099d95f6
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Oct 30 17:35:44 2018 -0700
Hooked up secrets function api with secret function implementations (#2875)
* Hooked up secrets function api with secret function implementations
* Fixed unittest
* Added more docs
* Took feedback into account
---
.../python/pulsar/functions/context.py | 5 ++
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 3 +-
.../org/apache/pulsar/functions/api/Context.java | 7 +++
pulsar-functions/instance/pom.xml | 6 +++
.../pulsar/functions/instance/ContextImpl.java | 24 ++++++++-
.../functions/instance/JavaInstanceRunnable.java | 9 +++-
.../instance/src/main/python/contextimpl.py | 11 +++-
.../instance/src/main/python/python_instance.py | 5 +-
.../src/main/python/python_instance_main.py | 19 ++++++-
.../instance/src/main/python/secretsprovider.py | 61 +++++++++++++++++++++
.../pulsar/functions/instance/ContextImplTest.java | 4 +-
.../instance/JavaInstanceRunnableTest.java | 2 +-
.../src/test/python/test_python_instance.py | 2 +-
..._python_instance.py => test_secretsprovider.py} | 48 ++++++++---------
pulsar-functions/runtime/pom.xml | 6 +++
.../pulsar/functions/runtime/JavaInstanceMain.java | 35 +++++++++++-
.../functions/runtime/KubernetesRuntime.java | 24 ++++++++-
.../runtime/KubernetesRuntimeFactory.java | 19 ++++++-
.../pulsar/functions/runtime/ProcessRuntime.java | 15 +++++-
.../functions/runtime/ProcessRuntimeFactory.java | 7 ++-
.../pulsar/functions/runtime/RuntimeUtils.java | 10 ++++
.../pulsar/functions/runtime/ThreadRuntime.java | 7 ++-
.../functions/runtime/ThreadRuntimeFactory.java | 14 +++--
.../functions/runtime/KubernetesRuntimeTest.java | 15 +++---
.../functions/runtime/ProcessRuntimeTest.java | 63 ++++++++++++++++++++--
.../functions/worker/FunctionRuntimeManager.java | 21 ++++++--
.../pulsar/functions/worker/WorkerConfig.java | 8 ++-
.../functions/worker/SchedulerManagerTest.java | 21 ++++----
28 files changed, 394 insertions(+), 77 deletions(-)
diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 47e86f9..6575f7a 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -99,6 +99,11 @@ class Context(object):
pass
@abstractmethod
+ def get_secret(self, secret_name):
+ """Returns the secret value associated with secret_name, or None if nothing was found"""
+ pass
+
+ @abstractmethod
def record_metric(self, metric_name, metric_value):
"""Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
pass
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index a75092c..3169e22 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.*;
import org.apache.pulsar.functions.windowing.WindowUtils;
@@ -1042,7 +1043,7 @@ public class CmdFunctions extends CmdBase {
}
try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null,
- null)) {
+ null, new DefaultSecretsProviderConfigurator())) {
List<RuntimeSpawner> spawners = new LinkedList<>();
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index c66ea6e..e9b79c3 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -160,6 +160,13 @@ public interface Context {
Object getUserConfigValueOrDefault(String key, Object defaultValue);
/**
+ * Get the secret associated with the given name
+ * @param secretName The name of the secret
+ * @return The secret if anything was found or null
+ */
+ String getSecret(String secretName);
+
+ /**
* Record a user defined metric
* @param metricName The name of the metric
* @param value The value of the metric
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 1c8f3ab..8dec8dc 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -65,6 +65,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-secrets</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>pulsar-client-original</artifactId>
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 4d47433..406fe13 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.source.TopicSchema;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.SourceContext;
@@ -101,12 +102,16 @@ class ContextImpl implements Context, SinkContext, SourceContext {
private final TopicSchema topicSchema;
+ private final SecretsProvider secretsProvider;
+ private final Map<String, Object> secretsMap;
+
@Getter
@Setter
private StateContextImpl stateContext;
private Map<String, Object> userConfigs;
- public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics) {
+ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
+ SecretsProvider secretsProvider) {
this.config = config;
this.logger = logger;
this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
@@ -125,6 +130,14 @@ class ContextImpl implements Context, SinkContext, SourceContext {
new TypeToken<Map<String, Object>>() {
}.getType());
}
+ this.secretsProvider = secretsProvider;
+ if (!StringUtils.isEmpty(config.getFunctionDetails().getSecretsMap())) {
+ secretsMap = new Gson().fromJson(config.getFunctionDetails().getSecretsMap(),
+ new TypeToken<Map<String, Object>>() {
+ }.getType());
+ } else {
+ secretsMap = new HashMap<>();
+ }
}
public void setCurrentMessageContext(Record<?> record) {
@@ -212,6 +225,15 @@ class ContextImpl implements Context, SinkContext, SourceContext {
return userConfigs;
}
+ @Override
+ public String getSecret(String secretName) {
+ if (secretsMap.containsKey(secretName)) {
+ return secretsProvider.provideSecret(secretName, secretsMap.get(secretName));
+ } else {
+ return null;
+ }
+ }
+
private void ensureStateEnabled() {
checkState(null != stateContext, "State is not enabled.");
}
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ace5efc..f626c10 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
import org.apache.pulsar.functions.proto.Function.SourceSpec;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.sink.PulsarSink;
import org.apache.pulsar.functions.sink.PulsarSinkConfig;
import org.apache.pulsar.functions.sink.PulsarSinkDisable;
@@ -110,6 +111,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
private Source source;
private Sink sink;
+ private final SecretsProvider secretsProvider;
+
public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
public static final String METRICS_TOTAL_SUCCESS = "__total_successfully_processed__";
public static final String METRICS_TOTAL_SYS_EXCEPTION = "__total_system_exceptions__";
@@ -122,13 +125,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
FunctionCacheManager fnCache,
String jarFile,
PulsarClient pulsarClient,
- String stateStorageServiceUrl) {
+ String stateStorageServiceUrl,
+ SecretsProvider secretsProvider) {
this.instanceConfig = instanceConfig;
this.fnCache = fnCache;
this.jarFile = jarFile;
this.client = (PulsarClientImpl) pulsarClient;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.stats = new FunctionStats();
+ this.secretsProvider = secretsProvider;
}
/**
@@ -173,7 +178,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
}
Logger instanceLog = LoggerFactory.getLogger(
"function-" + instanceConfig.getFunctionDetails().getName());
- return new ContextImpl(instanceConfig, instanceLog, client, inputTopics);
+ return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider);
}
/**
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 24246ca..83a63aa 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -48,12 +48,13 @@ class AccumulatedMetricDatum(object):
self.min = value
class ContextImpl(pulsar.Context):
- def __init__(self, instance_config, logger, pulsar_client, user_code, consumers):
+ def __init__(self, instance_config, logger, pulsar_client, user_code, consumers, secrets_provider):
self.instance_config = instance_config
self.log = logger
self.pulsar_client = pulsar_client
self.user_code_dir = os.path.dirname(user_code)
self.consumers = consumers
+ self.secrets_provider = secrets_provider
self.current_accumulated_metrics = {}
self.accumulated_metrics = {}
self.publish_producers = {}
@@ -64,6 +65,9 @@ class ContextImpl(pulsar.Context):
self.user_config = json.loads(instance_config.function_details.userConfig) \
if instance_config.function_details.userConfig \
else []
+ self.secrets_map = json.loads(instance_config.function_details.secretsMap) \
+ if instance_config.function_details.secretsMap \
+ else {}
# Called on a per message basis to set the context for the current message
def set_current_message_context(self, msgid, topic):
@@ -107,6 +111,11 @@ class ContextImpl(pulsar.Context):
def get_user_config_map(self):
return self.user_config
+ def get_secret(self, secret_key):
+ if not secret_key in self.secrets_map:
+ return None
+ return self.secrets_provider.provide_secret(secret_key, self.secrets_map[secret_key])
+
def record_metric(self, metric_name, metric_value):
if not metric_name in self.current_accumulated_metrics:
self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 4b9ae3a..2438338 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -132,7 +132,7 @@ class Stats(object):
class PythonInstance(object):
- def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client):
+ def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client, secrets_provider):
self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
self.user_code = user_code
self.queue = queue.Queue(max_buffered_tuples)
@@ -157,6 +157,7 @@ class PythonInstance(object):
self.last_health_check_ts = time.time()
self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None
self.expected_healthcheck_interval = expected_healthcheck_interval
+ self.secrets_provider = secrets_provider
def health_check(self):
self.last_health_check_ts = time.time()
@@ -226,7 +227,7 @@ class PythonInstance(object):
except:
self.function_purefunction = function_kclass
- self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers)
+ self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers, self.secrets_provider)
# Now launch a thread that does execution
self.exeuction_thread = threading.Thread(target=self.actual_execution)
self.exeuction_thread.start()
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 2acfd6e..748923e 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -29,6 +29,8 @@ import sys
import signal
import time
import zipfile
+import json
+import inspect
import pulsar
@@ -72,11 +74,12 @@ def main():
parser.add_argument('--logging_file', required=True, help='Log file name')
parser.add_argument('--logging_config_file', required=True, help='Config file for logging')
parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
+ parser.add_argument('--secrets_provider', required=False, help='The classname of the secrets provider')
+ parser.add_argument('--secrets_provider_config', required=False, help='The config that needs to be passed to secrets provider')
parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from')
parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from')
-
args = parser.parse_args()
function_details = Function_pb2.FunctionDetails()
args.function_details = str(args.function_details)
@@ -120,11 +123,23 @@ def main():
if args.tls_trust_cert_path:
tls_trust_cert_path = args.tls_trust_cert_path
pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
+
+ secrets_provider = None
+ if args.secrets_provider is not None:
+ secrets_provider = util.import_class(os.path.dirname(inspect.getfile(inspect.currentframe())), str(args.secrets_provider))
+ else:
+ secrets_provider = util.import_class(os.path.dirname(inspect.getfile(inspect.currentframe())), "secretsprovider.ClearTextSecretsProvider")
+ secrets_provider = secrets_provider()
+ secrets_provider_config = None
+ if args.secrets_provider_config is not None:
+ secrets_provider_config = json.loads(str(args.secrets_provider_config))
+ secrets_provider.init(secrets_provider_config)
+
pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
str(args.function_version), function_details,
int(args.max_buffered_tuples),
int(args.expected_healthcheck_interval),
- str(args.py), pulsar_client)
+ str(args.py), pulsar_client, secrets_provider)
pyinstance.run()
server_instance = server.serve(args.port, pyinstance)
diff --git a/pulsar-functions/instance/src/main/python/secretsprovider.py b/pulsar-functions/instance/src/main/python/secretsprovider.py
new file mode 100644
index 0000000..db8e68c
--- /dev/null
+++ b/pulsar-functions/instance/src/main/python/secretsprovider.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# -*- encoding: utf-8 -*-
+
+"""secretsprovider.py: Interfaces and definitions for Secret Providers
+"""
+from abc import abstractmethod
+import os
+
+class SecretsProvider:
+ """Interface for providing secrets information at runtime"""
+ @abstractmethod
+ def init(self, config):
+ """Do any kind of initialization"""
+ pass
+
+ @abstractmethod
+ def provide_secret(self, secret_name, path_to_secret):
+ """Fetches the secret located at the path"""
+ pass
+
+
+"""A simple implementation that represents storing secrets in clear text """
+class ClearTextSecretsProvider(SecretsProvider):
+ def __init__(self):
+ pass
+
+ def init(self, config):
+ pass
+
+ def provide_secret(self, secret_name, path_to_secret):
+ return path_to_secret
+
+"""Implementation that fetches secrets from environment variables"""
+class EnvironmentBasedSecretsProvider(SecretsProvider):
+ def __init__(self):
+ pass
+
+ def init(self, config):
+ pass
+
+ def provide_secret(self, secret_name, path_to_secret):
+ return os.environ.get(secret_name)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index f5108fc..e3e32fd 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
import org.apache.pulsar.functions.instance.state.StateContextImpl;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Matchers;
@@ -73,7 +74,8 @@ public class ContextImplTest {
config,
logger,
client,
- new ArrayList<>()
+ new ArrayList<>(),
+ new EnvironmentBasedSecretsProvider()
);
}
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 12d4f19..80b3b1d 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -56,7 +56,7 @@ public class JavaInstanceRunnableTest {
private JavaInstanceRunnable createRunnable(boolean addCustom, String outputSerde) throws Exception {
InstanceConfig config = createInstanceConfig(addCustom, outputSerde);
JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
- config, null, null, null, null);
+ config, null, null, null, null, null);
return javaInstanceRunnable;
}
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 0b5355f..748e5d8 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -48,7 +48,7 @@ class TestContextImpl(unittest.TestCase):
pulsar_client.create_producer = Mock(return_value=producer)
user_code=__file__
consumers = None
- context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers)
+ context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers, None)
context_impl.publish("test_topic_name", "test_message")
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_secretsprovider.py
similarity index 50%
copy from pulsar-functions/instance/src/test/python/test_python_instance.py
copy to pulsar-functions/instance/src/test/python/test_secretsprovider.py
index 0b5355f..5d725ee 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_secretsprovider.py
@@ -20,11 +20,9 @@
# DEPENDENCIES: unittest2,mock
-from contextimpl import ContextImpl
-from python_instance import InstanceConfig
-from mock import Mock
+from secretsprovider import ClearTextSecretsProvider
+from secretsprovider import EnvironmentBasedSecretsProvider
-import Function_pb2
import log
import os
import unittest
@@ -34,25 +32,23 @@ class TestContextImpl(unittest.TestCase):
def setUp(self):
log.init_logger("INFO", "foo", os.environ.get("PULSAR_HOME") + "/conf/functions-logging/console_logging_config.ini")
- def test_context_publish(self):
- instance_id = 'test_instance_id'
- function_id = 'test_function_id'
- function_version = 'test_function_version'
- function_details = Function_pb2.FunctionDetails()
- max_buffered_tuples = 100;
- instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
- logger = log.Log
- pulsar_client = Mock()
- producer = Mock()
- producer.send_async = Mock(return_value=None)
- pulsar_client.create_producer = Mock(return_value=producer)
- user_code=__file__
- consumers = None
- context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers)
-
- context_impl.publish("test_topic_name", "test_message")
-
- producer.send_async.assert_called_with("test_message", None, properties=None)
-
-
-
+ def test_cleartext_secretsprovider(self):
+ provider = ClearTextSecretsProvider()
+ secret = provider.provide_secret("secretName", "secretPath")
+ self.assertEqual(secret, "secretPath")
+ secret = provider.provide_secret("secretName", "")
+ self.assertEqual(secret, "")
+ secret = provider.provide_secret("secretName", None)
+ self.assertEqual(secret, None)
+
+ def test_environment_secretsprovider(self):
+ provider = EnvironmentBasedSecretsProvider()
+ secret = provider.provide_secret("secretName", "secretPath")
+ self.assertEqual(secret, None)
+ os.environ["secretName"] = "secretValue"
+ secret = provider.provide_secret("secretName", "")
+ self.assertEqual(secret, "secretValue")
+ secret = provider.provide_secret("secretName", None)
+ self.assertEqual(secret, "secretValue")
+ secret = provider.provide_secret("secretName", "somethingelse")
+ self.assertEqual(secret, "secretValue")
\ No newline at end of file
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index 66d245f..e0b264d 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -46,6 +46,12 @@
</dependency>
<dependency>
+ <groupId>org.apache.pulsar</groupId>
+ <artifactId>pulsar-functions-secrets</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-all</artifactId>
</dependency>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 1551f6f..80503bb 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -22,18 +22,26 @@ package org.apache.pulsar.functions.runtime;
import com.beust.jcommander.JCommander;
import com.beust.jcommander.Parameter;
import com.beust.jcommander.converters.StringConverter;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import com.google.protobuf.Empty;
import com.google.protobuf.util.JsonFormat;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.utils.Reflections;
+import java.lang.reflect.Type;
+import java.util.Map;
import java.util.TimerTask;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
@@ -95,6 +103,12 @@ public class JavaInstanceMain implements AutoCloseable {
@Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
protected int expectedHealthCheckInterval;
+ @Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
+ protected String secretsProviderClassName;
+
+ @Parameter(names = "--secrets_provider_config", description = "The config that needs to be passed to secrets provider", required = false)
+ protected String secretsProviderConfig;
+
private Server server;
private RuntimeSpawner runtimeSpawner;
private ThreadRuntimeFactory containerFactory;
@@ -122,13 +136,32 @@ public class JavaInstanceMain implements AutoCloseable {
instanceConfig.setFunctionDetails(functionDetails);
instanceConfig.setPort(port);
+ Map<String, String> secretsProviderConfigMap = null;
+ if (!StringUtils.isEmpty(secretsProviderConfig)) {
+ Type type = new TypeToken<Map<String, String>>() {}.getType();
+ secretsProviderConfigMap = new Gson().fromJson(secretsProviderConfig, type);
+ }
+
+ if (StringUtils.isEmpty(secretsProviderClassName)) {
+ secretsProviderClassName = ClearTextSecretsProvider.class.getName();
+ }
+
+ SecretsProvider secretsProvider;
+ try {
+ secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ secretsProvider.init(secretsProviderConfigMap);
+
containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
stateStorageServiceUrl,
AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
.clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls))
.tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
.tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
- .tlsTrustCertsFilePath(tlsTrustCertFilePath).build());
+ .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
+ secretsProvider);
runtimeSpawner = new RuntimeSpawner(
instanceConfig,
jarFile,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 7f3f72d..152842e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -22,6 +22,8 @@ package org.apache.pulsar.functions.runtime;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import com.google.protobuf.Empty;
import com.google.protobuf.util.JsonFormat;
import com.squareup.okhttp.Response;
@@ -33,6 +35,7 @@ import io.kubernetes.client.custom.Quantity;
import io.kubernetes.client.models.*;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.metrics.PrometheusMetricsServer;
@@ -40,7 +43,10 @@ import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import java.lang.reflect.Type;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -97,6 +103,7 @@ class KubernetesRuntime implements Runtime {
private final String userCodePkgUrl;
private final String originalCodeFileName;
private final String pulsarAdminUrl;
+ private final SecretsProviderConfigurator secretsProviderConfigurator;
private boolean running;
@@ -119,6 +126,7 @@ class KubernetesRuntime implements Runtime {
String pulsarAdminUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
+ SecretsProviderConfigurator secretsProviderConfigurator,
Integer expectedMetricsInterval) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
@@ -130,7 +138,13 @@ class KubernetesRuntime implements Runtime {
this.userCodePkgUrl = userCodePkgUrl;
this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
this.pulsarAdminUrl = pulsarAdminUrl;
+ this.secretsProviderConfigurator = secretsProviderConfigurator;
String logConfigFile = null;
+ String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+ String secretsProviderConfig = null;
+ if (secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null) {
+ secretsProviderConfig = new Gson().toJson(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+ }
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
logConfigFile = "kubernetes_instance_log4j2.yml";
@@ -141,7 +155,7 @@ class KubernetesRuntime implements Runtime {
}
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, logConfigFile,
- installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
+ secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
running = false;
doChecks(instanceConfig.getFunctionDetails());
@@ -156,6 +170,10 @@ class KubernetesRuntime implements Runtime {
try {
submitStatefulSet();
} catch (Exception e) {
+ log.error("Could not submit statefulset for {}/{}/{}, deleting service as well",
+ instanceConfig.getFunctionDetails().getTenant(),
+ instanceConfig.getFunctionDetails().getNamespace(),
+ instanceConfig.getFunctionDetails().getName(), e);
deleteService();
}
running = true;
@@ -536,8 +554,10 @@ class KubernetesRuntime implements Runtime {
.valueFrom(new V1EnvVarSource()
.fieldRef(new V1ObjectFieldSelector()
.fieldPath("metadata.name")));
- container.setEnv(Arrays.asList(envVarPodName));
+ container.addEnvItem(envVarPodName);
+ // Configure secrets
+ secretsProviderConfigurator.configureKubernetesRuntimeSecretsProvider(container, instanceConfig.getFunctionDetails());
// set container resources
final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index c6d5d02..2adbb5e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -20,6 +20,8 @@
package org.apache.pulsar.functions.runtime;
import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
import io.kubernetes.client.ApiClient;
import io.kubernetes.client.Configuration;
import io.kubernetes.client.apis.AppsV1Api;
@@ -30,11 +32,14 @@ import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import java.lang.reflect.Field;
+import java.lang.reflect.Type;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
@@ -72,6 +77,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private final String javaInstanceJarFile;
private final String pythonInstanceFile;
private final String prometheusMetricsServerJarFile;
+ private final SecretsProviderConfigurator secretsProviderConfigurator;
private final String logDirectory = "logs/functions";
private Timer changeConfigMapTimer;
private AppsV1Api appsClient;
@@ -93,7 +99,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
AuthenticationConfig authConfig,
Integer expectedMetricsCollectionInterval,
String changeConfigMap,
- String changeConfigMapNamespace) {
+ String changeConfigMapNamespace,
+ SecretsProviderConfigurator secretsProviderConfigurator) {
this.kubernetesInfo = new KubernetesInfo();
this.kubernetesInfo.setK8Uri(k8Uri);
if (!isEmpty(jobNamespace)) {
@@ -126,6 +133,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
this.prometheusMetricsServerJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/PrometheusMetricsServer.jar";
this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
+ this.secretsProviderConfigurator = secretsProviderConfigurator;
}
@Override
@@ -169,7 +177,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
this.kubernetesInfo.getPulsarAdminUrl(),
stateStorageServiceUri,
authConfig,
- expectedMetricsCollectionInterval);
+ secretsProviderConfigurator,
+ expectedMetricsCollectionInterval);
}
@Override
@@ -179,6 +188,12 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
@Override
public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
KubernetesRuntime.doChecks(functionDetails);
+ if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) { // validate only when a secrets map string is present
+ Type type = new TypeToken<Map<String, Object>>() { // anonymous TypeToken describing Map<String, Object> for Gson
+ }.getType();
+ Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type); // the secrets map string is parsed as JSON
+ secretsProviderConfigurator.validateSecretMap(secretsMap); // validation itself is delegated to the configurator
+ }
}
@VisibleForTesting
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 2fa7f82..76db513 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.functions.runtime;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
import com.google.protobuf.Empty;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
@@ -29,9 +30,12 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import java.io.InputStream;
import java.util.List;
@@ -60,6 +64,7 @@ class ProcessRuntime implements Runtime {
private ScheduledExecutorService timer;
private InstanceConfig instanceConfig;
private final Long expectedHealthCheckInterval;
+ private final SecretsProviderConfigurator secretsProviderConfigurator;
private static final long GRPC_TIMEOUT_SECS = 5;
ProcessRuntime(InstanceConfig instanceConfig,
@@ -69,11 +74,18 @@ class ProcessRuntime implements Runtime {
String pulsarServiceUrl,
String stateStorageServiceUrl,
AuthenticationConfig authConfig,
+ SecretsProviderConfigurator secretsProviderConfigurator,
Long expectedHealthCheckInterval) throws Exception {
this.instanceConfig = instanceConfig;
this.instancePort = instanceConfig.getPort();
this.expectedHealthCheckInterval = expectedHealthCheckInterval;
+ this.secretsProviderConfigurator = secretsProviderConfigurator;
String logConfigFile = null;
+ String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+ String secretsProviderConfig = null;
+ if (secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null) {
+ secretsProviderConfig = new Gson().toJson(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+ }
switch (instanceConfig.getFunctionDetails().getRuntime()) {
case JAVA:
logConfigFile = "java_instance_log4j2.yml";
@@ -84,7 +96,7 @@ class ProcessRuntime implements Runtime {
}
this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
- logConfigFile, false, null, null);
+ logConfigFile, secretsProviderClassName, secretsProviderConfig, false, null, null);
}
/**
@@ -258,6 +270,7 @@ class ProcessRuntime implements Runtime {
deathException = null;
try {
ProcessBuilder processBuilder = new ProcessBuilder(processArgs).inheritIO();
+ secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(processBuilder, instanceConfig.getFunctionDetails());
log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
process = processBuilder.start();
} catch (Exception ex) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 78b069c..41a28fb 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
import java.nio.file.Paths;
@@ -37,6 +38,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
private final String pulsarServiceUrl;
private final String stateStorageServiceUrl;
private AuthenticationConfig authConfig;
+ private SecretsProviderConfigurator secretsProviderConfigurator;
private String javaInstanceJarFile;
private String pythonInstanceFile;
private String logDirectory;
@@ -47,10 +49,12 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
AuthenticationConfig authConfig,
String javaInstanceJarFile,
String pythonInstanceFile,
- String logDirectory) {
+ String logDirectory,
+ SecretsProviderConfigurator secretsProviderConfigurator) {
this.pulsarServiceUrl = pulsarServiceUrl;
this.stateStorageServiceUrl = stateStorageServiceUrl;
this.authConfig = authConfig;
+ this.secretsProviderConfigurator = secretsProviderConfigurator;
this.javaInstanceJarFile = javaInstanceJarFile;
this.pythonInstanceFile = pythonInstanceFile;
this.logDirectory = logDirectory;
@@ -113,6 +117,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
pulsarServiceUrl,
stateStorageServiceUrl,
authConfig,
+ secretsProviderConfigurator,
expectedHealthCheckInterval);
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 2647536..f12222e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
import com.google.protobuf.util.JsonFormat;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
@@ -49,6 +50,8 @@ class RuntimeUtils {
Integer grpcPort,
Long expectedHealthCheckInterval,
String logConfigFile,
+ String secretsProviderClassName,
+ String secretsProviderConfig,
Boolean installUserCodeDepdendencies,
String pythonDependencyRepository,
String pythonExtraDependencyRepository) throws Exception {
@@ -150,6 +153,13 @@ class RuntimeUtils {
}
args.add("--expected_healthcheck_interval");
args.add(String.valueOf(expectedHealthCheckInterval));
+
+ args.add("--secrets_provider");
+ args.add(secretsProviderClassName);
+ if (!StringUtils.isEmpty(secretsProviderConfig)) {
+ args.add("--secrets_provider_config");
+ args.add(secretsProviderConfig);
+ }
return args;
}
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 9dafbe9..5e42c52 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
@@ -51,7 +52,8 @@ class ThreadRuntime implements Runtime {
ThreadGroup threadGroup,
String jarFile,
PulsarClient pulsarClient,
- String stateStorageServiceUrl) {
+ String stateStorageServiceUrl,
+ SecretsProvider secretsProvider) {
this.instanceConfig = instanceConfig;
if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) {
throw new RuntimeException("Thread Container only supports Java Runtime");
@@ -61,7 +63,8 @@ class ThreadRuntime implements Runtime {
fnCache,
jarFile,
pulsarClient,
- stateStorageServiceUrl);
+ stateStorageServiceUrl,
+ secretsProvider);
this.threadGroup = threadGroup;
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index dfbbb64..846028d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -28,9 +28,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
@@ -44,15 +44,18 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private final FunctionCacheManager fnCache;
private final PulsarClient pulsarClient;
private final String storageServiceUrl;
+ private final SecretsProvider secretsProvider;
private volatile boolean closed;
public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
- AuthenticationConfig authConfig) throws Exception {
- this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl);
+ AuthenticationConfig authConfig, SecretsProvider secretsProvider) throws Exception {
+ this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl, secretsProvider);
}
@VisibleForTesting
- public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl) {
+ public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
+ SecretsProvider secretsProvider) {
+ this.secretsProvider = secretsProvider;
this.fnCache = new FunctionCacheManagerImpl();
this.threadGroup = new ThreadGroup(threadGroupName);
this.pulsarClient = pulsarClient;
@@ -90,7 +93,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
threadGroup,
jarFile,
pulsarClient,
- storageServiceUrl);
+ storageServiceUrl,
+ secretsProvider);
}
@Override
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 0288660..5c05de2 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -73,8 +74,8 @@ public class KubernetesRuntimeTest {
this.logDirectory = "logs/functions";
this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
false, true, "myrepo", "anotherrepo",
- null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null,
- null, null, null));
+ null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null,
+ null, null, new DefaultSecretsProviderConfigurator()));
doNothing().when(this.factory).setupClient();
}
@@ -123,7 +124,7 @@ public class KubernetesRuntimeTest {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 28);
+ assertEquals(args.size(), 30);
String expectedArgs = "java -cp " + javaInstanceJarFile
+ " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
@@ -137,7 +138,8 @@ public class KubernetesRuntimeTest {
+ "' --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(23)
+ " --state_storage_serviceurl " + stateStorageServiceUrl
- + " --expected_healthcheck_interval -1";
+ + " --expected_healthcheck_interval -1"
+ + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider";
assertEquals(String.join(" ", args), expectedArgs);
}
@@ -147,7 +149,7 @@ public class KubernetesRuntimeTest {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 32);
+ assertEquals(args.size(), 34);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + pulsarRootDir + "/" + userJarFile
+ " --logging_directory " + logDirectory
@@ -162,7 +164,8 @@ public class KubernetesRuntimeTest {
+ " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(29)
- + " --expected_healthcheck_interval -1";
+ + " --expected_healthcheck_interval -1"
+ + " --secrets_provider secretsprovider.ClearTextSecretsProvider";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index fe492b8..c2f3638 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -21,16 +21,22 @@ package org.apache.pulsar.functions.runtime;
import static org.testng.Assert.assertEquals;
+import com.google.gson.reflect.TypeToken;
import com.google.protobuf.util.JsonFormat;
+import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import io.kubernetes.client.models.V1Container;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -40,6 +46,48 @@ import org.testng.annotations.Test;
*/
public class ProcessRuntimeTest {
+ class TestSecretsProviderConfigurator implements SecretsProviderConfigurator {
+ // Test stub: reports a clear-text secrets provider and a fixed one-entry provider config.
+ @Override
+ public void init(Map<String, String> config) {
+ // Intentionally empty: this stub ignores the supplied config.
+ }
+
+ @Override
+ public String getSecretsProviderClassName(FunctionDetails functionDetails) {
+ if (functionDetails.getRuntime() == FunctionDetails.Runtime.JAVA) {
+ return ClearTextSecretsProvider.class.getName(); // class name of the Java clear-text provider
+ } else {
+ return "secretsprovider.ClearTextSecretsProvider"; // returned for every non-JAVA runtime
+ }
+ }
+
+ @Override
+ public Map<String, String> getSecretsProviderConfig(FunctionDetails functionDetails) {
+ Map<String, String> config = new HashMap<>();
+ config.put("Config", "Value"); // single fixed entry; functionDetails is not consulted
+ return config;
+ }
+
+ @Override
+ public void configureKubernetesRuntimeSecretsProvider(V1Container container, FunctionDetails functionDetails) {
+ } // no-op: the container is left untouched
+
+ @Override
+ public void configureProcessRuntimeSecretsProvider(ProcessBuilder processBuilder, FunctionDetails functionDetails) {
+ } // no-op: the process builder is left untouched
+
+ @Override
+ public Type getSecretObjectType() {
+ return TypeToken.get(String.class).getType(); // Type obtained from a TypeToken for String
+ }
+
+ @Override
+ public void validateSecretMap(Map<String, Object> secretMap) {
+ // Intentionally empty: no validation is performed by this stub.
+ }
+ }
+
private static final String TEST_TENANT = "test-function-tenant";
private static final String TEST_NAMESPACE = "test-function-namespace";
private static final String TEST_NAME = "test-function-container";
@@ -67,7 +115,8 @@ public class ProcessRuntimeTest {
this.stateStorageServiceUrl = "bk://localhost:4181";
this.logDirectory = "Users/user/logs";
this.factory = new ProcessRuntimeFactory(
- pulsarServiceUrl, stateStorageServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory);
+ pulsarServiceUrl, stateStorageServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory,
+ new TestSecretsProviderConfigurator());
}
@AfterMethod
@@ -115,7 +164,7 @@ public class ProcessRuntimeTest {
ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 28);
+ assertEquals(args.size(), 32);
String expectedArgs = "java -cp " + javaInstanceJarFile
+ " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
+ " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -129,7 +178,9 @@ public class ProcessRuntimeTest {
+ "' --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(23)
+ " --state_storage_serviceurl " + stateStorageServiceUrl
- + " --expected_healthcheck_interval 30";
+ + " --expected_healthcheck_interval 30"
+ + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+ + " --secrets_provider_config {\"Config\":\"Value\"}";
assertEquals(String.join(" ", args), expectedArgs);
}
@@ -139,7 +190,7 @@ public class ProcessRuntimeTest {
ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
List<String> args = container.getProcessArgs();
- assertEquals(args.size(), 26);
+ assertEquals(args.size(), 30);
String expectedArgs = "python " + pythonInstanceFile
+ " --py " + userJarFile + " --logging_directory "
+ logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName()
@@ -149,7 +200,9 @@ public class ProcessRuntimeTest {
+ " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ "' --pulsar_serviceurl " + pulsarServiceUrl
+ " --max_buffered_tuples 1024 --port " + args.get(23)
- + " --expected_healthcheck_interval 30";
+ + " --expected_healthcheck_interval 30"
+ + " --secrets_provider secretsprovider.ClearTextSecretsProvider"
+ + " --secrets_provider_config {\"Config\":\"Value\"}";
assertEquals(String.join(" ", args), expectedArgs);
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index d8bf252..100c7c8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -54,6 +54,10 @@ import com.google.common.annotations.VisibleForTesting;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Reflections;
/**
* This class managers all aspects of functions assignments and running of function assignments for this worker
@@ -104,6 +108,14 @@ public class FunctionRuntimeManager implements AutoCloseable{
this.workerService = workerService;
this.functionAdmin = workerService.getFunctionAdmin();
+ SecretsProviderConfigurator secretsProviderConfigurator;
+ if (!StringUtils.isEmpty(workerConfig.getSecretsProviderConfiguratorClassName())) {
+ secretsProviderConfigurator = (SecretsProviderConfigurator) Reflections.createInstance(workerConfig.getSecretsProviderConfiguratorClassName(), ClassLoader.getSystemClassLoader());
+ } else {
+ secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
+ }
+ secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
+
AuthenticationConfig authConfig = AuthenticationConfig.builder()
.clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
.clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
@@ -116,7 +128,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
workerConfig.getThreadContainerFactory().getThreadGroupName(),
workerConfig.getPulsarServiceUrl(),
workerConfig.getStateStorageServiceUrl(),
- authConfig);
+ authConfig,
+ new ClearTextSecretsProvider());
} else if (workerConfig.getProcessContainerFactory() != null) {
this.runtimeFactory = new ProcessRuntimeFactory(
workerConfig.getPulsarServiceUrl(),
@@ -124,7 +137,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
authConfig,
workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
- workerConfig.getProcessContainerFactory().getLogDirectory());
+ workerConfig.getProcessContainerFactory().getLogDirectory(),
+ secretsProviderConfigurator);
} else if (workerConfig.getKubernetesContainerFactory() != null){
this.runtimeFactory = new KubernetesRuntimeFactory(
workerConfig.getKubernetesContainerFactory().getK8Uri(),
@@ -142,7 +156,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
authConfig,
workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval(),
workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
- workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace());
+ workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(),
+ secretsProviderConfigurator);
} else {
throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
}
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 40ae567..5a95a00 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -152,6 +152,12 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
}
private KubernetesContainerFactory kubernetesContainerFactory;
+ // The classname of the secrets provider configurator.
+ private String secretsProviderConfiguratorClassName;
+ // Any config the secrets provider configurator might need. This is passed on
+ // to the init method of the SecretsProviderConfigurator.
+ private Map<String, String> secretsProviderConfiguratorConfig;
+
public String getFunctionMetadataTopic() {
return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
}
@@ -199,4 +205,4 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
public void setProperties(Properties properties) {
this.properties = properties;
}
-}
+}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 0b8d177..945a0cc 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Function.Assignment;
import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
import org.mockito.Mockito;
import org.mockito.invocation.Invocation;
@@ -80,7 +81,7 @@ public class SchedulerManagerTest {
private ScheduledExecutorService executor;
@BeforeMethod
- public void setup() throws PulsarClientException {
+ public void setup() {
WorkerConfig workerConfig = new WorkerConfig();
workerConfig.setWorkerId("worker-1");
workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
@@ -140,7 +141,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -186,7 +187,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -233,7 +234,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -293,7 +294,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function1);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -358,7 +359,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -469,7 +470,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -596,7 +597,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
@@ -650,7 +651,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments
@@ -783,7 +784,7 @@ public class SchedulerManagerTest {
functionMetaDataList.add(function2);
doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
- ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+ ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
// set assignments