You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/13 07:04:38 UTC

[pulsar] branch master updated: [fix][functions] Fix K8S download function method with auth enabled (#17597)

This is an automated email from the ASF dual-hosted git repository.

eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new d23b7604d5a [fix][functions] Fix K8S download function method with auth enabled (#17597)
d23b7604d5a is described below

commit d23b7604d5ad919e93f73c044de04f6c58b702ae
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Sep 13 09:04:30 2022 +0200

    [fix][functions] Fix K8S download function method with auth enabled (#17597)
---
 .../runtime/kubernetes/KubernetesRuntime.java      | 24 +++----
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 74 ++++++++++++++++++++--
 2 files changed, 83 insertions(+), 15 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index f7791e716d9..e6e85d66d0e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -875,17 +875,7 @@ public class KubernetesRuntime implements Runtime {
         ArrayList<String> cmd = new ArrayList<>(Arrays.asList(
                 pulsarRootDir + configAdminCLI,
                 "--admin-url",
-                pulsarAdminUrl,
-                "functions",
-                "download",
-                "--tenant",
-                tenant,
-                "--namespace",
-                namespace,
-                "--name",
-                name,
-                "--destination-file",
-                userCodeFilePath));
+                pulsarAdminUrl));
 
         // add auth plugin and parameters if necessary
         if (authenticationEnabled && authConfig != null) {
@@ -900,6 +890,18 @@ public class KubernetesRuntime implements Runtime {
             }
         }
 
+        cmd.addAll(Arrays.asList(
+                "functions",
+                "download",
+                "--tenant",
+                tenant,
+                "--namespace",
+                namespace,
+                "--name",
+                name,
+                "--destination-file",
+                userCodeFilePath));
+
         if (transformFunction) {
             cmd.add("--transform-function");
         }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 33451a316e4..cf86623eafd 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -64,6 +64,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
 import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
@@ -183,10 +184,21 @@ public class KubernetesRuntimeTest {
         }
     }
 
+
     KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
                                                             double cpuOverCommitRatio, double memoryOverCommitRatio,
                                                             Optional<RuntimeCustomizer> manifestCustomizer,
                                                             String downloadDirectory) throws Exception {
+        return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio,
+                manifestCustomizer, downloadDirectory, null, null);
+    }
+
+    KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+                                                            double cpuOverCommitRatio, double memoryOverCommitRatio,
+                                                            Optional<RuntimeCustomizer> manifestCustomizer,
+                                                            String downloadDirectory,
+                                                            Consumer<WorkerConfig> workerConfigConsumer,
+                                                            AuthenticationConfig authenticationConfig) throws Exception {
 
         KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
         doNothing().when(factory).setupClient();
@@ -226,7 +238,11 @@ public class KubernetesRuntimeTest {
 
         manifestCustomizer.ifPresent(runtimeCustomizer -> runtimeCustomizer.initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap())));
 
-        factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
+        if (workerConfigConsumer != null) {
+            workerConfigConsumer.accept(workerConfig);
+        }
+
+        factory.initialize(workerConfig, authenticationConfig, new TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
                 Mockito.mock(FunctionsManager.class), Optional.empty(), manifestCustomizer);
         return factory;
     }
@@ -234,7 +250,16 @@ public class KubernetesRuntimeTest {
     KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
                                                             double cpuOverCommitRatio, double memoryOverCommitRatio,
                                                             Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
-        return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, manifestCustomizer, null);
+        return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, manifestCustomizer,
+                null, null, null);
+    }
+
+    KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+                                                            double cpuOverCommitRatio, double memoryOverCommitRatio,
+                                                            Optional<RuntimeCustomizer> manifestCustomizer,
+                                                            Consumer<WorkerConfig> workerConfigConsumer) throws Exception {
+        return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, manifestCustomizer,
+                null, workerConfigConsumer, null);
     }
 
     KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
@@ -810,6 +835,33 @@ public class KubernetesRuntimeTest {
         assertEquals(spec.getSpec().getTemplate().getSpec().getServiceAccountName(), "my-service-account");
     }
 
+    @Test
+    public void testCustomKubernetesDownloadCommandsWithAuth() throws Exception {
+        InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+        config.setFunctionAuthenticationSpec(Function.FunctionAuthenticationSpec.newBuilder().build());
+        config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false));
+
+        factory = createKubernetesRuntimeFactory(null,
+                10, 1.0, 1.0, Optional.empty(), null, wconfig -> {
+                    wconfig.setAuthenticationEnabled(true);
+                }, AuthenticationConfig.builder()
+                        .clientAuthenticationPlugin("com.MyAuth")
+                        .clientAuthenticationParameters("{\"authParam1\": \"authParamValue1\"}")
+                        .build());
+
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
+        V1StatefulSet spec = container.createStatefulSet();
+        String expectedDownloadCommand = "pulsar-admin --admin-url " + pulsarAdminUrl
+                + " --auth-plugin com.MyAuth --auth-params {\"authParam1\": \"authParamValue1\"}"
+                + " functions download "
+                + "--tenant " + TEST_TENANT
+                + " --namespace " + TEST_NAMESPACE
+                + " --name " + TEST_NAME
+                + " --destination-file " + pulsarRootDir + "/" + userJarFile;
+        String containerCommand = spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
+        assertTrue(containerCommand.contains(expectedDownloadCommand), "Found:" + containerCommand);
+    }
+
     InstanceConfig createGolangInstanceConfig() {
         InstanceConfig config = new InstanceConfig();
 
@@ -915,6 +967,16 @@ public class KubernetesRuntimeTest {
                                                             double cpuOverCommitRatio, double memoryOverCommitRatio,
                                                             String manifestCustomizerClassName,
                                                             Map<String, Object> runtimeCustomizerConfig) throws Exception {
+        return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio,
+                memoryOverCommitRatio, manifestCustomizerClassName, runtimeCustomizerConfig, null, null);
+    }
+
+    KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+                                                            double cpuOverCommitRatio, double memoryOverCommitRatio,
+                                                            String manifestCustomizerClassName,
+                                                            Map<String, Object> runtimeCustomizerConfig,
+                                                            Consumer<WorkerConfig> workerConfigConsumer,
+                                                            AuthenticationConfig authenticationConfig) throws Exception {
         KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
         doNothing().when(factory).setupClient();
 
@@ -957,8 +1019,11 @@ public class KubernetesRuntimeTest {
             manifestCustomizer = Optional.of(RuntimeCustomizer.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
             manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
         }
+        if (workerConfigConsumer != null) {
+            workerConfigConsumer.accept(workerConfig);
+        }
 
-        factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(),
+        factory.initialize(workerConfig, authenticationConfig, new TestSecretProviderConfigurator(),
                 Mockito.mock(ConnectorsManager.class), Mockito.mock(FunctionsManager.class), Optional.empty(), manifestCustomizer);
         return factory;
     }
@@ -1094,7 +1159,8 @@ public class KubernetesRuntimeTest {
     public void testJavaConstructorWithoutDownloadDirectoryDefined() throws Exception {
         InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
 
-        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), null);
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0,
+                Optional.empty());
 
         verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
     }