You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by je...@apache.org on 2020/05/18 05:22:15 UTC
[pulsar] branch master updated: Modify ThreadRuntime to allow custom secrets providers (#6971)
This is an automated email from the ASF dual-hosted git repository.
jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 7a76954 Modify ThreadRuntime to allow custom secrets providers (#6971)
7a76954 is described below
commit 7a76954abf2acb8eec9a40dc932b4c95896296f3
Author: Boyang Jerry Peng <je...@gmail.com>
AuthorDate: Sun May 17 22:22:03 2020 -0700
Modify ThreadRuntime to allow custom secrets providers (#6971)
Co-authored-by: Jerry Peng <je...@splunk.com>
---
.../runtime/thread/ThreadRuntimeFactory.java | 23 +++++++++++++++++-----
.../functions/worker/FunctionRuntimeManager.java | 2 ++
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
index c675418..cb82129 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/thread/ThreadRuntimeFactory.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.functions.runtime.RuntimeUtils;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
+import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManager;
import org.apache.pulsar.functions.utils.functioncache.FunctionCacheManagerImpl;
import org.apache.pulsar.functions.worker.WorkerConfig;
@@ -61,13 +62,15 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
private CollectorRegistry collectorRegistry;
private String narExtractionDirectory;
private volatile boolean closed;
+ private SecretsProviderConfigurator secretsProviderConfigurator;
+ private ClassLoader rootClassLoader;
public ThreadRuntimeFactory(String threadGroupName, String pulsarServiceUrl, String storageServiceUrl,
AuthenticationConfig authConfig, SecretsProvider secretsProvider,
CollectorRegistry collectorRegistry, String narExtractionDirectory,
ClassLoader rootClassLoader) throws Exception {
initialize(threadGroupName, createPulsarClient(pulsarServiceUrl, authConfig),
- storageServiceUrl, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
+ storageServiceUrl, null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
}
@VisibleForTesting
@@ -76,7 +79,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
String narExtractionDirectory, ClassLoader rootClassLoader) {
initialize(threadGroupName, pulsarClient, storageServiceUrl,
- secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
+ null, secretsProvider, collectorRegistry, narExtractionDirectory, rootClassLoader);
}
private static PulsarClient createPulsarClient(String pulsarServiceUrl, AuthenticationConfig authConfig)
@@ -101,12 +104,14 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
}
private void initialize(String threadGroupName, PulsarClient pulsarClient, String storageServiceUrl,
- SecretsProvider secretsProvider, CollectorRegistry collectorRegistry,
- String narExtractionDirectory, ClassLoader rootClassLoader) {
+ SecretsProviderConfigurator secretsProviderConfigurator, SecretsProvider secretsProvider,
+ CollectorRegistry collectorRegistry, String narExtractionDirectory, ClassLoader rootClassLoader) {
if (rootClassLoader == null) {
rootClassLoader = Thread.currentThread().getContextClassLoader();
}
+ this.rootClassLoader = rootClassLoader;
+ this.secretsProviderConfigurator = secretsProviderConfigurator;
this.secretsProvider = secretsProvider;
this.fnCache = new FunctionCacheManagerImpl(rootClassLoader);
this.threadGroup = new ThreadGroup(threadGroupName);
@@ -126,7 +131,7 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
initialize(factoryConfig.getThreadGroupName(),
createPulsarClient(workerConfig.getPulsarServiceUrl(), authenticationConfig),
- workerConfig.getStateStorageServiceUrl(), new ClearTextSecretsProvider(),
+ workerConfig.getStateStorageServiceUrl(), secretsProviderConfigurator, null,
null, workerConfig.getNarExtractionDirectory(), null);
}
@@ -134,6 +139,14 @@ public class ThreadRuntimeFactory implements RuntimeFactory {
public ThreadRuntime createContainer(InstanceConfig instanceConfig, String jarFile,
String originalCodeFileName,
Long expectedHealthCheckInterval) {
+ if (secretsProvider == null) {
+ String secretsProviderClassName = secretsProviderConfigurator.getSecretsProviderClassName(instanceConfig.getFunctionDetails());
+ secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, this.rootClassLoader);
+ log.info("Initializing secrets provider {} with configs: {}",
+ secretsProvider.getClass().getName(), secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+ secretsProvider.init(secretsProviderConfigurator.getSecretsProviderConfig(instanceConfig.getFunctionDetails()));
+ }
+
return new ThreadRuntime(
instanceConfig,
fnCache,
diff --git a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
index c66be68..3f522dd 100644
--- a/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
+++ b/pulsar-functions/worker/src/main/java/org/apache/pulsar/functions/worker/FunctionRuntimeManager.java
@@ -143,6 +143,8 @@ public class FunctionRuntimeManager implements AutoCloseable{
} else {
secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
}
+ log.info("Initializing secrets provider configurator {} with configs: {}",
+ secretsProviderConfigurator.getClass().getName(), workerConfig.getSecretsProviderConfiguratorConfig());
secretsProviderConfigurator.init(workerConfig.getSecretsProviderConfiguratorConfig());
Optional<FunctionAuthProvider> functionAuthProvider = Optional.empty();