You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/05/18 05:22:15 UTC

[pulsar] branch master updated: Modify ThreadRuntime to allow custom secrets providers (#6971)

This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a76954  Modify ThreadRuntime to allow custom secrets providers (#6971)
7a76954 is described below

commit 7a76954abf2acb8eec9a40dc932b4c95896296f3
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun May 17 22:22:03 2020 -0700

    Modify ThreadRuntime to allow custom secrets providers (#6971)
    
    Co-authored-by: Jerry Peng <je...@splunk.com>
---
 .../runtime/thread/ThreadRuntimeFactory.java       | 23 +++++++++++++++++-----
 .../functions/worker/FunctionRuntimeManager.java   |  2 ++
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index c675418..cb82129 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.functions.runtime.RuntimeUtils;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
 import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Reflections;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
 import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
 import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -61,13 +62,15 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     private CollectorRegistry collectorRegistry;
     private String narExtractionDirectory;
     private volatile boolean closed;
+    private SecretsProviderConfigurator secretsProviderConfigurator;
+    private ClassLoader rootClassLoader;
 
     public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
                                 AuthenticationConfig authConfig, SecretsProvider secretsProvider,
                                 CollectorRegistry collectorRegistry, String narExtractionDirectory,
                                 ClassLoader rootClassLoader) throws Exception {
         initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
-                storageServiceUrl, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
+                storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
     }
 
     @VisibleForTesting
@@ -76,7 +79,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
                                 String narExtractionDirectory, ClassLoader rootClassLoader) {
 
         initialize(threadGroupName, pulsarClient, storageServiceUrl,
-                secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
+                null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
     }
 
     private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
@@ -101,12 +104,14 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     }
 
     private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
-                            SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
-                            String narExtractionDirectory, ClassLoader rootClassLoader) {
+                            SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider,
+                            CollectorRegistry collectorRegistry,  String narExtractionDirectory, ClassLoader rootClassLoader) {
         if (rootClassLoader == null) {
             rootClassLoader = Thread.currentThread().getContextClassLoader();
         }
 
+        this.rootClassLoader = rootClassLoader;
+        this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.secretsProvider = secretsProvider;
         this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
         this.threadGroup = new ThreadGroup(threadGroupName);
@@ -126,7 +131,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
 
         initialize(factoryConfig.getThreadGroupName(),
                 createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
-                workerConfig.getStateStorageServiceUrl(), new ClearTextSecretsProvider(),
+                workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null,
                 null, workerConfig.getNarExtractionDirectory(), null);
     }
 
@@ -134,6 +139,14 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
     public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
                                          String originalCodeFileName,
                                          Long expectedHealthCheckInterval) {
+        if (secretsProvider == null) {
+            String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+            secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, this.rootClassLoader);
+            log.info("Initializing secrets provider {} with configs: {}",
+              secretsProvider.getClass().getName(), secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+            secretsProvider.init(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+        }
+
         return new ThreadRuntime(
             instanceConfig,
             fnCache,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index c66be68..3f522dd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -143,6 +143,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
         } else {
             secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
         }
+        log.info("Initializing secrets provider configurator {} with configs: {}",
+          secretsProviderConfigurator.getClass().getName(), workerConfig.getSecretsProviderConfiguratorConfig());
         secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
 
         Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();