You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/22 07:13:39 UTC
[pulsar] branch branch-2.7 updated: [branch-2.7] Add
downloadDirectory support to function k8s runtime (#9619)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 43ab9e8 [branch-2.7] Add downloadDirectory support to function k8s runtime (#9619)
43ab9e8 is described below
commit 43ab9e8643d1a90fb3d651a5a919d9986af3c075
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Mon Feb 22 15:13:04 2021 +0800
[branch-2.7] Add downloadDirectory support to function k8s runtime (#9619)
Fixes #9315
### Motivation
per request from @codelipenghui in #9377, this pr is cherry pick #9377 to branch 2.7 and resolved conflicts.
### Modifications
- cherry pick pr
- resolve conflicts in tests
- remove `package` related code
---
.../client/admin/internal/FunctionsImpl.java | 3 +
.../runtime/kubernetes/KubernetesRuntime.java | 9 +-
.../kubernetes/KubernetesRuntimeFactory.java | 10 +-
.../runtime/kubernetes/KubernetesRuntimeTest.java | 201 ++++++++++++++++++++-
4 files changed, 219 insertions(+), 4 deletions(-)
diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 41da287..0290acf 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -842,6 +842,9 @@ public class FunctionsImpl extends ComponentResource implements Functions {
try {
File file = new File(destinationPath);
if (!file.exists()) {
+ if (file.getParentFile() != null && !file.getParentFile().exists()) {
+ file.getParentFile().mkdirs();
+ }
file.createNewFile();
}
FileChannel os = new FileOutputStream(new File(destinationPath)).getChannel();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index d3ae6ff..ed36a66 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -55,6 +55,7 @@ import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.Response;
import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.common.functions.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -70,6 +71,7 @@ import org.apache.pulsar.functions.utils.Actions;
import org.apache.pulsar.functions.utils.FunctionCommon;
import java.io.IOException;
+import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -153,6 +155,7 @@ public class KubernetesRuntime implements Runtime {
private String narExtractionDirectory;
private final Optional<KubernetesManifestCustomizer> manifestCustomizer;
private String functionInstanceClassPath;
+ private String downloadDirectory;
KubernetesRuntime(AppsV1Api appsClient,
CoreV1Api coreClient,
@@ -188,7 +191,8 @@ public class KubernetesRuntime implements Runtime {
Integer metricsPort,
String narExtractionDirectory,
Optional<KubernetesManifestCustomizer> manifestCustomizer,
- String functinoInstanceClassPath) throws Exception {
+ String functinoInstanceClassPath,
+ String downloadDirectory) throws Exception {
this.appsClient = appsClient;
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
@@ -201,7 +205,8 @@ public class KubernetesRuntime implements Runtime {
this.pulsarRootDir = pulsarRootDir;
this.configAdminCLI = configAdminCLI;
this.userCodePkgUrl = userCodePkgUrl;
- this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
+ this.downloadDirectory = StringUtils.isNotEmpty(downloadDirectory) ? downloadDirectory : this.pulsarRootDir; // for backward comp
+ this.originalCodeFileName = this.downloadDirectory + "/" + originalCodeFileName;
this.pulsarAdminUrl = pulsarAdminUrl;
this.secretsProviderConfigurator = secretsProviderConfigurator;
this.percentMemoryPadding = percentMemoryPadding;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index f001af4..ec34674 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -52,6 +52,7 @@ import java.util.Timer;
import java.util.TimerTask;
import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
/**
@@ -97,6 +98,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private Integer metricsPort;
private String narExtractionDirectory;
private String functionInstanceClassPath;
+ private String downloadDirectory;
@ToString.Exclude
@EqualsAndHashCode.Exclude
@@ -167,6 +169,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
} else {
this.configAdminCLI = "/bin/pulsar-admin";
}
+ this.downloadDirectory = isNotEmpty(workerConfig.getDownloadDirectory()) ?
+ workerConfig.getDownloadDirectory() : this.pulsarRootDir; // for backward comp
+ if (!Paths.get(this.downloadDirectory).isAbsolute()) {
+ this.downloadDirectory = this.pulsarRootDir + "/" + this.downloadDirectory;
+ }
this.submittingInsidePod = factoryConfig.getSubmittingInsidePod();
this.installUserCodeDependencies = factoryConfig.getInstallUserCodeDependencies();
@@ -309,7 +316,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
metricsPort,
narExtractionDirectory,
manifestCustomizer,
- functionInstanceClassPath);
+ functionInstanceClassPath,
+ downloadDirectory);
}
@Override
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 0139d39..71e4fda 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.functions.runtime.kubernetes;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonObject;
import com.google.protobuf.util.JsonFormat;
@@ -59,6 +60,7 @@ import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
import static org.testng.Assert.assertThrows;
/**
@@ -179,7 +181,9 @@ public class KubernetesRuntimeTest {
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
double cpuOverCommitRatio, double memoryOverCommitRatio,
- Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
+ Optional<RuntimeCustomizer> manifestCustomizer,
+ String downloadDirectory) throws Exception {
+
KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
WorkerConfig workerConfig = new WorkerConfig();
@@ -213,6 +217,7 @@ public class KubernetesRuntimeTest {
workerConfig.setFunctionInstanceMinResources(null);
workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
workerConfig.setAuthenticationEnabled(false);
+ workerConfig.setDownloadDirectory(downloadDirectory);
factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
doNothing().when(factory).setupClient();
@@ -221,6 +226,12 @@ public class KubernetesRuntimeTest {
}
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+ double cpuOverCommitRatio, double memoryOverCommitRatio,
+ Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
+ return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, manifestCustomizer, null);
+ }
+
+ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, Optional.empty());
}
@@ -352,6 +363,75 @@ public class KubernetesRuntimeTest {
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), roundDecimal(resources.getCpu(), 3));
}
+ private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached, String downloadDirectory) throws Exception {
+ KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+ List<String> args = container.getProcessArgs();
+
+ String classpath = javaInstanceJarFile;
+ String extraDepsEnv;
+ String jarLocation;
+ int portArg;
+ int metricsPortArg;
+ int totalArgs;
+ if (null != depsDir) {
+ extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
+ classpath = classpath + ":" + depsDir + "/*";
+ totalArgs = 37;
+ portArg = 26;
+ metricsPortArg = 28;
+ } else {
+ extraDepsEnv = "";
+ portArg = 25;
+ metricsPortArg = 27;
+ totalArgs = 36;
+ }
+ if (secretsAttached) {
+ totalArgs += 4;
+ }
+ if (StringUtils.isNotEmpty(downloadDirectory)){
+ jarLocation = downloadDirectory + "/" + userJarFile;
+ } else {
+ jarLocation = pulsarRootDir + "/" + userJarFile;
+ }
+
+ assertEquals(args.size(), totalArgs,
+ "Actual args : " + StringUtils.join(args, " "));
+
+ String expectedArgs = "exec java -cp " + classpath
+ + extraDepsEnv
+ + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+ + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
+ + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+ + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+ + " -Xmx" + String.valueOf(RESOURCES.getRam())
+ + " org.apache.pulsar.functions.instance.JavaInstanceMain"
+ + " --jar " + jarLocation + " --instance_id "
+ + "$SHARD_ID" + " --function_id " + config.getFunctionId()
+ + " --function_version " + config.getFunctionVersion()
+ + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+ + "' --pulsar_serviceurl " + pulsarServiceUrl
+ + " --max_buffered_tuples 1024 --port " + args.get(portArg) + " --metrics_port " + args.get(metricsPortArg)
+ + " --state_storage_serviceurl " + stateStorageServiceUrl
+ + " --expected_healthcheck_interval -1";
+ if (secretsAttached) {
+ expectedArgs += " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+ + " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'";
+ }
+ expectedArgs += " --cluster_name standalone --nar_extraction_directory " + narExtractionDirectory;
+
+ assertEquals(String.join(" ", args), expectedArgs);
+
+ // check padding and xmx
+ long heap = Long.parseLong(args.stream().filter(s -> s.startsWith("-Xmx")).collect(Collectors.toList()).get(0).replace("-Xmx", ""));
+ V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), RESOURCES);
+ assertEquals(heap, RESOURCES.getRam());
+ assertEquals(containerSpec.getResources().getLimits().get("memory").getNumber().longValue(), Math.round(heap + (heap * 0.1)));
+
+ // check cpu
+ assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
+ assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
+ }
+
private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached) throws Exception {
KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
List<String> args = container.getProcessArgs();
@@ -867,4 +947,123 @@ public class KubernetesRuntimeTest {
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
}
+ KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+ double cpuOverCommitRatio, double memoryOverCommitRatio,
+ String manifestCustomizerClassName,
+ Map<String, Object> runtimeCustomizerConfig) throws Exception {
+ KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
+ doNothing().when(factory).setupClient();
+
+ WorkerConfig workerConfig = new WorkerConfig();
+ KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
+ kubernetesRuntimeFactoryConfig.setK8Uri(null);
+ kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+ kubernetesRuntimeFactoryConfig.setJobName(null);
+ kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
+ kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
+ kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
+ kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
+ kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
+ kubernetesRuntimeFactoryConfig.setInstallUserCodeDependencies(true);
+ kubernetesRuntimeFactoryConfig.setPythonDependencyRepository("myrepo");
+ kubernetesRuntimeFactoryConfig.setPythonExtraDependencyRepository("anotherrepo");
+ kubernetesRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDepsDir);
+ kubernetesRuntimeFactoryConfig.setCustomLabels(null);
+ kubernetesRuntimeFactoryConfig.setPercentMemoryPadding(percentMemoryPadding);
+ kubernetesRuntimeFactoryConfig.setCpuOverCommitRatio(cpuOverCommitRatio);
+ kubernetesRuntimeFactoryConfig.setMemoryOverCommitRatio(memoryOverCommitRatio);
+ kubernetesRuntimeFactoryConfig.setPulsarServiceUrl(pulsarServiceUrl);
+ kubernetesRuntimeFactoryConfig.setPulsarAdminUrl(pulsarAdminUrl);
+ kubernetesRuntimeFactoryConfig.setChangeConfigMapNamespace(null);
+ kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
+ kubernetesRuntimeFactoryConfig.setGrpcPort(4332);
+ kubernetesRuntimeFactoryConfig.setMetricsPort(4331);
+ kubernetesRuntimeFactoryConfig.setNarExtractionDirectory(narExtractionDirectory);
+ workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+ workerConfig.setFunctionRuntimeFactoryConfigs(
+ ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
+ workerConfig.setFunctionInstanceMinResources(null);
+ workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+ workerConfig.setAuthenticationEnabled(false);
+ workerConfig.setRuntimeCustomizerConfig(runtimeCustomizerConfig);
+ workerConfig.setRuntimeCustomizerClassName(manifestCustomizerClassName);
+
+ Optional<RuntimeCustomizer> manifestCustomizer = Optional.empty();
+ if (!org.apache.commons.lang3.StringUtils.isEmpty(workerConfig.getRuntimeCustomizerClassName())) {
+ manifestCustomizer = Optional.of(RuntimeCustomizer.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
+ manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
+ }
+
+ factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(),
+ Optional.empty(), manifestCustomizer);
+ return factory;
+ }
+
+ public static JsonObject createRuntimeCustomizerConfig() {
+ JsonObject configObj = new JsonObject();
+ configObj.addProperty("jobNamespace", "custom-ns");
+ configObj.addProperty("jobName", "custom-name");
+
+ JsonObject extraAnn = new JsonObject();
+ extraAnn.addProperty("annotation", "test");
+ configObj.add("extraAnnotations", extraAnn);
+
+ JsonObject extraLabel = new JsonObject();
+ extraLabel.addProperty("label", "test");
+ configObj.add("extraLabels", extraLabel);
+
+ JsonObject nodeLabels = new JsonObject();
+ nodeLabels.addProperty("selector", "test");
+ configObj.add("nodeSelectorLabels", nodeLabels);
+
+ JsonArray tolerations = new JsonArray();
+ JsonObject toleration = new JsonObject();
+ toleration.addProperty("key", "test");
+ toleration.addProperty("value", "test");
+ toleration.addProperty("effect", "test");
+ tolerations.add(toleration);
+ configObj.add("tolerations", tolerations);
+
+ JsonObject resourceRequirements = new JsonObject();
+ JsonObject requests = new JsonObject();
+ JsonObject limits = new JsonObject();
+ requests.addProperty("cpu", "1");
+ requests.addProperty("memory", "4G");
+ limits.addProperty("cpu", "2");
+ limits.addProperty("memory", "8G");
+ resourceRequirements.add("requests", requests);
+ resourceRequirements.add("limits", limits);
+ configObj.add("resourceRequirements", resourceRequirements);
+ return configObj;
+ }
+
+ @Test
+ public void testJavaConstructorWithoutDownloadDirectoryDefined() throws Exception {
+ InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), null);
+
+ verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
+ }
+
+ @Test
+ public void testJavaConstructorWithDownloadDirectoryDefined() throws Exception {
+ String downloadDirectory = "download/pulsar_functions";
+ InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), downloadDirectory);
+
+ verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
+ }
+
+ @Test
+ public void testJavaConstructorWithAbsolutDownloadDirectoryDefined() throws Exception {
+ String downloadDirectory = "/functions/download/pulsar_functions";
+ InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
+ factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), downloadDirectory);
+
+ verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
+ }
+
}