You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by sa...@apache.org on 2020/09/23 14:32:59 UTC

[pulsar] branch master updated: Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner (#8098)

This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new fefaf52  Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner (#8098)
fefaf52 is described below

commit fefaf52a30e89f82079f095e4bddad39f6c17e52
Author: Sanjeev Kulkarni <sa...@gmail.com>
AuthorDate: Wed Sep 23 07:32:37 2020 -0700

    Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner (#8098)
    
    * Add ability to specify EnvironmentBasedSecretsProvider in LocalRunner
    
    * Addressed feedback
    
    * Took out wildcard
    
    * Fix build
    
    * Address feedback
    
    Co-authored-by: Sanjeev Kulkarni <sa...@splunk.com>
---
 .../org/apache/pulsar/admin/cli/CmdFunctions.java  |  4 ++
 .../java/org/apache/pulsar/admin/cli/CmdSinks.java |  5 ++
 .../org/apache/pulsar/admin/cli/CmdSources.java    |  5 ++
 .../org/apache/pulsar/functions/LocalRunner.java   | 43 +++++++++++++-
 ...eAndConfigBasedSecretsProviderConfigurator.java | 66 ++++++++++++++++++++++
 5 files changed, 120 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
index d28bd69..4792a6f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdFunctions.java
@@ -627,6 +627,10 @@ public class CmdFunctions extends CmdBase {
         protected Integer instanceIdOffset = 0;
         @Parameter(names = "--runtime", description = "either THREAD or PROCESS. Only applies for Java functions")
         protected String runtime;
+        @Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider")
+        protected String secretsProviderClassName;
+        @Parameter(names = "--secrets-provider-config", description = "Config that needs to be passed to secrets provider")
+        protected String secretsProviderConfig;
 
         private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_stateStorageServiceUrl)) stateStorageServiceUrl = DEPRECATED_stateStorageServiceUrl;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
index 04bbf94..f6b1823 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSinks.java
@@ -164,6 +164,11 @@ public class CmdSinks extends CmdBase {
         @Parameter(names = "--tls-trust-cert-path", description = "tls trust cert file path")
         protected String tlsTrustCertFilePath;
 
+        @Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider")
+        protected String secretsProviderClassName;
+        @Parameter(names = "--secrets-provider-config", description = "Config that needs to be passed to secrets provider")
+        protected String secretsProviderConfig;
+
         private void mergeArgs() {
             if (!StringUtils.isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
             if (!StringUtils.isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
index ea61202..9bd49be 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdSources.java
@@ -165,6 +165,11 @@ public class CmdSources extends CmdBase {
         @Parameter(names = "--tls-trust-cert-path", description = "tls trust cert file path")
         protected String tlsTrustCertFilePath;
 
+        @Parameter(names = "--secrets-provider-classname", description = "Whats the classname for secrets provider")
+        protected String secretsProviderClassName;
+        @Parameter(names = "--secrets-provider-config", description = "Config that needs to be passed to secrets provider")
+        protected String secretsProviderConfig;
+
         private void mergeArgs() {
             if (!isBlank(DEPRECATED_brokerServiceUrl)) brokerServiceUrl = DEPRECATED_brokerServiceUrl;
             if (!isBlank(DEPRECATED_clientAuthPlugin)) clientAuthPlugin = DEPRECATED_clientAuthPlugin;
diff --git a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
index e7296ca..e00affc 100644
--- a/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
+++ b/pulsar-functions/localrun/src/main/java/org/apache/pulsar/functions/LocalRunner.java
@@ -31,6 +31,7 @@ import org.apache.pulsar.common.io.SinkConfig;
 import org.apache.pulsar.common.io.SourceConfig;
 import org.apache.pulsar.common.nar.NarClassLoader;
 import org.apache.pulsar.common.util.ObjectMapperFactory;
+import org.apache.pulsar.common.util.Reflections;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
 import org.apache.pulsar.functions.proto.Function;
@@ -38,7 +39,11 @@ import org.apache.pulsar.functions.runtime.process.ProcessRuntimeFactory;
 import org.apache.pulsar.functions.runtime.RuntimeSpawner;
 import org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory;
 import org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider;
+import org.apache.pulsar.functions.secretsprovider.EnvironmentBasedSecretsProvider;
+import org.apache.pulsar.functions.secretsprovider.SecretsProvider;
 import org.apache.pulsar.functions.secretsproviderconfigurator.DefaultSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.NameAndConfigBasedSecretsProviderConfigurator;
+import org.apache.pulsar.functions.secretsproviderconfigurator.SecretsProviderConfigurator;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 import org.apache.pulsar.functions.utils.FunctionConfigUtils;
 import org.apache.pulsar.functions.utils.SinkConfigUtils;
@@ -56,6 +61,7 @@ import java.nio.file.Paths;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
 import java.util.UUID;
@@ -143,6 +149,10 @@ public class LocalRunner {
     protected int instanceIdOffset = 0;
     @Parameter(names = "--runtime", description = "Function runtime to use (Thread/Process)", hidden = true, converter = RuntimeConverter.class)
     protected RuntimeEnv runtimeEnv;
+    @Parameter(names = "--secretsProviderClassName", description = "Whats the classname of secrets provider", hidden = true)
+    protected String secretsProviderClassName;
+    @Parameter(names = "--secretsProviderConfig", description = "Whats the config for the secrets provider", hidden = true)
+    protected String secretsProviderConfig;
 
     private static final String DEFAULT_SERVICE_URL = "pulsar://localhost:6650";
 
@@ -349,7 +359,7 @@ public class LocalRunner {
                                            int parallelism, int instanceIdOffset, String serviceUrl,
                                            String stateStorageServiceUrl, AuthenticationConfig authConfig,
                                            String userCodeFile) throws Exception {
-
+        SecretsProviderConfigurator secretsProviderConfigurator = getSecretsProviderConfigurator();
         try (ProcessRuntimeFactory containerFactory = new ProcessRuntimeFactory(
                 serviceUrl,
                 stateStorageServiceUrl,
@@ -359,7 +369,8 @@ public class LocalRunner {
                 null, /* log directory */
                 null, /* extra dependencies dir */
                 narExtractionDirectory, /* nar extraction dir */
-                new DefaultSecretsProviderConfigurator(), false, Optional.empty(), Optional.empty())) {
+                secretsProviderConfigurator,
+                false, Optional.empty(), Optional.empty())) {
 
             for (int i = 0; i < parallelism; ++i) {
                 InstanceConfig instanceConfig = new InstanceConfig();
@@ -418,11 +429,23 @@ public class LocalRunner {
                                            int parallelism, int instanceIdOffset, String serviceUrl,
                                            String stateStorageServiceUrl, AuthenticationConfig authConfig,
                                            String userCodeFile) throws Exception {
+        SecretsProvider secretsProvider;
+        if (secretsProviderClassName != null) {
+            secretsProvider = (SecretsProvider) Reflections.createInstance(secretsProviderClassName, ClassLoader.getSystemClassLoader());
+            Map<String, String> config = null;
+            if (secretsProviderConfig != null) {
+                config = (Map<String, String>)new Gson().fromJson(secretsProviderConfig, Map.class);
+            }
+            secretsProvider.init(config);
+        } else {
+            secretsProvider = new ClearTextSecretsProvider();
+        }
         ThreadRuntimeFactory threadRuntimeFactory = new ThreadRuntimeFactory("LocalRunnerThreadGroup",
                 serviceUrl,
                 stateStorageServiceUrl,
                 authConfig,
-                new ClearTextSecretsProvider(), null, narExtractionDirectory, null);
+                secretsProvider,
+                null, narExtractionDirectory, null);
         for (int i = 0; i < parallelism; ++i) {
             InstanceConfig instanceConfig = new InstanceConfig();
             instanceConfig.setFunctionDetails(functionDetails);
@@ -482,4 +505,18 @@ public class LocalRunner {
         String connectorsDir = Paths.get(pulsarHome, "connectors").toString();
         return ConnectorUtils.searchForConnectors(connectorsDir, narExtractionDirectory);
     }
+
+    private SecretsProviderConfigurator getSecretsProviderConfigurator() {
+        SecretsProviderConfigurator secretsProviderConfigurator;
+        if (secretsProviderClassName != null) {
+            Map<String, String> config = null;
+            if (secretsProviderConfig != null) {
+                config = (Map<String, String>)new Gson().fromJson(secretsProviderConfig, Map.class);
+            }
+            secretsProviderConfigurator = new NameAndConfigBasedSecretsProviderConfigurator(secretsProviderClassName, config);
+        } else {
+            secretsProviderConfigurator = new DefaultSecretsProviderConfigurator();
+        }
+        return secretsProviderConfigurator;
+    }
 }
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/NameAndConfigBasedSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/NameAndConfigBasedSecretsProviderConfigurator.java
new file mode 100644
index 0000000..9eb389f
--- /dev/null
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/NameAndConfigBasedSecretsProviderConfigurator.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.functions.secretsproviderconfigurator;
+
+import com.google.gson.reflect.TypeToken;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import org.apache.pulsar.functions.proto.Function;
+
+import java.lang.reflect.Type;
+import java.util.Map;
+
+/**
+ * This is a very simple secrets provider which wires in a given secrets provider classname/config
+ * to the function instances/containers. This does not do any special kubernetes specific wiring.
+ */
+public class NameAndConfigBasedSecretsProviderConfigurator implements SecretsProviderConfigurator {
+    private String className;
+    private Map<String, String> config;
+    public NameAndConfigBasedSecretsProviderConfigurator(String className, Map<String, String> config) {
+        this.className = className;
+        this.config = config;
+    }
+    @Override
+    public String getSecretsProviderClassName(Function.FunctionDetails functionDetails) {
+        return className;
+    }
+
+    @Override
+    public Map<String, String> getSecretsProviderConfig(Function.FunctionDetails functionDetails) {
+        return config;
+    }
+
+    // Kubernetes secrets can be exposed as volume mounts or as environment variables in the pods. We are currently using the
+    // environment variables way. Essentially the secretName/secretPath is attached as secretRef to the environment variables
+    // of a pod and kubernetes magically makes the secret pointed to by this combination available as a env variable.
+    @Override
+    public void configureKubernetesRuntimeSecretsProvider(V1PodSpec podSpec, String functionsContainerName, Function.FunctionDetails functionDetails) {
+        // noop
+    }
+
+    @Override
+    public void configureProcessRuntimeSecretsProvider(ProcessBuilder processBuilder, Function.FunctionDetails functionDetails) {
+        // noop
+    }
+
+    @Override
+    public Type getSecretObjectType() {
+        return new TypeToken<Map<String, String>>() {}.getType();
+    }
+}