You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2018/10/31 00:35:48 UTC

[pulsar] branch master updated: Hooked up secrets function api with secret function implementations (#2875)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 649c6ca  Hooked up secrets function api with secret function implementations (#2875)
649c6ca is described below

commit 649c6ca4eb83d340edfd4b431e0e4e68099d95f6
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Tue Oct 30 17:35:44 2018 -0700

    Hooked up secrets function api with secret function implementations (#2875)
    
    * Hooked up secrets function api with secret function implementations
    
    * Fixed unittest
    
    * Added more docs
    
    * Took feedback into account
---
 .../python/pulsar/functions/context.py             |  5 ++
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  3 +-
 .../org/apache/pulsar/functions/api/Context.java   |  7 +++
 pulsar-functions/instance/pom.xml                  |  6 +++
 .../pulsar/functions/instance/ContextImpl.java     | 24 ++++++++-
 .../functions/instance/JavaInstanceRunnable.java   |  9 +++-
 .../instance/src/main/python/contextimpl.py        | 11 +++-
 .../instance/src/main/python/python_instance.py    |  5 +-
 .../src/main/python/python_instance_main.py        | 19 ++++++-
 .../instance/src/main/python/secretsprovider.py    | 61 +++++++++++++++++++++
 .../pulsar/functions/instance/ContextImplTest.java |  4 +-
 .../instance/JavaInstanceRunnableTest.java         |  2 +-
 .../src/test/python/test_python_instance.py        |  2 +-
 ..._python_instance.py => test_secretsprovider.py} | 48 ++++++++---------
 pulsar-functions/runtime/pom.xml                   |  6 +++
 .../pulsar/functions/runtime/JavaInstanceMain.java | 35 +++++++++++-
 .../functions/runtime/KubernetesRuntime.java       | 24 ++++++++-
 .../runtime/KubernetesRuntimeFactory.java          | 19 ++++++-
 .../pulsar/functions/runtime/ProcessRuntime.java   | 15 +++++-
 .../functions/runtime/ProcessRuntimeFactory.java   |  7 ++-
 .../pulsar/functions/runtime/RuntimeUtils.java     | 10 ++++
 .../pulsar/functions/runtime/ThreadRuntime.java    |  7 ++-
 .../functions/runtime/ThreadRuntimeFactory.java    | 14 +++--
 .../functions/runtime/KubernetesRuntimeTest.java   | 15 +++---
 .../functions/runtime/ProcessRuntimeTest.java      | 63 ++++++++++++++++++++--
 .../functions/worker/FunctionRuntimeManager.java   | 21 ++++++--
 .../pulsar/functions/worker/WorkerConfig.java      |  8 ++-
 .../functions/worker/SchedulerManagerTest.java     | 21 ++++----
 28 files changed, 394 insertions(+), 77 deletions(-)

diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 47e86f9..6575f7a 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -99,6 +99,11 @@ class Context(object):
     pass
 
   @abstractmethod
+  def get_secret(self, secret_name):
+    """Returns the secret value associated with the name. None if nothing was found"""
+    pass
+
+  @abstractmethod
   def record_metric(self, metric_name, metric_value):
     """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
     pass
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index a75092c..3169e22 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -73,6 +73,7 @@ import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.windowing.WindowUtils;
 
@@ -1042,7 +1043,7 @@ public class CmdFunctions extends CmdBase {
         }
 
         try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null,
-                null)) {
+                null, new DefaultSecretsProviderConfigurator())) {
             List<RuntimeSpawner> spawners = new LinkedList<>();
             for (int i = 0; i < parallelism; ++i) {
                 InstanceConfig instanceConfig = new InstanceConfig();
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index c66ea6e..e9b79c3 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -160,6 +160,13 @@ public interface Context {
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
 
     /**
+     * Get the secret associated with this key
+     * @param secretName The name of the secret
+     * @return The secret if anything was found or null
+     */
+    String getSecret(String secretName);
+
+    /**
      * Record a user defined metric
      * @param metricName The name of the metric
      * @param value The value of the metric
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 1c8f3ab..8dec8dc 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -65,6 +65,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-functions-secrets</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
      <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-original</artifactId>
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 4d47433..406fe13 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
@@ -101,12 +102,16 @@ class ContextImpl implements Context, SinkContext, SourceContext {
 
     private final TopicSchema topicSchema;
 
+    private final SecretsProvider secretsProvider;
+    private final Map<String, Object> secretsMap;
+
     @Getter
     @Setter
     private StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
 
-    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics) {
+    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
+                       SecretsProvider secretsProvider) {
         this.config = config;
         this.logger = logger;
         this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
@@ -125,6 +130,14 @@ class ContextImpl implements Context, SinkContext, SourceContext {
                     new TypeToken<Map<String, Object>>() {
                     }.getType());
         }
+        this.secretsProvider = secretsProvider;
+        if (!StringUtils.isEmpty(config.getFunctionDetails().getSecretsMap())) {
+            secretsMap = new Gson().fromJson(config.getFunctionDetails().getSecretsMap(),
+                    new TypeToken<Map<String, Object>>() {
+                    }.getType());
+        } else {
+            secretsMap = new HashMap<>();
+        }
     }
 
     public void setCurrentMessageContext(Record<?> record) {
@@ -212,6 +225,15 @@ class ContextImpl implements Context, SinkContext, SourceContext {
         return userConfigs;
     }
 
+    @Override
+    public String getSecret(String secretName) {
+        if (secretsMap.containsKey(secretName)) {
+            return secretsProvider.provideSecret(secretName, secretsMap.get(secretName));
+        } else {
+            return null;
+        }
+    }
+
     private void ensureStateEnabled() {
         checkState(null != stateContext, "State is not enabled.");
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ace5efc..f626c10 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -49,6 +49,7 @@ import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
@@ -110,6 +111,8 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
     private Source source;
     private Sink sink;
 
+    private final SecretsProvider secretsProvider;
+
     public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
     public static final String METRICS_TOTAL_SUCCESS = "__total_successfully_processed__";
     public static final String METRICS_TOTAL_SYS_EXCEPTION = "__total_system_exceptions__";
@@ -122,13 +125,15 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
                                 FunctionCacheManager fnCache,
                                 String jarFile,
                                 PulsarClient pulsarClient,
-                                String stateStorageServiceUrl) {
+                                String stateStorageServiceUrl,
+                                SecretsProvider secretsProvider) {
         this.instanceConfig = instanceConfig;
         this.fnCache = fnCache;
         this.jarFile = jarFile;
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
+        this.secretsProvider = secretsProvider;
     }
 
     /**
@@ -173,7 +178,7 @@ public class JavaInstanceRunnable implements AutoCloseable, Runnable {
         }
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
-        return new ContextImpl(instanceConfig, instanceLog, client, inputTopics);
+        return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider);
     }
 
     /**
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 24246ca..83a63aa 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -48,12 +48,13 @@ class AccumulatedMetricDatum(object):
       self.min = value
 
 class ContextImpl(pulsar.Context):
-  def __init__(self, instance_config, logger, pulsar_client, user_code, consumers):
+  def __init__(self, instance_config, logger, pulsar_client, user_code, consumers, secrets_provider):
     self.instance_config = instance_config
     self.log = logger
     self.pulsar_client = pulsar_client
     self.user_code_dir = os.path.dirname(user_code)
     self.consumers = consumers
+    self.secrets_provider = secrets_provider
     self.current_accumulated_metrics = {}
     self.accumulated_metrics = {}
     self.publish_producers = {}
@@ -64,6 +65,9 @@ class ContextImpl(pulsar.Context):
     self.user_config = json.loads(instance_config.function_details.userConfig) \
       if instance_config.function_details.userConfig \
       else []
+    self.secrets_map = json.loads(instance_config.function_details.secretsMap) \
+      if instance_config.function_details.secretsMap \
+      else {}
 
   # Called on a per message basis to set the context for the current message
   def set_current_message_context(self, msgid, topic):
@@ -107,6 +111,11 @@ class ContextImpl(pulsar.Context):
   def get_user_config_map(self):
     return self.user_config
 
+  def get_secret(self, secret_key):
+    if not secret_key in self.secrets_map:
+      return None
+    return self.secrets_provider.provide_secret(secret_key, self.secrets_map[secret_key])
+
   def record_metric(self, metric_name, metric_value):
     if not metric_name in self.current_accumulated_metrics:
       self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 4b9ae3a..2438338 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -132,7 +132,7 @@ class Stats(object):
     
 
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client):
+  def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client, secrets_provider):
     self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
     self.queue = queue.Queue(max_buffered_tuples)
@@ -157,6 +157,7 @@ class PythonInstance(object):
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None
     self.expected_healthcheck_interval = expected_healthcheck_interval
+    self.secrets_provider = secrets_provider
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -226,7 +227,7 @@ class PythonInstance(object):
     except:
       self.function_purefunction = function_kclass
 
-    self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers)
+    self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers, self.secrets_provider)
     # Now launch a thread that does execution
     self.exeuction_thread = threading.Thread(target=self.actual_execution)
     self.exeuction_thread.start()
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 2acfd6e..748923e 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -29,6 +29,8 @@ import sys
 import signal
 import time
 import zipfile
+import json
+import inspect
 
 import pulsar
 
@@ -72,11 +74,12 @@ def main():
   parser.add_argument('--logging_file', required=True, help='Log file name')
   parser.add_argument('--logging_config_file', required=True, help='Config file for logging')
   parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
+  parser.add_argument('--secrets_provider', required=False, help='The classname of the secrets provider')
+  parser.add_argument('--secrets_provider_config', required=False, help='The config that needs to be passed to secrets provider')
   parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
   parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from')
   parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from')
 
-
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
   args.function_details = str(args.function_details)
@@ -120,11 +123,23 @@ def main():
   if args.tls_trust_cert_path:
      tls_trust_cert_path =  args.tls_trust_cert_path
   pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
+
+  secrets_provider = None
+  if args.secrets_provider is not None:
+    secrets_provider = util.import_class(os.path.dirname(inspect.getfile(inspect.currentframe())), str(args.secrets_provider))
+  else:
+    secrets_provider = util.import_class(os.path.dirname(inspect.getfile(inspect.currentframe())), "secretsprovider.ClearTextSecretsProvider")
+  secrets_provider = secrets_provider()
+  secrets_provider_config = None
+  if args.secrets_provider_config is not None:
+    secrets_provider_config = json.loads(str(args.secrets_provider_config))
+  secrets_provider.init(secrets_provider_config)
+
   pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
                                               str(args.function_version), function_details,
                                               int(args.max_buffered_tuples),
                                               int(args.expected_healthcheck_interval),
-                                              str(args.py), pulsar_client)
+                                              str(args.py), pulsar_client, secrets_provider)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 
diff --git a/pulsar-functions/instance/src/main/python/secretsprovider.py b/pulsar-functions/instance/src/main/python/secretsprovider.py
new file mode 100644
index 0000000..db8e68c
--- /dev/null
+++ b/pulsar-functions/instance/src/main/python/secretsprovider.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# -*- encoding: utf-8 -*-
+
+"""secretsprovider.py: Interfaces and definitions for Secret Providers
+"""
+from abc import abstractmethod
+import os
+
+class SecretsProvider:
+  """Interface for providing secrets information runtime"""
+  @abstractmethod
+  def init(self, config):
+    """Do any kind of initialization"""
+    pass
+
+  @abstractmethod
+  def provide_secret(self, secret_name, path_to_secret):
+    """Fetches the secret located at the path"""
+    pass
+
+
+"""A simple implementation that represents storing secrets in clear text """
+class ClearTextSecretsProvider(SecretsProvider):
+  def __init__(self):
+    pass
+
+  def init(self, config):
+    pass
+
+  def provide_secret(self, secret_name, path_to_secret):
+    return path_to_secret
+
+"""Implementation that fetches secrets from environment variables"""
+class EnvironmentBasedSecretsProvider(SecretsProvider):
+  def __init__(self):
+    pass
+
+  def init(self, config):
+    pass
+
+  def provide_secret(self, secret_name, path_to_secret):
+    return os.environ.get(secret_name)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index f5108fc..e3e32fd 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -38,6 +38,7 @@ import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -73,7 +74,8 @@ public class ContextImplTest {
             config,
             logger,
             client,
-            new ArrayList<>()
+            new ArrayList<>(),
+            new EnvironmentBasedSecretsProvider()
         );
     }
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 12d4f19..80b3b1d 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -56,7 +56,7 @@ public class JavaInstanceRunnableTest {
     private JavaInstanceRunnable createRunnable(boolean addCustom, String outputSerde) throws Exception {
         InstanceConfig config = createInstanceConfig(addCustom, outputSerde);
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                config, null, null, null, null);
+                config, null, null, null, null, null);
         return javaInstanceRunnable;
     }
 
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 0b5355f..748e5d8 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -48,7 +48,7 @@ class TestContextImpl(unittest.TestCase):
     pulsar_client.create_producer = Mock(return_value=producer)
     user_code=__file__
     consumers = None
-    context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers)
+    context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers, None)
 
     context_impl.publish("test_topic_name", "test_message")
 
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_secretsprovider.py
similarity index 50%
copy from pulsar-functions/instance/src/test/python/test_python_instance.py
copy to pulsar-functions/instance/src/test/python/test_secretsprovider.py
index 0b5355f..5d725ee 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_secretsprovider.py
@@ -20,11 +20,9 @@
 
 # DEPENDENCIES:  unittest2,mock
 
-from contextimpl import ContextImpl
-from python_instance import InstanceConfig
-from mock import Mock
+from secretsprovider import ClearTextSecretsProvider
+from secretsprovider import EnvironmentBasedSecretsProvider
 
-import Function_pb2
 import log
 import os
 import unittest
@@ -34,25 +32,23 @@ class TestContextImpl(unittest.TestCase):
   def setUp(self):
     log.init_logger("INFO", "foo", os.environ.get("PULSAR_HOME") + "/conf/functions-logging/console_logging_config.ini")
 
-  def test_context_publish(self):
-    instance_id = 'test_instance_id'
-    function_id = 'test_function_id'
-    function_version = 'test_function_version'
-    function_details = Function_pb2.FunctionDetails()
-    max_buffered_tuples = 100;
-    instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
-    logger = log.Log
-    pulsar_client = Mock()
-    producer = Mock()
-    producer.send_async = Mock(return_value=None)
-    pulsar_client.create_producer = Mock(return_value=producer)
-    user_code=__file__
-    consumers = None
-    context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers)
-
-    context_impl.publish("test_topic_name", "test_message")
-
-    producer.send_async.assert_called_with("test_message", None, properties=None)
-
-
-
+  def test_cleartext_secretsprovider(self):
+    provider = ClearTextSecretsProvider()
+    secret = provider.provide_secret("secretName", "secretPath")
+    self.assertEqual(secret, "secretPath")
+    secret = provider.provide_secret("secretName", "")
+    self.assertEqual(secret, "")
+    secret = provider.provide_secret("secretName", None)
+    self.assertEqual(secret, None)
+
+  def test_environment_secretsprovider(self):
+    provider = EnvironmentBasedSecretsProvider()
+    secret = provider.provide_secret("secretName", "secretPath")
+    self.assertEqual(secret, None)
+    os.environ["secretName"] = "secretValue"
+    secret = provider.provide_secret("secretName", "")
+    self.assertEqual(secret, "secretValue")
+    secret = provider.provide_secret("secretName", None)
+    self.assertEqual(secret, "secretValue")
+    secret = provider.provide_secret("secretName", "somethingelse")
+    self.assertEqual(secret, "secretValue")
\ No newline at end of file
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index 66d245f..e0b264d 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -46,6 +46,12 @@
     </dependency>
 
     <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-functions-secrets</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-all</artifactId>
     </dependency>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 1551f6f..80503bb 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -22,18 +22,26 @@ package org.apache.pulsar.functions.runtime;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.converters.StringConverter;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.utils.Reflections;
 
+import java.lang.reflect.Type;
+import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -95,6 +103,12 @@ public class JavaInstanceMain implements AutoCloseable {
     @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
     protected int expectedHealthCheckInterval;
 
+    @Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
+    protected String secretsProviderClassName;
+
+    @Parameter(names = "--secrets_provider_config", description = "The config that needs to be passed to secrets provider", required = false)
+    protected String secretsProviderConfig;
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
@@ -122,13 +136,32 @@ public class JavaInstanceMain implements AutoCloseable {
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
 
+        Map<String, String> secretsProviderConfigMap = null;
+        if (!StringUtils.isEmpty(secretsProviderConfig)) {
+            Type type = new TypeToken<Map<String, String>>() {}.getType();
+            secretsProviderConfigMap = new Gson().fromJson(secretsProviderConfig, type);
+        }
+
+        if (StringUtils.isEmpty(secretsProviderClassName)) {
+            secretsProviderClassName = ClearTextSecretsProvider.class.getName();
+        }
+
+        SecretsProvider secretsProvider;
+        try {
+            secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        secretsProvider.init(secretsProviderConfigMap);
+
         containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
                 stateStorageServiceUrl,
                 AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
                         .clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls))
                         .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
                         .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
-                        .tlsTrustCertsFilePath(tlsTrustCertFilePath).build());
+                        .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
+                secretsProvider);
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 7f3f72d..152842e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -22,6 +22,8 @@ package org.apache.pulsar.functions.runtime;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
 import com.squareup.okhttp.Response;
@@ -33,6 +35,7 @@ import io.kubernetes.client.custom.Quantity;
 import io.kubernetes.client.models.*;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.metrics.PrometheusMetricsServer;
@@ -40,7 +43,10 @@ import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 
+import java.lang.reflect.Type;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -97,6 +103,7 @@ class KubernetesRuntime implements Runtime {
     private final String userCodePkgUrl;
     private final String originalCodeFileName;
     private final String pulsarAdminUrl;
+    private final SecretsProviderConfigurator secretsProviderConfigurator;
     private boolean running;
 
 
@@ -119,6 +126,7 @@ class KubernetesRuntime implements Runtime {
                       String pulsarAdminUrl,
                       String stateStorageServiceUrl,
                       AuthenticationConfig authConfig,
+                      SecretsProviderConfigurator secretsProviderConfigurator,
                       Integer expectedMetricsInterval) throws Exception {
         this.appsClient = appsClient;
         this.coreClient = coreClient;
@@ -130,7 +138,13 @@ class KubernetesRuntime implements Runtime {
         this.userCodePkgUrl = userCodePkgUrl;
         this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
         this.pulsarAdminUrl = pulsarAdminUrl;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
         String logConfigFile = null;
+        String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+        String secretsProviderConfig = null;
+        if (secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null) {
+            secretsProviderConfig = new Gson().toJson(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+        }
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
                 logConfigFile = "kubernetes_instance_log4j2.yml";
@@ -141,7 +155,7 @@ class KubernetesRuntime implements Runtime {
         }
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, logConfigFile,
-                installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
+                secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
         this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
@@ -156,6 +170,10 @@ class KubernetesRuntime implements Runtime {
         try {
             submitStatefulSet();
         } catch (Exception e) {
+            log.error("Could not submit statefulset for {}/{}/{}, deleting service as well",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName(), e);
             deleteService();
         }
         running = true;
@@ -536,8 +554,10 @@ class KubernetesRuntime implements Runtime {
                 .valueFrom(new V1EnvVarSource()
                         .fieldRef(new V1ObjectFieldSelector()
                                 .fieldPath("metadata.name")));
-        container.setEnv(Arrays.asList(envVarPodName));
+        container.addEnvItem(envVarPodName);
 
+        // Configure secrets
+        secretsProviderConfigurator.configureKubernetesRuntimeSecretsProvider(container, instanceConfig.getFunctionDetails());
 
         // set container resources
         final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index c6d5d02..2adbb5e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -20,6 +20,8 @@
 package org.apache.pulsar.functions.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import io.kubernetes.client.ApiClient;
 import io.kubernetes.client.Configuration;
 import io.kubernetes.client.apis.AppsV1Api;
@@ -30,11 +32,14 @@ import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Type;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -72,6 +77,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     private final String javaInstanceJarFile;
     private final String pythonInstanceFile;
     private final String prometheusMetricsServerJarFile;
+    private final SecretsProviderConfigurator secretsProviderConfigurator;
     private final String logDirectory = "logs/functions";
     private Timer changeConfigMapTimer;
     private AppsV1Api appsClient;
@@ -93,7 +99,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
                                     AuthenticationConfig authConfig,
                                     Integer expectedMetricsCollectionInterval,
                                     String changeConfigMap,
-                                    String changeConfigMapNamespace) {
+                                    String changeConfigMapNamespace,
+                                    SecretsProviderConfigurator secretsProviderConfigurator) {
         this.kubernetesInfo = new KubernetesInfo();
         this.kubernetesInfo.setK8Uri(k8Uri);
         if (!isEmpty(jobNamespace)) {
@@ -126,6 +133,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
         this.prometheusMetricsServerJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/PrometheusMetricsServer.jar";
         this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
     }
 
     @Override
@@ -169,7 +177,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             this.kubernetesInfo.getPulsarAdminUrl(),
             stateStorageServiceUri,
             authConfig,
-                expectedMetricsCollectionInterval);
+            secretsProviderConfigurator,
+            expectedMetricsCollectionInterval);
     }
 
     @Override
@@ -179,6 +188,12 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     @Override
     public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
         KubernetesRuntime.doChecks(functionDetails);
+        if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) {
+            Type type = new TypeToken<Map<String, Object>>() {
+            }.getType();
+            Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type);
+            secretsProviderConfigurator.validateSecretMap(secretsMap);
+        }
     }
 
     @VisibleForTesting
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 2fa7f82..76db513 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -22,6 +22,7 @@ package org.apache.pulsar.functions.runtime;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
 import com.google.protobuf.Empty;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -29,9 +30,12 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 
 import java.io.InputStream;
 import java.util.List;
@@ -60,6 +64,7 @@ class ProcessRuntime implements Runtime {
     private ScheduledExecutorService timer;
     private InstanceConfig instanceConfig;
     private final Long expectedHealthCheckInterval;
+    private final SecretsProviderConfigurator secretsProviderConfigurator;
     private static final long GRPC_TIMEOUT_SECS = 5;
 
     ProcessRuntime(InstanceConfig instanceConfig,
@@ -69,11 +74,18 @@ class ProcessRuntime implements Runtime {
                    String pulsarServiceUrl,
                    String stateStorageServiceUrl,
                    AuthenticationConfig authConfig,
+                   SecretsProviderConfigurator secretsProviderConfigurator,
                    Long expectedHealthCheckInterval) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
         String logConfigFile = null;
+        String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+        String secretsProviderConfig = null;
+        if (secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null) {
+            secretsProviderConfig = new Gson().toJson(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+        }
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
                 logConfigFile = "java_instance_log4j2.yml";
@@ -84,7 +96,7 @@ class ProcessRuntime implements Runtime {
         }
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
-                logConfigFile, false, null, null);
+                logConfigFile, secretsProviderClassName, secretsProviderConfig, false, null, null);
     }
 
     /**
@@ -258,6 +270,7 @@ class ProcessRuntime implements Runtime {
         deathException = null;
         try {
             ProcessBuilder processBuilder = new ProcessBuilder(processArgs).inheritIO();
+            secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(processBuilder, instanceConfig.getFunctionDetails());
             log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
             process = processBuilder.start();
         } catch (Exception ex) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 78b069c..41a28fb 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -24,6 +24,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
 import java.nio.file.Paths;
@@ -37,6 +38,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
     private final String pulsarServiceUrl;
     private final String stateStorageServiceUrl;
     private AuthenticationConfig authConfig;
+    private SecretsProviderConfigurator secretsProviderConfigurator;
     private String javaInstanceJarFile;
     private String pythonInstanceFile;
     private String logDirectory;
@@ -47,10 +49,12 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
                                  AuthenticationConfig authConfig,
                                  String javaInstanceJarFile,
                                  String pythonInstanceFile,
-                                 String logDirectory) {
+                                 String logDirectory,
+                                 SecretsProviderConfigurator secretsProviderConfigurator) {
         this.pulsarServiceUrl = pulsarServiceUrl;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.authConfig = authConfig;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.javaInstanceJarFile = javaInstanceJarFile;
         this.pythonInstanceFile = pythonInstanceFile;
         this.logDirectory = logDirectory;
@@ -113,6 +117,7 @@ public class ProcessRuntimeFactory implements RuntimeFactory {
             pulsarServiceUrl,
             stateStorageServiceUrl,
             authConfig,
+            secretsProviderConfigurator,
             expectedHealthCheckInterval);
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 2647536..f12222e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -21,6 +21,7 @@ package org.apache.pulsar.functions.runtime;
 
 import com.google.protobuf.util.JsonFormat;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
@@ -49,6 +50,8 @@ class RuntimeUtils {
                                            Integer grpcPort,
                                            Long expectedHealthCheckInterval,
                                            String logConfigFile,
+                                           String secretsProviderClassName,
+                                           String secretsProviderConfig,
                                            Boolean installUserCodeDepdendencies,
                                            String pythonDependencyRepository,
                                            String pythonExtraDependencyRepository) throws Exception {
@@ -150,6 +153,13 @@ class RuntimeUtils {
         }
         args.add("--expected_healthcheck_interval");
         args.add(String.valueOf(expectedHealthCheckInterval));
+
+        args.add("--secrets_provider");
+        args.add(secretsProviderClassName);
+        if (!StringUtils.isEmpty(secretsProviderConfig)) {
+            args.add("--secrets_provider_config");
+            args.add(secretsProviderConfig);
+        }
         return args;
     }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 9dafbe9..5e42c52 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -28,6 +28,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
@@ -51,7 +52,8 @@ class ThreadRuntime implements Runtime {
                   ThreadGroup threadGroup,
                   String jarFile,
                   PulsarClient pulsarClient,
-                  String stateStorageServiceUrl) {
+                  String stateStorageServiceUrl,
+                  SecretsProvider secretsProvider) {
         this.instanceConfig = instanceConfig;
         if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) {
             throw new RuntimeException("Thread Container only supports Java Runtime");
@@ -61,7 +63,8 @@ class ThreadRuntime implements Runtime {
             fnCache,
             jarFile,
             pulsarClient,
-            stateStorageServiceUrl);
+            stateStorageServiceUrl,
+            secretsProvider);
         this.threadGroup = threadGroup;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index dfbbb64..846028d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -28,9 +28,9 @@ import static org.apache.commons.lang3.StringUtils.isNotBlank;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
 
@@ -44,15 +44,18 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     private final FunctionCacheManager fnCache;
     private final PulsarClient pulsarClient;
     private final String storageServiceUrl;
+    private final SecretsProvider secretsProvider;
     private volatile boolean closed;
 
     public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
-            AuthenticationConfig authConfig) throws Exception {
-        this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl);
+                                AuthenticationConfig authConfig, SecretsProvider secretsProvider) throws Exception {
+        this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl, secretsProvider);
     }
 
     @VisibleForTesting
-    public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl) {
+    public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
+                                SecretsProvider secretsProvider) {
+        this.secretsProvider = secretsProvider;
         this.fnCache = new FunctionCacheManagerImpl();
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarClient = pulsarClient;
@@ -90,7 +93,8 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
             threadGroup,
             jarFile,
             pulsarClient,
-            storageServiceUrl);
+            storageServiceUrl,
+            secretsProvider);
     }
 
     @Override
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 0288660..5c05de2 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -24,6 +24,7 @@ import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -73,8 +74,8 @@ public class KubernetesRuntimeTest {
         this.logDirectory = "logs/functions";
         this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
             false, true, "myrepo", "anotherrepo",
-                null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null,
-                null, null, null));
+                null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null,
+                null, null, new DefaultSecretsProviderConfigurator()));
         doNothing().when(this.factory).setupClient();
     }
 
@@ -123,7 +124,7 @@ public class KubernetesRuntimeTest {
 
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 28);
+        assertEquals(args.size(), 30);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
@@ -137,7 +138,8 @@ public class KubernetesRuntimeTest {
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
-                + " --expected_healthcheck_interval -1";
+                + " --expected_healthcheck_interval -1"
+                + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -147,7 +149,7 @@ public class KubernetesRuntimeTest {
 
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 32);
+        assertEquals(args.size(), 34);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + pulsarRootDir + "/" + userJarFile
                 + " --logging_directory " + logDirectory
@@ -162,7 +164,8 @@ public class KubernetesRuntimeTest {
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(29)
-                + " --expected_healthcheck_interval -1";
+                + " --expected_healthcheck_interval -1"
+                + " --secrets_provider secretsprovider.ClearTextSecretsProvider";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index fe492b8..c2f3638 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -21,16 +21,22 @@ package org.apache.pulsar.functions.runtime;
 
 import static org.testng.Assert.assertEquals;
 
+import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.util.JsonFormat;
 
+import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import io.kubernetes.client.models.V1Container;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -40,6 +46,48 @@ import org.testng.annotations.Test;
  */
 public class ProcessRuntimeTest {
 
+    class TestSecretsProviderConfigurator implements SecretsProviderConfigurator {
+
+        @Override
+        public void init(Map<String, String> config) {
+
+        }
+
+        @Override
+        public String getSecretsProviderClassName(FunctionDetails functionDetails) {
+            if (functionDetails.getRuntime() == FunctionDetails.Runtime.JAVA) {
+                return ClearTextSecretsProvider.class.getName();
+            } else {
+                return "secretsprovider.ClearTextSecretsProvider";
+            }
+        }
+
+        @Override
+        public Map<String, String> getSecretsProviderConfig(FunctionDetails functionDetails) {
+            Map<String, String> config = new HashMap<>();
+            config.put("Config", "Value");
+            return config;
+        }
+
+        @Override
+        public void configureKubernetesRuntimeSecretsProvider(V1Container container, FunctionDetails functionDetails) {
+        }
+
+        @Override
+        public void configureProcessRuntimeSecretsProvider(ProcessBuilder processBuilder, FunctionDetails functionDetails) {
+        }
+
+        @Override
+        public Type getSecretObjectType() {
+            return TypeToken.get(String.class).getType();
+        }
+
+        @Override
+        public void validateSecretMap(Map<String, Object> secretMap) {
+
+        }
+    }
+
     private static final String TEST_TENANT = "test-function-tenant";
     private static final String TEST_NAMESPACE = "test-function-namespace";
     private static final String TEST_NAME = "test-function-container";
@@ -67,7 +115,8 @@ public class ProcessRuntimeTest {
         this.stateStorageServiceUrl = "bk://localhost:4181";
         this.logDirectory = "Users/user/logs";
         this.factory = new ProcessRuntimeFactory(
-            pulsarServiceUrl, stateStorageServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory);
+            pulsarServiceUrl, stateStorageServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory,
+                new TestSecretsProviderConfigurator());
     }
 
     @AfterMethod
@@ -115,7 +164,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 28);
+        assertEquals(args.size(), 32);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -129,7 +178,9 @@ public class ProcessRuntimeTest {
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
-                + " --expected_healthcheck_interval 30";
+                + " --expected_healthcheck_interval 30"
+                + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+                + " --secrets_provider_config {\"Config\":\"Value\"}";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -139,7 +190,7 @@ public class ProcessRuntimeTest {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 26);
+        assertEquals(args.size(), 30);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
                 + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName()
@@ -149,7 +200,9 @@ public class ProcessRuntimeTest {
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
-                + " --expected_healthcheck_interval 30";
+                + " --expected_healthcheck_interval 30"
+                + " --secrets_provider secretsprovider.ClearTextSecretsProvider"
+                + " --secrets_provider_config {\"Config\":\"Value\"}";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index d8bf252..100c7c8 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -54,6 +54,10 @@ import com.google.common.annotations.VisibleForTesting;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Reflections;
 
 /**
  * This class managers all aspects of functions assignments and running of function assignments for this worker
@@ -104,6 +108,14 @@ public class FunctionRuntimeManager implements AutoCloseable{
         this.workerService = workerService;
         this.functionAdmin = workerService.getFunctionAdmin();
 
+        SecretsProviderConfigurator secretsProviderConfigurator;
+        if (!StringUtils.isEmpty(workerConfig.getSecretsProviderConfiguratorClassName())) {
+            secretsProviderConfigurator = (SecretsProviderConfigurator) Reflections.createInstance(workerConfig.getSecretsProviderConfiguratorClassName(), ClassLoader.getSystemClassLoader());
+        } else {
+            secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
+        }
+        secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
+
         AuthenticationConfig authConfig = AuthenticationConfig.builder()
                 .clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
                 .clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
@@ -116,7 +128,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     workerConfig.getThreadContainerFactory().getThreadGroupName(),
                     workerConfig.getPulsarServiceUrl(),
                     workerConfig.getStateStorageServiceUrl(),
-                    authConfig);
+                    authConfig,
+                    new ClearTextSecretsProvider());
         } else if (workerConfig.getProcessContainerFactory() != null) {
             this.runtimeFactory = new ProcessRuntimeFactory(
                     workerConfig.getPulsarServiceUrl(),
@@ -124,7 +137,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     authConfig,
                     workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
                     workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
-                    workerConfig.getProcessContainerFactory().getLogDirectory());
+                    workerConfig.getProcessContainerFactory().getLogDirectory(),
+                    secretsProviderConfigurator);
         } else if (workerConfig.getKubernetesContainerFactory() != null){
             this.runtimeFactory = new KubernetesRuntimeFactory(
                     workerConfig.getKubernetesContainerFactory().getK8Uri(),
@@ -142,7 +156,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
                     authConfig,
                     workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval(),
                     workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
-                    workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace());
+                    workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(),
+                    secretsProviderConfigurator);
         } else {
             throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 40ae567..5a95a00 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -152,6 +152,12 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     }
     private KubernetesContainerFactory kubernetesContainerFactory;
 
+    // The classname of the secrets provider configurator.
+    private String secretsProviderConfiguratorClassName;
+    // Any config the secret provider configurator might need. This is passed on
+    // to the init method of the secretproviderconfigurator
+    private Map<String, String> secretsProviderConfiguratorConfig;
+
     public String getFunctionMetadataTopic() {
         return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
     }
@@ -199,4 +205,4 @@ public class WorkerConfig implements Serializable, PulsarConfiguration {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 0b8d177..945a0cc 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -32,6 +32,7 @@ import org.apache.pulsar.client.api.TypedMessageBuilder;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
@@ -80,7 +81,7 @@ public class SchedulerManagerTest {
     private ScheduledExecutorService executor;
 
     @BeforeMethod
-    public void setup() throws PulsarClientException {
+    public void setup() {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
@@ -140,7 +141,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -186,7 +187,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -233,7 +234,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -293,7 +294,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -358,7 +359,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -469,7 +470,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -596,7 +597,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
@@ -650,7 +651,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -783,7 +784,7 @@ public class SchedulerManagerTest {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments