You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/10/31 00:35:45 UTC

[GitHub] srkukarni closed pull request #2875: Hooked up secrets function api with secret function implementations

srkukarni closed pull request #2875: Hooked up secrets function api with secret function implementations
URL: https://github.com/apache/pulsar/pull/2875
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/python/pulsar/functions/context.py b/pulsar-client-cpp/python/pulsar/functions/context.py
index 47e86f978f..6575f7abf8 100644
--- a/pulsar-client-cpp/python/pulsar/functions/context.py
+++ b/pulsar-client-cpp/python/pulsar/functions/context.py
@@ -98,6 +98,11 @@ def get_user_config_map(self):
     """Returns the entire user-defined config as a dict (the dict will be empty if no user-defined config is supplied)"""
     pass
 
+  @abstractmethod
+  def get_secret(self, secret_name):
+    """Returns the secret value associated with the name. None if nothing was found"""
+    pass
+
   @abstractmethod
   def record_metric(self, metric_name, metric_value):
     """Records the metric_value. metric_value has to satisfy isinstance(metric_value, numbers.Number)"""
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index a75092c796..3169e2224b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -73,6 +73,7 @@
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.runtime.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.*;
 import org.apache.pulsar.functions.windowing.WindowUtils;
 
@@ -1042,7 +1043,7 @@ protected static void startLocalRun(org.apache.pulsar.functions.proto.Function.F
         }
 
         try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(serviceUrl, stateStorageServiceUrl, authConfig, null, null,
-                null)) {
+                null, new DefaultSecretsProviderConfigurator())) {
             List<RuntimeSpawner> spawners = new LinkedList<>();
             for (int i = 0; i < parallelism; ++i) {
                 InstanceConfig instanceConfig = new InstanceConfig();
diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index c66ea6efa7..e9b79c3699 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -159,6 +159,13 @@
      */
     Object getUserConfigValueOrDefault(String key, Object defaultValue);
 
+    /**
+     * Get the secret associated with this key
+     * @param secretName The name of the secret
+     * @return The secret if anything was found or null
+     */
+    String getSecret(String secretName);
+
     /**
      * Record a user defined metric
      * @param metricName The name of the metric
diff --git a/pulsar-functions/instance/pom.xml b/pulsar-functions/instance/pom.xml
index 1c8f3ab78e..8dec8dcdb1 100644
--- a/pulsar-functions/instance/pom.xml
+++ b/pulsar-functions/instance/pom.xml
@@ -65,6 +65,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-functions-secrets</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
      <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-client-original</artifactId>
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index 4d474333b9..406fe13db6 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -49,6 +49,7 @@
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.SinkSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.source.TopicSchema;
 import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.core.SourceContext;
@@ -101,12 +102,16 @@ public void update(double value) {
 
     private final TopicSchema topicSchema;
 
+    private final SecretsProvider secretsProvider;
+    private final Map<String, Object> secretsMap;
+
     @Getter
     @Setter
     private StateContextImpl stateContext;
     private Map<String, Object> userConfigs;
 
-    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics) {
+    public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, List<String> inputTopics,
+                       SecretsProvider secretsProvider) {
         this.config = config;
         this.logger = logger;
         this.currentAccumulatedMetrics = new ConcurrentHashMap<>();
@@ -125,6 +130,14 @@ public ContextImpl(InstanceConfig config, Logger logger, PulsarClient client, Li
                     new TypeToken<Map<String, Object>>() {
                     }.getType());
         }
+        this.secretsProvider = secretsProvider;
+        if (!StringUtils.isEmpty(config.getFunctionDetails().getSecretsMap())) {
+            secretsMap = new Gson().fromJson(config.getFunctionDetails().getSecretsMap(),
+                    new TypeToken<Map<String, Object>>() {
+                    }.getType());
+        } else {
+            secretsMap = new HashMap<>();
+        }
     }
 
     public void setCurrentMessageContext(Record<?> record) {
@@ -212,6 +225,15 @@ public Object getUserConfigValueOrDefault(String key, Object defaultValue) {
         return userConfigs;
     }
 
+    @Override
+    public String getSecret(String secretName) {
+        if (secretsMap.containsKey(secretName)) {
+            return secretsProvider.provideSecret(secretName, secretsMap.get(secretName));
+        } else {
+            return null;
+        }
+    }
+
     private void ensureStateEnabled() {
         checkState(null != stateContext, "State is not enabled.");
     }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
index ace5efc57b..f626c1066d 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/JavaInstanceRunnable.java
@@ -49,6 +49,7 @@
 import org.apache.pulsar.functions.proto.Function.SourceSpec;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.MetricsData.Builder;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.sink.PulsarSink;
 import org.apache.pulsar.functions.sink.PulsarSinkConfig;
 import org.apache.pulsar.functions.sink.PulsarSinkDisable;
@@ -110,6 +111,8 @@
     private Source source;
     private Sink sink;
 
+    private final SecretsProvider secretsProvider;
+
     public static final String METRICS_TOTAL_PROCESSED = "__total_processed__";
     public static final String METRICS_TOTAL_SUCCESS = "__total_successfully_processed__";
     public static final String METRICS_TOTAL_SYS_EXCEPTION = "__total_system_exceptions__";
@@ -122,13 +125,15 @@ public JavaInstanceRunnable(InstanceConfig instanceConfig,
                                 FunctionCacheManager fnCache,
                                 String jarFile,
                                 PulsarClient pulsarClient,
-                                String stateStorageServiceUrl) {
+                                String stateStorageServiceUrl,
+                                SecretsProvider secretsProvider) {
         this.instanceConfig = instanceConfig;
         this.fnCache = fnCache;
         this.jarFile = jarFile;
         this.client = (PulsarClientImpl) pulsarClient;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.stats = new FunctionStats();
+        this.secretsProvider = secretsProvider;
     }
 
     /**
@@ -173,7 +178,7 @@ ContextImpl setupContext() {
         }
         Logger instanceLog = LoggerFactory.getLogger(
                 "function-" + instanceConfig.getFunctionDetails().getName());
-        return new ContextImpl(instanceConfig, instanceLog, client, inputTopics);
+        return new ContextImpl(instanceConfig, instanceLog, client, inputTopics, secretsProvider);
     }
 
     /**
diff --git a/pulsar-functions/instance/src/main/python/contextimpl.py b/pulsar-functions/instance/src/main/python/contextimpl.py
index 24246caed7..83a63aa98a 100644
--- a/pulsar-functions/instance/src/main/python/contextimpl.py
+++ b/pulsar-functions/instance/src/main/python/contextimpl.py
@@ -48,12 +48,13 @@ def record(self, value):
       self.min = value
 
 class ContextImpl(pulsar.Context):
-  def __init__(self, instance_config, logger, pulsar_client, user_code, consumers):
+  def __init__(self, instance_config, logger, pulsar_client, user_code, consumers, secrets_provider):
     self.instance_config = instance_config
     self.log = logger
     self.pulsar_client = pulsar_client
     self.user_code_dir = os.path.dirname(user_code)
     self.consumers = consumers
+    self.secrets_provider = secrets_provider
     self.current_accumulated_metrics = {}
     self.accumulated_metrics = {}
     self.publish_producers = {}
@@ -64,6 +65,9 @@ def __init__(self, instance_config, logger, pulsar_client, user_code, consumers)
     self.user_config = json.loads(instance_config.function_details.userConfig) \
       if instance_config.function_details.userConfig \
       else []
+    self.secrets_map = json.loads(instance_config.function_details.secretsMap) \
+      if instance_config.function_details.secretsMap \
+      else {}
 
   # Called on a per message basis to set the context for the current message
   def set_current_message_context(self, msgid, topic):
@@ -107,6 +111,11 @@ def get_user_config_value(self, key):
   def get_user_config_map(self):
     return self.user_config
 
+  def get_secret(self, secret_key):
+    if not secret_key in self.secrets_map:
+      return None
+    return self.secrets_provider.provide_secret(secret_key, self.secrets_map[secret_key])
+
   def record_metric(self, metric_name, metric_value):
     if not metric_name in self.current_accumulated_metrics:
       self.current_accumulated_metrics[metric_name] = AccumulatedMetricDatum()
diff --git a/pulsar-functions/instance/src/main/python/python_instance.py b/pulsar-functions/instance/src/main/python/python_instance.py
index 4b9ae3a84a..24383384f2 100644
--- a/pulsar-functions/instance/src/main/python/python_instance.py
+++ b/pulsar-functions/instance/src/main/python/python_instance.py
@@ -132,7 +132,7 @@ def update(self, object):
     
 
 class PythonInstance(object):
-  def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client):
+  def __init__(self, instance_id, function_id, function_version, function_details, max_buffered_tuples, expected_healthcheck_interval, user_code, pulsar_client, secrets_provider):
     self.instance_config = InstanceConfig(instance_id, function_id, function_version, function_details, max_buffered_tuples)
     self.user_code = user_code
     self.queue = queue.Queue(max_buffered_tuples)
@@ -157,6 +157,7 @@ def __init__(self, instance_id, function_id, function_version, function_details,
     self.last_health_check_ts = time.time()
     self.timeout_ms = function_details.source.timeoutMs if function_details.source.timeoutMs > 0 else None
     self.expected_healthcheck_interval = expected_healthcheck_interval
+    self.secrets_provider = secrets_provider
 
   def health_check(self):
     self.last_health_check_ts = time.time()
@@ -226,7 +227,7 @@ def run(self):
     except:
       self.function_purefunction = function_kclass
 
-    self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers)
+    self.contextimpl = contextimpl.ContextImpl(self.instance_config, Log, self.pulsar_client, self.user_code, self.consumers, self.secrets_provider)
     # Now launch a thread that does execution
     self.exeuction_thread = threading.Thread(target=self.actual_execution)
     self.exeuction_thread.start()
diff --git a/pulsar-functions/instance/src/main/python/python_instance_main.py b/pulsar-functions/instance/src/main/python/python_instance_main.py
index 2acfd6e662..748923eac6 100644
--- a/pulsar-functions/instance/src/main/python/python_instance_main.py
+++ b/pulsar-functions/instance/src/main/python/python_instance_main.py
@@ -29,6 +29,8 @@
 import signal
 import time
 import zipfile
+import json
+import inspect
 
 import pulsar
 
@@ -72,11 +74,12 @@ def main():
   parser.add_argument('--logging_file', required=True, help='Log file name')
   parser.add_argument('--logging_config_file', required=True, help='Config file for logging')
   parser.add_argument('--expected_healthcheck_interval', required=True, help='Expected time in seconds between health checks', type=int)
+  parser.add_argument('--secrets_provider', required=False, help='The classname of the secrets provider')
+  parser.add_argument('--secrets_provider_config', required=False, help='The config that needs to be passed to secrets provider')
   parser.add_argument('--install_usercode_dependencies', required=False, help='For packaged python like wheel files, do we need to install all dependencies', type=bool)
   parser.add_argument('--dependency_repository', required=False, help='For packaged python like wheel files, which repository to pull the dependencies from')
   parser.add_argument('--extra_dependency_repository', required=False, help='For packaged python like wheel files, any extra repository to pull the dependencies from')
 
-
   args = parser.parse_args()
   function_details = Function_pb2.FunctionDetails()
   args.function_details = str(args.function_details)
@@ -120,11 +123,23 @@ def main():
   if args.tls_trust_cert_path:
      tls_trust_cert_path =  args.tls_trust_cert_path
   pulsar_client = pulsar.Client(args.pulsar_serviceurl, authentication, 30, 1, 1, 50000, None, use_tls, tls_trust_cert_path, tls_allow_insecure_connection)
+
+  secrets_provider = None
+  if args.secrets_provider is not None:
+    secrets_provider = util.import_class(os.path.dirname(inspect.getfile(inspect.currentframe())), str(args.secrets_provider))
+  else:
+    secrets_provider = util.import_class(os.path.dirname(inspect.getfile(inspect.currentframe())), "secretsprovider.ClearTextSecretsProvider")
+  secrets_provider = secrets_provider()
+  secrets_provider_config = None
+  if args.secrets_provider_config is not None:
+    secrets_provider_config = json.loads(str(args.secrets_provider_config))
+  secrets_provider.init(secrets_provider_config)
+
   pyinstance = python_instance.PythonInstance(str(args.instance_id), str(args.function_id),
                                               str(args.function_version), function_details,
                                               int(args.max_buffered_tuples),
                                               int(args.expected_healthcheck_interval),
-                                              str(args.py), pulsar_client)
+                                              str(args.py), pulsar_client, secrets_provider)
   pyinstance.run()
   server_instance = server.serve(args.port, pyinstance)
 
diff --git a/pulsar-functions/instance/src/main/python/secretsprovider.py b/pulsar-functions/instance/src/main/python/secretsprovider.py
new file mode 100644
index 0000000000..db8e68c094
--- /dev/null
+++ b/pulsar-functions/instance/src/main/python/secretsprovider.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+# -*- encoding: utf-8 -*-
+
+"""secretsprovider.py: Interfaces and definitions for Secret Providers
+"""
+from abc import abstractmethod
+import os
+
+class SecretsProvider:
+  """Interface for providing secrets information runtime"""
+  @abstractmethod
+  def init(self, config):
+    """Do any kind of initialization"""
+    pass
+
+  @abstractmethod
+  def provide_secret(self, secret_name, path_to_secret):
+    """Fetches the secret located at the path"""
+    pass
+
+
+"""A simple implementation that represents storing secrets in clear text """
+class ClearTextSecretsProvider(SecretsProvider):
+  def __init__(self):
+    pass
+
+  def init(self, config):
+    pass
+
+  def provide_secret(self, secret_name, path_to_secret):
+    return path_to_secret
+
+"""Implementation that fetches secrets from environment variables"""
+class EnvironmentBasedSecretsProvider(SecretsProvider):
+  def __init__(self):
+    pass
+
+  def init(self, config):
+    pass
+
+  def provide_secret(self, secret_name, path_to_secret):
+    return os.environ.get(secret_name)
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index f5108fc6c1..e3e32fda3e 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -38,6 +38,7 @@
 import org.apache.pulsar.client.impl.conf.ProducerConfigurationData;
 import org.apache.pulsar.functions.instance.state.StateContextImpl;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Matchers;
@@ -73,7 +74,8 @@ public void setup() {
             config,
             logger,
             client,
-            new ArrayList<>()
+            new ArrayList<>(),
+            new EnvironmentBasedSecretsProvider()
         );
     }
 
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
index 12d4f19a04..80b3b1da5a 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceRunnableTest.java
@@ -56,7 +56,7 @@ private static InstanceConfig createInstanceConfig(boolean addCustom, String out
     private JavaInstanceRunnable createRunnable(boolean addCustom, String outputSerde) throws Exception {
         InstanceConfig config = createInstanceConfig(addCustom, outputSerde);
         JavaInstanceRunnable javaInstanceRunnable = new JavaInstanceRunnable(
-                config, null, null, null, null);
+                config, null, null, null, null, null);
         return javaInstanceRunnable;
     }
 
diff --git a/pulsar-functions/instance/src/test/python/test_python_instance.py b/pulsar-functions/instance/src/test/python/test_python_instance.py
index 0b5355f81f..748e5d80a8 100644
--- a/pulsar-functions/instance/src/test/python/test_python_instance.py
+++ b/pulsar-functions/instance/src/test/python/test_python_instance.py
@@ -48,7 +48,7 @@ def test_context_publish(self):
     pulsar_client.create_producer = Mock(return_value=producer)
     user_code=__file__
     consumers = None
-    context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers)
+    context_impl = ContextImpl(instance_config, logger, pulsar_client, user_code, consumers, None)
 
     context_impl.publish("test_topic_name", "test_message")
 
diff --git a/pulsar-functions/instance/src/test/python/test_secretsprovider.py b/pulsar-functions/instance/src/test/python/test_secretsprovider.py
new file mode 100644
index 0000000000..5d725ee8c0
--- /dev/null
+++ b/pulsar-functions/instance/src/test/python/test_secretsprovider.py
@@ -0,0 +1,54 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+# DEPENDENCIES:  unittest2,mock
+
+from secretsprovider import ClearTextSecretsProvider
+from secretsprovider import EnvironmentBasedSecretsProvider
+
+import log
+import os
+import unittest
+
+class TestContextImpl(unittest.TestCase):
+
+  def setUp(self):
+    log.init_logger("INFO", "foo", os.environ.get("PULSAR_HOME") + "/conf/functions-logging/console_logging_config.ini")
+
+  def test_cleartext_secretsprovider(self):
+    provider = ClearTextSecretsProvider()
+    secret = provider.provide_secret("secretName", "secretPath")
+    self.assertEqual(secret, "secretPath")
+    secret = provider.provide_secret("secretName", "")
+    self.assertEqual(secret, "")
+    secret = provider.provide_secret("secretName", None)
+    self.assertEqual(secret, None)
+
+  def test_environment_secretsprovider(self):
+    provider = EnvironmentBasedSecretsProvider()
+    secret = provider.provide_secret("secretName", "secretPath")
+    self.assertEqual(secret, None)
+    os.environ["secretName"] = "secretValue"
+    secret = provider.provide_secret("secretName", "")
+    self.assertEqual(secret, "secretValue")
+    secret = provider.provide_secret("secretName", None)
+    self.assertEqual(secret, "secretValue")
+    secret = provider.provide_secret("secretName", "somethingelse")
+    self.assertEqual(secret, "secretValue")
\ No newline at end of file
diff --git a/pulsar-functions/runtime/pom.xml b/pulsar-functions/runtime/pom.xml
index 66d245f8d6..e0b264d78c 100644
--- a/pulsar-functions/runtime/pom.xml
+++ b/pulsar-functions/runtime/pom.xml
@@ -45,6 +45,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>org.apache.pulsar</groupId>
+      <artifactId>pulsar-functions-secrets</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>io.grpc</groupId>
       <artifactId>grpc-all</artifactId>
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
index 1551f6f345..80503bb82b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/JavaInstanceMain.java
@@ -22,18 +22,26 @@
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.converters.StringConverter;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
 import io.grpc.Server;
 import io.grpc.ServerBuilder;
 import io.grpc.stub.StreamObserver;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
+import org.apache.pulsar.functions.utils.Reflections;
 
+import java.lang.reflect.Type;
+import java.util.Map;
 import java.util.TimerTask;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
@@ -95,6 +103,12 @@
     @Parameter(names = "--expected_healthcheck_interval", description = "Expected interval in seconds between healtchecks", required = true)
     protected int expectedHealthCheckInterval;
 
+    @Parameter(names = "--secrets_provider", description = "The classname of the secrets provider", required = false)
+    protected String secretsProviderClassName;
+
+    @Parameter(names = "--secrets_provider_config", description = "The config that needs to be passed to secrets provider", required = false)
+    protected String secretsProviderConfig;
+
     private Server server;
     private RuntimeSpawner runtimeSpawner;
     private ThreadRuntimeFactory containerFactory;
@@ -122,13 +136,32 @@ public void start() throws Exception {
         instanceConfig.setFunctionDetails(functionDetails);
         instanceConfig.setPort(port);
 
+        Map<String, String> secretsProviderConfigMap = null;
+        if (!StringUtils.isEmpty(secretsProviderConfig)) {
+            Type type = new TypeToken<Map<String, String>>() {}.getType();
+            secretsProviderConfigMap = new Gson().fromJson(secretsProviderConfig, type);
+        }
+
+        if (StringUtils.isEmpty(secretsProviderClassName)) {
+            secretsProviderClassName = ClearTextSecretsProvider.class.getName();
+        }
+
+        SecretsProvider secretsProvider;
+        try {
+            secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+        secretsProvider.init(secretsProviderConfigMap);
+
         containerFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup", pulsarServiceUrl,
                 stateStorageServiceUrl,
                 AuthenticationConfig.builder().clientAuthenticationPlugin(clientAuthenticationPlugin)
                         .clientAuthenticationParameters(clientAuthenticationParameters).useTls(isTrue(useTls))
                         .tlsAllowInsecureConnection(isTrue(tlsAllowInsecureConnection))
                         .tlsHostnameVerificationEnable(isTrue(tlsHostNameVerificationEnabled))
-                        .tlsTrustCertsFilePath(tlsTrustCertFilePath).build());
+                        .tlsTrustCertsFilePath(tlsTrustCertFilePath).build(),
+                secretsProvider);
         runtimeSpawner = new RuntimeSpawner(
                 instanceConfig,
                 jarFile,
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 7f3f72d44b..152842e1df 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -22,6 +22,8 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.Empty;
 import com.google.protobuf.util.JsonFormat;
 import com.squareup.okhttp.Response;
@@ -33,6 +35,7 @@
 import io.kubernetes.client.models.*;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.metrics.PrometheusMetricsServer;
@@ -40,7 +43,10 @@
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 
+import java.lang.reflect.Type;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -97,6 +103,7 @@
     private final String userCodePkgUrl;
     private final String originalCodeFileName;
     private final String pulsarAdminUrl;
+    private final SecretsProviderConfigurator secretsProviderConfigurator;
     private boolean running;
 
 
@@ -119,6 +126,7 @@
                       String pulsarAdminUrl,
                       String stateStorageServiceUrl,
                       AuthenticationConfig authConfig,
+                      SecretsProviderConfigurator secretsProviderConfigurator,
                       Integer expectedMetricsInterval) throws Exception {
         this.appsClient = appsClient;
         this.coreClient = coreClient;
@@ -130,7 +138,13 @@
         this.userCodePkgUrl = userCodePkgUrl;
         this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
         this.pulsarAdminUrl = pulsarAdminUrl;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
         String logConfigFile = null;
+        String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+        String secretsProviderConfig = null;
+        if (secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null) {
+            secretsProviderConfig = new Gson().toJson(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+        }
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
                 logConfigFile = "kubernetes_instance_log4j2.yml";
@@ -141,7 +155,7 @@
         }
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, this.originalCodeFileName, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, "$" + ENV_SHARD_ID, GRPC_PORT, -1l, logConfigFile,
-                installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
+                secretsProviderClassName, secretsProviderConfig, installUserCodeDependencies, pythonDependencyRepository, pythonExtraDependencyRepository);
         this.prometheusMetricsServerArgs = composePrometheusMetricsServerArgs(prometheusMetricsServerJarFile, expectedMetricsInterval);
         running = false;
         doChecks(instanceConfig.getFunctionDetails());
@@ -156,6 +170,10 @@ public void start() throws Exception {
         try {
             submitStatefulSet();
         } catch (Exception e) {
+            log.error("Could not submit statefulset for {}/{}/{}, deleting service as well",
+                    instanceConfig.getFunctionDetails().getTenant(),
+                    instanceConfig.getFunctionDetails().getNamespace(),
+                    instanceConfig.getFunctionDetails().getName(), e);
             deleteService();
         }
         running = true;
@@ -536,8 +554,10 @@ private V1Container getFunctionContainer(List<String> instanceCommand, Function.
                 .valueFrom(new V1EnvVarSource()
                         .fieldRef(new V1ObjectFieldSelector()
                                 .fieldPath("metadata.name")));
-        container.setEnv(Arrays.asList(envVarPodName));
+        container.addEnvItem(envVarPodName);
 
+        // Configure secrets
+        secretsProviderConfigurator.configureKubernetesRuntimeSecretsProvider(container, instanceConfig.getFunctionDetails());
 
         // set container resources
         final V1ResourceRequirements resourceRequirements = new V1ResourceRequirements();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
index c6d5d02645..2adbb5e095 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeFactory.java
@@ -20,6 +20,8 @@
 package org.apache.pulsar.functions.runtime;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.gson.Gson;
+import com.google.gson.reflect.TypeToken;
 import io.kubernetes.client.ApiClient;
 import io.kubernetes.client.Configuration;
 import io.kubernetes.client.apis.AppsV1Api;
@@ -30,11 +32,14 @@
 import lombok.NoArgsConstructor;
 import lombok.Setter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Type;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -72,6 +77,7 @@
     private final String javaInstanceJarFile;
     private final String pythonInstanceFile;
     private final String prometheusMetricsServerJarFile;
+    private final SecretsProviderConfigurator secretsProviderConfigurator;
     private final String logDirectory = "logs/functions";
     private Timer changeConfigMapTimer;
     private AppsV1Api appsClient;
@@ -93,7 +99,8 @@ public KubernetesRuntimeFactory(String k8Uri,
                                     AuthenticationConfig authConfig,
                                     Integer expectedMetricsCollectionInterval,
                                     String changeConfigMap,
-                                    String changeConfigMapNamespace) {
+                                    String changeConfigMapNamespace,
+                                    SecretsProviderConfigurator secretsProviderConfigurator) {
         this.kubernetesInfo = new KubernetesInfo();
         this.kubernetesInfo.setK8Uri(k8Uri);
         if (!isEmpty(jobNamespace)) {
@@ -126,6 +133,7 @@ public KubernetesRuntimeFactory(String k8Uri,
         this.pythonInstanceFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/python-instance/python_instance_main.py";
         this.prometheusMetricsServerJarFile = this.kubernetesInfo.getPulsarRootDir() + "/instances/PrometheusMetricsServer.jar";
         this.expectedMetricsCollectionInterval = expectedMetricsCollectionInterval == null ? -1 : expectedMetricsCollectionInterval;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
     }
 
     @Override
@@ -169,7 +177,8 @@ public KubernetesRuntime createContainer(InstanceConfig instanceConfig, String c
             this.kubernetesInfo.getPulsarAdminUrl(),
             stateStorageServiceUri,
             authConfig,
-                expectedMetricsCollectionInterval);
+            secretsProviderConfigurator,
+            expectedMetricsCollectionInterval);
     }
 
     @Override
@@ -179,6 +188,12 @@ public void close() {
     @Override
     public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
         KubernetesRuntime.doChecks(functionDetails);
+        if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) {
+            Type type = new TypeToken<Map<String, Object>>() {
+            }.getType();
+            Map<String, Object> secretsMap = new Gson().fromJson(functionDetails.getSecretsMap(), type);
+            secretsProviderConfigurator.validateSecretMap(secretsMap);
+        }
     }
 
     @VisibleForTesting
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
index 2fa7f82238..76db513715 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntime.java
@@ -22,6 +22,7 @@
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.gson.Gson;
 import com.google.protobuf.Empty;
 import io.grpc.ManagedChannel;
 import io.grpc.ManagedChannelBuilder;
@@ -29,9 +30,12 @@
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
 import org.apache.pulsar.functions.proto.InstanceControlGrpc;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 
 import java.io.InputStream;
 import java.util.List;
@@ -60,6 +64,7 @@
     private ScheduledExecutorService timer;
     private InstanceConfig instanceConfig;
     private final Long expectedHealthCheckInterval;
+    private final SecretsProviderConfigurator secretsProviderConfigurator;
     private static final long GRPC_TIMEOUT_SECS = 5;
 
     ProcessRuntime(InstanceConfig instanceConfig,
@@ -69,11 +74,18 @@
                    String pulsarServiceUrl,
                    String stateStorageServiceUrl,
                    AuthenticationConfig authConfig,
+                   SecretsProviderConfigurator secretsProviderConfigurator,
                    Long expectedHealthCheckInterval) throws Exception {
         this.instanceConfig = instanceConfig;
         this.instancePort = instanceConfig.getPort();
         this.expectedHealthCheckInterval = expectedHealthCheckInterval;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
         String logConfigFile = null;
+        String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+        String secretsProviderConfig = null;
+        if (secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()) != null) {
+            secretsProviderConfig = new Gson().toJson(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+        }
         switch (instanceConfig.getFunctionDetails().getRuntime()) {
             case JAVA:
                 logConfigFile = "java_instance_log4j2.yml";
@@ -84,7 +96,7 @@
         }
         this.processArgs = RuntimeUtils.composeArgs(instanceConfig, instanceFile, logDirectory, codeFile, pulsarServiceUrl, stateStorageServiceUrl,
                 authConfig, instanceConfig.getInstanceName(), instanceConfig.getPort(), expectedHealthCheckInterval,
-                logConfigFile, false, null, null);
+                logConfigFile, secretsProviderClassName, secretsProviderConfig, false, null, null);
     }
 
     /**
@@ -258,6 +270,7 @@ private void startProcess() {
         deathException = null;
         try {
             ProcessBuilder processBuilder = new ProcessBuilder(processArgs).inheritIO();
+            secretsProviderConfigurator.configureProcessRuntimeSecretsProvider(processBuilder, instanceConfig.getFunctionDetails());
             log.info("ProcessBuilder starting the process with args {}", String.join(" ", processBuilder.command()));
             process = processBuilder.start();
         } catch (Exception ex) {
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
index 78b069cb69..41a28fbd1d 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ProcessRuntimeFactory.java
@@ -24,6 +24,7 @@
 
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheEntry;
 
 import java.nio.file.Paths;
@@ -37,6 +38,7 @@
     private final String pulsarServiceUrl;
     private final String stateStorageServiceUrl;
     private AuthenticationConfig authConfig;
+    private SecretsProviderConfigurator secretsProviderConfigurator;
     private String javaInstanceJarFile;
     private String pythonInstanceFile;
     private String logDirectory;
@@ -47,10 +49,12 @@ public ProcessRuntimeFactory(String pulsarServiceUrl,
                                  AuthenticationConfig authConfig,
                                  String javaInstanceJarFile,
                                  String pythonInstanceFile,
-                                 String logDirectory) {
+                                 String logDirectory,
+                                 SecretsProviderConfigurator secretsProviderConfigurator) {
         this.pulsarServiceUrl = pulsarServiceUrl;
         this.stateStorageServiceUrl = stateStorageServiceUrl;
         this.authConfig = authConfig;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.javaInstanceJarFile = javaInstanceJarFile;
         this.pythonInstanceFile = pythonInstanceFile;
         this.logDirectory = logDirectory;
@@ -113,6 +117,7 @@ public ProcessRuntime createContainer(InstanceConfig instanceConfig, String code
             pulsarServiceUrl,
             stateStorageServiceUrl,
             authConfig,
+            secretsProviderConfigurator,
             expectedHealthCheckInterval);
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 26475369dd..f12222ef45 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -21,6 +21,7 @@
 
 import com.google.protobuf.util.JsonFormat;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
@@ -49,6 +50,8 @@
                                            Integer grpcPort,
                                            Long expectedHealthCheckInterval,
                                            String logConfigFile,
+                                           String secretsProviderClassName,
+                                           String secretsProviderConfig,
                                            Boolean installUserCodeDepdendencies,
                                            String pythonDependencyRepository,
                                            String pythonExtraDependencyRepository) throws Exception {
@@ -150,6 +153,13 @@
         }
         args.add("--expected_healthcheck_interval");
         args.add(String.valueOf(expectedHealthCheckInterval));
+
+        args.add("--secrets_provider");
+        args.add(secretsProviderClassName);
+        if (!StringUtils.isEmpty(secretsProviderConfig)) {
+            args.add("--secrets_provider_config");
+            args.add(secretsProviderConfig);
+        }
         return args;
     }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
index 9dafbe93bb..5e42c52ea2 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntime.java
@@ -28,6 +28,7 @@
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.InstanceCommunication;
 import org.apache.pulsar.functions.proto.InstanceCommunication.FunctionStatus;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.instance.JavaInstanceRunnable;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
@@ -51,7 +52,8 @@
                   ThreadGroup threadGroup,
                   String jarFile,
                   PulsarClient pulsarClient,
-                  String stateStorageServiceUrl) {
+                  String stateStorageServiceUrl,
+                  SecretsProvider secretsProvider) {
         this.instanceConfig = instanceConfig;
         if (instanceConfig.getFunctionDetails().getRuntime() != Function.FunctionDetails.Runtime.JAVA) {
             throw new RuntimeException("Thread Container only supports Java Runtime");
@@ -61,7 +63,8 @@
             fnCache,
             jarFile,
             pulsarClient,
-            stateStorageServiceUrl);
+            stateStorageServiceUrl,
+            secretsProvider);
         this.threadGroup = threadGroup;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
index dfbbb64efa..846028d40a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/ThreadRuntimeFactory.java
@@ -28,9 +28,9 @@
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
-import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
 
@@ -44,15 +44,18 @@
     private final FunctionCacheManager fnCache;
     private final PulsarClient pulsarClient;
     private final String storageServiceUrl;
+    private final SecretsProvider secretsProvider;
     private volatile boolean closed;
 
     public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
-            AuthenticationConfig authConfig) throws Exception {
-        this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl);
+                                AuthenticationConfig authConfig, SecretsProvider secretsProvider) throws Exception {
+        this(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig), storageServiceUrl, secretsProvider);
     }
 
     @VisibleForTesting
-    public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl) {
+    public ThreadRuntimeFactory(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
+                                SecretsProvider secretsProvider) {
+        this.secretsProvider = secretsProvider;
         this.fnCache = new FunctionCacheManagerImpl();
         this.threadGroup = new ThreadGroup(threadGroupName);
         this.pulsarClient = pulsarClient;
@@ -90,7 +93,8 @@ public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFi
             threadGroup,
             jarFile,
             pulsarClient,
-            storageServiceUrl);
+            storageServiceUrl,
+            secretsProvider);
     }
 
     @Override
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index 02886604c5..5c05de2cae 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -24,6 +24,7 @@
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -73,8 +74,8 @@ public KubernetesRuntimeTest() throws Exception {
         this.logDirectory = "logs/functions";
         this.factory = spy(new KubernetesRuntimeFactory(null, null, null, pulsarRootDir,
             false, true, "myrepo", "anotherrepo",
-                null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null,
-                null, null, null));
+                null, pulsarServiceUrl, pulsarAdminUrl, stateStorageServiceUrl, null, null,
+                null, null, new DefaultSecretsProviderConfigurator()));
         doNothing().when(this.factory).setupClient();
     }
 
@@ -123,7 +124,7 @@ public void testJavaConstructor() throws Exception {
 
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 28);
+        assertEquals(args.size(), 30);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.yml "
@@ -137,7 +138,8 @@ public void testJavaConstructor() throws Exception {
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
-                + " --expected_healthcheck_interval -1";
+                + " --expected_healthcheck_interval -1"
+                + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -147,7 +149,7 @@ public void testPythonConstructor() throws Exception {
 
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 32);
+        assertEquals(args.size(), 34);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + pulsarRootDir + "/" + userJarFile
                 + " --logging_directory " + logDirectory
@@ -162,7 +164,8 @@ public void testPythonConstructor() throws Exception {
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(29)
-                + " --expected_healthcheck_interval -1";
+                + " --expected_healthcheck_interval -1"
+                + " --secrets_provider secretsprovider.ClearTextSecretsProvider";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
index fe492b859a..c2f3638e65 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/ProcessRuntimeTest.java
@@ -21,16 +21,22 @@
 
 import static org.testng.Assert.assertEquals;
 
+import com.google.gson.reflect.TypeToken;
 import com.google.protobuf.util.JsonFormat;
 
+import java.lang.reflect.Type;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import io.kubernetes.client.models.V1Container;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.ConsumerSpec;
 import org.apache.pulsar.functions.proto.Function.FunctionDetails;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionDetailsUtils;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
@@ -40,6 +46,48 @@
  */
 public class ProcessRuntimeTest {
 
+    class TestSecretsProviderConfigurator implements SecretsProviderConfigurator {
+
+        @Override
+        public void init(Map<String, String> config) {
+
+        }
+
+        @Override
+        public String getSecretsProviderClassName(FunctionDetails functionDetails) {
+            if (functionDetails.getRuntime() == FunctionDetails.Runtime.JAVA) {
+                return ClearTextSecretsProvider.class.getName();
+            } else {
+                return "secretsprovider.ClearTextSecretsProvider";
+            }
+        }
+
+        @Override
+        public Map<String, String> getSecretsProviderConfig(FunctionDetails functionDetails) {
+            Map<String, String> config = new HashMap<>();
+            config.put("Config", "Value");
+            return config;
+        }
+
+        @Override
+        public void configureKubernetesRuntimeSecretsProvider(V1Container container, FunctionDetails functionDetails) {
+        }
+
+        @Override
+        public void configureProcessRuntimeSecretsProvider(ProcessBuilder processBuilder, FunctionDetails functionDetails) {
+        }
+
+        @Override
+        public Type getSecretObjectType() {
+            return TypeToken.get(String.class).getType();
+        }
+
+        @Override
+        public void validateSecretMap(Map<String, Object> secretMap) {
+
+        }
+    }
+
     private static final String TEST_TENANT = "test-function-tenant";
     private static final String TEST_NAMESPACE = "test-function-namespace";
     private static final String TEST_NAME = "test-function-container";
@@ -67,7 +115,8 @@ public ProcessRuntimeTest() {
         this.stateStorageServiceUrl = "bk://localhost:4181";
         this.logDirectory = "Users/user/logs";
         this.factory = new ProcessRuntimeFactory(
-            pulsarServiceUrl, stateStorageServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory);
+            pulsarServiceUrl, stateStorageServiceUrl, null, javaInstanceJarFile, pythonInstanceFile, logDirectory,
+                new TestSecretsProviderConfigurator());
     }
 
     @AfterMethod
@@ -115,7 +164,7 @@ public void testJavaConstructor() throws Exception {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 28);
+        assertEquals(args.size(), 32);
         String expectedArgs = "java -cp " + javaInstanceJarFile
                 + " -Dpulsar.functions.java.instance.jar=" + javaInstanceJarFile
                 + " -Dlog4j.configurationFile=java_instance_log4j2.yml "
@@ -129,7 +178,9 @@ public void testJavaConstructor() throws Exception {
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
                 + " --state_storage_serviceurl " + stateStorageServiceUrl
-                + " --expected_healthcheck_interval 30";
+                + " --expected_healthcheck_interval 30"
+                + " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+                + " --secrets_provider_config {\"Config\":\"Value\"}";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
@@ -139,7 +190,7 @@ public void testPythonConstructor() throws Exception {
 
         ProcessRuntime container = factory.createContainer(config, userJarFile, null, 30l);
         List<String> args = container.getProcessArgs();
-        assertEquals(args.size(), 26);
+        assertEquals(args.size(), 30);
         String expectedArgs = "python " + pythonInstanceFile
                 + " --py " + userJarFile + " --logging_directory "
                 + logDirectory + "/functions" + " --logging_file " + config.getFunctionDetails().getName()
@@ -149,7 +200,9 @@ public void testPythonConstructor() throws Exception {
                 + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
                 + "' --pulsar_serviceurl " + pulsarServiceUrl
                 + " --max_buffered_tuples 1024 --port " + args.get(23)
-                + " --expected_healthcheck_interval 30";
+                + " --expected_healthcheck_interval 30"
+                + " --secrets_provider secretsprovider.ClearTextSecretsProvider"
+                + " --secrets_provider_config {\"Config\":\"Value\"}";
         assertEquals(String.join(" ", args), expectedArgs);
     }
 
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index d8bf252905..100c7c840f 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -54,6 +54,10 @@
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.functions.runtime.Runtime;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Reflections;
 
 /**
  * This class managers all aspects of functions assignments and running of function assignments for this worker
@@ -104,6 +108,14 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
         this.workerService = workerService;
         this.functionAdmin = workerService.getFunctionAdmin();
 
+        SecretsProviderConfigurator secretsProviderConfigurator;
+        if (!StringUtils.isEmpty(workerConfig.getSecretsProviderConfiguratorClassName())) {
+            secretsProviderConfigurator = (SecretsProviderConfigurator) Reflections.createInstance(workerConfig.getSecretsProviderConfiguratorClassName(), ClassLoader.getSystemClassLoader());
+        } else {
+            secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
+        }
+        secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
+
         AuthenticationConfig authConfig = AuthenticationConfig.builder()
                 .clientAuthenticationPlugin(workerConfig.getClientAuthenticationPlugin())
                 .clientAuthenticationParameters(workerConfig.getClientAuthenticationParameters())
@@ -116,7 +128,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
                     workerConfig.getThreadContainerFactory().getThreadGroupName(),
                     workerConfig.getPulsarServiceUrl(),
                     workerConfig.getStateStorageServiceUrl(),
-                    authConfig);
+                    authConfig,
+                    new ClearTextSecretsProvider());
         } else if (workerConfig.getProcessContainerFactory() != null) {
             this.runtimeFactory = new ProcessRuntimeFactory(
                     workerConfig.getPulsarServiceUrl(),
@@ -124,7 +137,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
                     authConfig,
                     workerConfig.getProcessContainerFactory().getJavaInstanceJarLocation(),
                     workerConfig.getProcessContainerFactory().getPythonInstanceLocation(),
-                    workerConfig.getProcessContainerFactory().getLogDirectory());
+                    workerConfig.getProcessContainerFactory().getLogDirectory(),
+                    secretsProviderConfigurator);
         } else if (workerConfig.getKubernetesContainerFactory() != null){
             this.runtimeFactory = new KubernetesRuntimeFactory(
                     workerConfig.getKubernetesContainerFactory().getK8Uri(),
@@ -142,7 +156,8 @@ public FunctionRuntimeManager(WorkerConfig workerConfig, WorkerService workerSer
                     authConfig,
                     workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval() == null ? -1 : workerConfig.getKubernetesContainerFactory().getExpectedMetricsCollectionInterval(),
                     workerConfig.getKubernetesContainerFactory().getChangeConfigMap(),
-                    workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace());
+                    workerConfig.getKubernetesContainerFactory().getChangeConfigMapNamespace(),
+                    secretsProviderConfigurator);
         } else {
             throw new RuntimeException("Either Thread, Process or Kubernetes Container Factory need to be set");
         }
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
index 40ae567767..5a95a005ec 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/WorkerConfig.java
@@ -152,6 +152,12 @@
     }
     private KubernetesContainerFactory kubernetesContainerFactory;
 
+    // The classname of the secrets provider configurator.
+    private String secretsProviderConfiguratorClassName;
+    // Any config the secret provider configurator might need. This is passed on
+    // to the init method of the secretproviderconfigurator
+    private Map<String, String> secretsProviderConfiguratorConfig;
+
     public String getFunctionMetadataTopic() {
         return String.format("persistent://%s/%s", pulsarFunctionsNamespace, functionMetadataTopicName);
     }
@@ -199,4 +205,4 @@ public static String unsafeLocalhostResolve() {
     public void setProperties(Properties properties) {
         this.properties = properties;
     }
-}
+}
\ No newline at end of file
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
index 0b8d177c70..945a0ccf1d 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/SchedulerManagerTest.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.functions.proto.Function;
 import org.apache.pulsar.functions.proto.Function.Assignment;
 import org.apache.pulsar.functions.runtime.ThreadRuntimeFactory;
+import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.worker.scheduler.RoundRobinScheduler;
 import org.mockito.Mockito;
 import org.mockito.invocation.Invocation;
@@ -80,7 +81,7 @@
     private ScheduledExecutorService executor;
 
     @BeforeMethod
-    public void setup() throws PulsarClientException {
+    public void setup() {
         WorkerConfig workerConfig = new WorkerConfig();
         workerConfig.setWorkerId("worker-1");
         workerConfig.setThreadContainerFactory(new WorkerConfig.ThreadContainerFactory().setThreadGroupName("test"));
@@ -140,7 +141,7 @@ public void testSchedule() throws Exception {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -186,7 +187,7 @@ public void testNothingNewToSchedule() throws Exception {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -233,7 +234,7 @@ public void testAddingFunctions() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -293,7 +294,7 @@ public void testDeletingFunctions() throws Exception {
         functionMetaDataList.add(function1);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -358,7 +359,7 @@ public void testScalingUp() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -469,7 +470,7 @@ public void testScalingDown() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -596,7 +597,7 @@ public void testHeartbeatFunction() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         Map<String, Map<String, Function.Assignment>> currentAssignments = new HashMap<>();
@@ -650,7 +651,7 @@ public void testUpdate() throws Exception {
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments
@@ -783,7 +784,7 @@ public void testAssignmentWorkerDoesNotExist() throws InterruptedException, NoSu
         functionMetaDataList.add(function2);
         doReturn(functionMetaDataList).when(functionMetaDataManager).getAllFunctionMetaData();
 
-        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy");
+        ThreadRuntimeFactory factory = new ThreadRuntimeFactory("dummy", null, "dummy", new ClearTextSecretsProvider());
         doReturn(factory).when(functionRuntimeManager).getRuntimeFactory();
 
         // set assignments


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services