You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/09/23 14:32:59 UTC
[pulsar] branch master updated: Add ability to specify
EnvironmentBasedSecretsProvider in LocalRunner (#8098)
This is an automated email from the ASF dual-hosted git repository.
sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new fefaf52 Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner (#8098)
fefaf52 is described below
commit fefaf52a30e89f82079f095e4bddad39f6c17e52
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Sep 23 07:32:37 2020 -0700
Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner (#8098)
* Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner
* Addressed feedback
* Took out wildcard
* Fix build
* Address feedback
Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
.../org/apache/pulsar/admin/cli/CmdFunctions.java | 4 ++
.../java/org/apache/pulsar/admin/cli/CmdSinks.java | 5 ++
.../org/apache/pulsar/admin/cli/CmdSources.java | 5 ++
.../org/apache/pulsar/functions/LocalRunner.java | 43 +++++++++++++-
...eAndConfigBasedSecretsProviderConfigurator.java | 66 ++++++++++++++++++++++
5 files changed, 120 insertions(+), 3 deletions(-)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index d28bd69..4792a6f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -627,6 +627,10 @@ public class CmdFunctions extends CmdBase {
protected Integer instanceIdOffset = 0;
@Parameter(names = "--runtime", description = "either THREAD or PROCESS. Only applies for Java functions")
protected String runtime;
+ @Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider")
+ protected String secretsProviderClassName;
+ @Parameter(names = "--secrets-provider-config", description = "Config that needs to be passed to secrets provider")
+ protected String secretsProviderConfig;
private void mergeArgs() {
if (!StringUtils.isBlank(DEPRECATED_stateStorageServiceUrl)) stateStorageServiceUrl = DEPRECATED_stateStorageServiceUrl;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 04bbf94..f6b1823 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -164,6 +164,11 @@ public class CmdSinks extends CmdBase {
@Parameter(names = "--tls-trust-cert-path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;
+ @Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider")
+ protected String secretsProviderClassName;
+ @Parameter(names = "--secrets-provider-config", description = "Config that needs to be passed to secrets provider")
+ protected String secretsProviderConfig;
+
private void mergeArgs() {
if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index ea61202..9bd49be 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -165,6 +165,11 @@ public class CmdSources extends CmdBase {
@Parameter(names = "--tls-trust-cert-path", description = "tls trust cert file path")
protected String tlsTrustCertFilePath;
+ @Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider")
+ protected String secretsProviderClassName;
+ @Parameter(names = "--secrets-provider-config", description = "Config that needs to be passed to secrets provider")
+ protected String secretsProviderConfig;
+
private void mergeArgs() {
if (!isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
if (!isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index e7296ca..e00affc 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.common.io.SourceConfig;
import org.apache.pulsar.common.nar.NarClassLoader;
import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.Reflections;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
import org.apache.pulsar.functions.proto.Function;
@@ -38,7 +39,11 @@ import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
import org.apache.pulsar.functions.runtime.RuntimeSpawner;
import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.NameAndConfigBasedSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
import org.apache.pulsar.functions.utils.FunctionCommon;
import org.apache.pulsar.functions.utils.FunctionConfigUtils;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
@@ -56,6 +61,7 @@ import java.nio.file.Paths;
import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
+import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
@@ -143,6 +149,10 @@ public class LocalRunner {
protected int instanceIdOffset = 0;
@Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true, converter = RuntimeConverter.class)
protected RuntimeEnv runtimeEnv;
+ @Parameter(names = "--secretsProviderClassName", description = "Whats the classname of secrets provider", hidden = true)
+ protected String secretsProviderClassName;
+ @Parameter(names = "--secretsProviderConfig", description = "Whats the config for the secrets provider", hidden = true)
+ protected String secretsProviderConfig;
private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
@@ -349,7 +359,7 @@ public class LocalRunner {
int parallelism, int instanceIdOffset, String serviceUrl,
String stateStorageServiceUrl, AuthenticationConfig authConfig,
String userCodeFile) throws Exception {
-
+ SecretsProviderConfigurator secretsProviderConfigurator = getSecretsProviderConfigurator();
try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
serviceUrl,
stateStorageServiceUrl,
@@ -359,7 +369,8 @@ public class LocalRunner {
null, /* log directory */
null, /* extra dependencies dir */
narExtractionDirectory, /* nar extraction dir */
- new DefaultSecretsProviderConfigurator(), false, Optional.empty(), Optional.empty())) {
+ secretsProviderConfigurator,
+ false, Optional.empty(), Optional.empty())) {
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
@@ -418,11 +429,23 @@ public class LocalRunner {
int parallelism, int instanceIdOffset, String serviceUrl,
String stateStorageServiceUrl, AuthenticationConfig authConfig,
String userCodeFile) throws Exception {
+ SecretsProvider secretsProvider;
+ if (secretsProviderClassName != null) {
+ secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
+ Map<String, String> config = null;
+ if (secretsProviderConfig != null) {
+ config = (Map<String, String>)new Gson().fromJson(secretsProviderConfig, Map.class);
+ }
+ secretsProvider.init(config);
+ } else {
+ secretsProvider = new ClearTextSecretsProvider();
+ }
ThreadRuntimeFactory threadRuntimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
serviceUrl,
stateStorageServiceUrl,
authConfig,
- new ClearTextSecretsProvider(), null, narExtractionDirectory, null);
+ secretsProvider,
+ null, narExtractionDirectory, null);
for (int i = 0; i < parallelism; ++i) {
InstanceConfig instanceConfig = new InstanceConfig();
instanceConfig.setFunctionDetails(functionDetails);
@@ -482,4 +505,18 @@ public class LocalRunner {
String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
}
+
+ private SecretsProviderConfigurator getSecretsProviderConfigurator() {
+ SecretsProviderConfigurator secretsProviderConfigurator;
+ if (secretsProviderClassName != null) {
+ Map<String, String> config = null;
+ if (secretsProviderConfig != null) {
+ config = (Map<String, String>)new Gson().fromJson(secretsProviderConfig, Map.class);
+ }
+ secretsProviderConfigurator = new NameAndConfigBasedSecretsProviderConfigurator(secretsProviderClassName, config);
+ } else {
+ secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
+ }
+ return secretsProviderConfigurator;
+ }
}
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/NameAndConfigBasedSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/NameAndConfigBasedSecretsProviderConfigurator.java
new file mode 100644
index 0000000..9eb389f
--- /dev/null
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/NameAndConfigBasedSecretsProviderConfigurator.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.secretsproviderconfigurator;
+
+import com.google.gson.reflect.TypeToken;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import org.apache.pulsar.functions.proto.Function;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * This is a very simple secrets provider which wires in a given secrets provider classname/config
+ * to the function instances/containers. This does not do any special kubernetes specific wiring.
+ */
+public class NameAndConfigBasedSecretsProviderConfigurator implements SecretsProviderConfigurator {
+ private String className;
+ private Map<String, String> config;
+ public NameAndConfigBasedSecretsProviderConfigurator(String className, Map<String, String> config) {
+ this.className = className;
+ this.config = config;
+ }
+ @Override
+ public String getSecretsProviderClassName(Function.FunctionDetails functionDetails) {
+ return className;
+ }
+
+ @Override
+ public Map<String, String> getSecretsProviderConfig(Function.FunctionDetails functionDetails) {
+ return config;
+ }
+
+ // Kubernetes secrets can be exposed as volume mounts or as environment variables in the pods. We are currently using the
+ // environment variables way. Essentially the secretName/secretPath is attached as secretRef to the environment variables
+ // of a pod and kubernetes magically makes the secret pointed to by this combination available as a env variable.
+ @Override
+ public void configureKubernetesRuntimeSecretsProvider(V1PodSpec podSpec, String functionsContainerName, Function.FunctionDetails functionDetails) {
+ // noop
+ }
+
+ @Override
+ public void configureProcessRuntimeSecretsProvider(ProcessBuilder processBuilder, Function.FunctionDetails functionDetails) {
+ // noop
+ }
+
+ @Override
+ public Type getSecretObjectType() {
+ return new TypeToken<Map<String, String>>() {}.getType();
+ }
+}