You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text version.
Posted to commits@pulsar.apache.org by eo...@apache.org on 2022/09/13 07:04:38 UTC
[pulsar] branch master updated: [fix][functions] Fix K8S download function method with auth enabled (#17597)
This is an automated email from the ASF dual-hosted git repository.
eolivelli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new d23b7604d5a [fix][functions] Fix K8S download function method with auth enabled (#17597)
d23b7604d5a is described below
commit d23b7604d5ad919e93f73c044de04f6c58b702ae
Author: Nicolò Boschi <bo...@gmail.com>
AuthorDate: Tue Sep 13 09:04:30 2022 +0200
[fix][functions] Fix K8S download function method with auth enabled (#17597)
---
.../runtime/kubernetes/KubernetesRuntime.java | 24 +++----
.../runtime/kubernetes/KubernetesRuntimeTest.java | 74 ++++++++++++++++++++--
2 files changed, 83 insertions(+), 15 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index f7791e716d9..e6e85d66d0e 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -875,17 +875,7 @@ public class KubernetesRuntime implements Runtime {
ArrayList<String> cmd = new ArrayList<>(Arrays.asList(
pulsarRootDir + configAdminCLI,
"--admin-url",
- pulsarAdminUrl,
- "functions",
- "download",
- "--tenant",
- tenant,
- "--namespace",
- namespace,
- "--name",
- name,
- "--destination-file",
- userCodeFilePath));
+ pulsarAdminUrl));
// add auth plugin and parameters if necessary
if (authenticationEnabled && authConfig != null) {
@@ -900,6 +890,18 @@ public class KubernetesRuntime implements Runtime {
}
}
+ cmd.addAll(Arrays.asList(
+ "functions",
+ "download",
+ "--tenant",
+ tenant,
+ "--namespace",
+ namespace,
+ "--name",
+ name,
+ "--destination-file",
+ userCodeFilePath));
+
if (transformFunction) {
cmd.add("--transform-function");
}
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 33451a316e4..cf86623eafd 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -64,6 +64,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.function.Consumer;
import java.util.stream.Collectors;
import static org.apache.pulsar.functions.runtime.RuntimeUtils.FUNCTIONS_INSTANCE_CLASSPATH;
@@ -183,10 +184,21 @@ public class KubernetesRuntimeTest {
}
}
+
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
double cpuOverCommitRatio, double memoryOverCommitRatio,
Optional<RuntimeCustomizer> manifestCustomizer,
String downloadDirectory) throws Exception {
+ return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio,
+ manifestCustomizer, downloadDirectory, null, null);
+ }
+
+ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+ double cpuOverCommitRatio, double memoryOverCommitRatio,
+ Optional<RuntimeCustomizer> manifestCustomizer,
+ String downloadDirectory,
+ Consumer<WorkerConfig> workerConfigConsumer,
+ AuthenticationConfig authenticationConfig) throws Exception {
KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
doNothing().when(factory).setupClient();
@@ -226,7 +238,11 @@ public class KubernetesRuntimeTest {
manifestCustomizer.ifPresent(runtimeCustomizer -> runtimeCustomizer.initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap())));
- factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
+ if (workerConfigConsumer != null) {
+ workerConfigConsumer.accept(workerConfig);
+ }
+
+ factory.initialize(workerConfig, authenticationConfig, new TestSecretProviderConfigurator(), Mockito.mock(ConnectorsManager.class),
Mockito.mock(FunctionsManager.class), Optional.empty(), manifestCustomizer);
return factory;
}
@@ -234,7 +250,16 @@ public class KubernetesRuntimeTest {
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
double cpuOverCommitRatio, double memoryOverCommitRatio,
Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
- return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, manifestCustomizer, null);
+ return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, manifestCustomizer,
+ null, null, null);
+ }
+
+ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+ double cpuOverCommitRatio, double memoryOverCommitRatio,
+ Optional<RuntimeCustomizer> manifestCustomizer,
+ Consumer<WorkerConfig> workerConfigConsumer) throws Exception {
+ return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, manifestCustomizer,
+ null, workerConfigConsumer, null);
}
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
@@ -810,6 +835,33 @@ public class KubernetesRuntimeTest {
assertEquals(spec.getSpec().getTemplate().getSpec().getServiceAccountName(), "my-service-account");
}
+ @Test
+ public void testCustomKubernetesDownloadCommandsWithAuth() throws Exception {
+ InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+ config.setFunctionAuthenticationSpec(Function.FunctionAuthenticationSpec.newBuilder().build());
+ config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false));
+
+ factory = createKubernetesRuntimeFactory(null,
+ 10, 1.0, 1.0, Optional.empty(), null, wconfig -> {
+ wconfig.setAuthenticationEnabled(true);
+ }, AuthenticationConfig.builder()
+ .clientAuthenticationPlugin("com.MyAuth")
+ .clientAuthenticationParameters("{\"authParam1\": \"authParamValue1\"}")
+ .build());
+
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, null, null, 30l);
+ V1StatefulSet spec = container.createStatefulSet();
+ String expectedDownloadCommand = "pulsar-admin --admin-url " + pulsarAdminUrl
+ + " --auth-plugin com.MyAuth --auth-params {\"authParam1\": \"authParamValue1\"}"
+ + " functions download "
+ + "--tenant " + TEST_TENANT
+ + " --namespace " + TEST_NAMESPACE
+ + " --name " + TEST_NAME
+ + " --destination-file " + pulsarRootDir + "/" + userJarFile;
+ String containerCommand = spec.getSpec().getTemplate().getSpec().getContainers().get(0).getCommand().get(2);
+ assertTrue(containerCommand.contains(expectedDownloadCommand), "Found:" + containerCommand);
+ }
+
InstanceConfig createGolangInstanceConfig() {
InstanceConfig config = new InstanceConfig();
@@ -915,6 +967,16 @@ public class KubernetesRuntimeTest {
double cpuOverCommitRatio, double memoryOverCommitRatio,
String manifestCustomizerClassName,
Map<String, Object> runtimeCustomizerConfig) throws Exception {
+ return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio,
+ memoryOverCommitRatio, manifestCustomizerClassName, runtimeCustomizerConfig, null, null);
+ }
+
+ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+ double cpuOverCommitRatio, double memoryOverCommitRatio,
+ String manifestCustomizerClassName,
+ Map<String, Object> runtimeCustomizerConfig,
+ Consumer<WorkerConfig> workerConfigConsumer,
+ AuthenticationConfig authenticationConfig) throws Exception {
KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
doNothing().when(factory).setupClient();
@@ -957,8 +1019,11 @@ public class KubernetesRuntimeTest {
manifestCustomizer = Optional.of(RuntimeCustomizer.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
}
+ if (workerConfigConsumer != null) {
+ workerConfigConsumer.accept(workerConfig);
+ }
- factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(),
+ factory.initialize(workerConfig, authenticationConfig, new TestSecretProviderConfigurator(),
Mockito.mock(ConnectorsManager.class), Mockito.mock(FunctionsManager.class), Optional.empty(), manifestCustomizer);
return factory;
}
@@ -1094,7 +1159,8 @@ public class KubernetesRuntimeTest {
public void testJavaConstructorWithoutDownloadDirectoryDefined() throws Exception {
InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
- factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), null);
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0,
+ Optional.empty());
verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
}