You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2021/02/22 07:13:39 UTC

[pulsar] branch branch-2.7 updated: [branch-2.7] Add downloadDirectory support to function k8s runtime (#9619)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 43ab9e8  [branch-2.7] Add downloadDirectory support to function k8s runtime (#9619)
43ab9e8 is described below

commit 43ab9e8643d1a90fb3d651a5a919d9986af3c075
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Mon Feb 22 15:13:04 2021 +0800

    [branch-2.7] Add downloadDirectory support to function k8s runtime (#9619)
    
    Fixes #9315
    
    ### Motivation
    
    per request from @codelipenghui in #9377, this pr is cherry pick #9377 to branch 2.7 and resolved conflicts.
    
    ### Modifications
    
    - cherry pick pr
    - resolve conflicts in tests
    - remove `package` related code
---
 .../client/admin/internal/FunctionsImpl.java       |   3 +
 .../runtime/kubernetes/KubernetesRuntime.java      |   9 +-
 .../kubernetes/KubernetesRuntimeFactory.java       |  10 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 201 ++++++++++++++++++++-
 4 files changed, 219 insertions(+), 4 deletions(-)

diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
index 41da287..0290acf 100644
--- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
+++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/FunctionsImpl.java
@@ -842,6 +842,9 @@ public class FunctionsImpl extends ComponentResource implements Functions {
         try {
             File file = new File(destinationPath);
             if (!file.exists()) {
+                if (file.getParentFile() != null && !file.getParentFile().exists()) {
+                    file.getParentFile().mkdirs();
+                }
                 file.createNewFile();
             }
             FileChannel os = new FileOutputStream(new File(destinationPath)).getChannel();
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index d3ae6ff..ed36a66 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -55,6 +55,7 @@ import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 import okhttp3.Response;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
 import org.apache.pulsar.common.functions.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -70,6 +71,7 @@ import org.apache.pulsar.functions.utils.Actions;
 import org.apache.pulsar.functions.utils.FunctionCommon;
 
 import java.io.IOException;
+import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -153,6 +155,7 @@ public class KubernetesRuntime implements Runtime {
     private String narExtractionDirectory;
     private final Optional<KubernetesManifestCustomizer> manifestCustomizer;
     private String functionInstanceClassPath;
+    private String downloadDirectory;
 
     KubernetesRuntime(AppsV1Api appsClient,
                       CoreV1Api coreClient,
@@ -188,7 +191,8 @@ public class KubernetesRuntime implements Runtime {
                       Integer metricsPort,
                       String narExtractionDirectory,
                       Optional<KubernetesManifestCustomizer> manifestCustomizer,
-                      String functinoInstanceClassPath) throws Exception {
+                      String functinoInstanceClassPath,
+                      String downloadDirectory) throws Exception {
         this.appsClient = appsClient;
         this.coreClient = coreClient;
         this.instanceConfig = instanceConfig;
@@ -201,7 +205,8 @@ public class KubernetesRuntime implements Runtime {
         this.pulsarRootDir = pulsarRootDir;
         this.configAdminCLI = configAdminCLI;
         this.userCodePkgUrl = userCodePkgUrl;
-        this.originalCodeFileName = pulsarRootDir + "/" + originalCodeFileName;
+        this.downloadDirectory = StringUtils.isNotEmpty(downloadDirectory) ? downloadDirectory : this.pulsarRootDir; // for backward comp
+        this.originalCodeFileName = this.downloadDirectory + "/" + originalCodeFileName;
         this.pulsarAdminUrl = pulsarAdminUrl;
         this.secretsProviderConfigurator = secretsProviderConfigurator;
         this.percentMemoryPadding = percentMemoryPadding;
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index f001af4..ec34674 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -52,6 +52,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 
 import static org.apache.commons.lang3.StringUtils.isEmpty;
+import static org.apache.commons.lang3.StringUtils.isNotEmpty;
 import static org.apache.pulsar.functions.auth.FunctionAuthUtils.getFunctionAuthData;
 
 /**
@@ -97,6 +98,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
     private Integer metricsPort;
     private String narExtractionDirectory;
     private String functionInstanceClassPath;
+    private String downloadDirectory;
 
     @ToString.Exclude
     @EqualsAndHashCode.Exclude
@@ -167,6 +169,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         } else {
             this.configAdminCLI = "/bin/pulsar-admin";
         }
+        this.downloadDirectory = isNotEmpty(workerConfig.getDownloadDirectory()) ?
+                workerConfig.getDownloadDirectory() : this.pulsarRootDir; // for backward comp
+        if (!Paths.get(this.downloadDirectory).isAbsolute()) {
+            this.downloadDirectory = this.pulsarRootDir + "/" + this.downloadDirectory;
+        }
 
         this.submittingInsidePod = factoryConfig.getSubmittingInsidePod();
         this.installUserCodeDependencies = factoryConfig.getInstallUserCodeDependencies();
@@ -309,7 +316,8 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
             metricsPort,
             narExtractionDirectory,
             manifestCustomizer,
-            functionInstanceClassPath);
+            functionInstanceClassPath,
+            downloadDirectory);
     }
 
     @Override
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 0139d39..71e4fda 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -20,6 +20,7 @@
 package org.apache.pulsar.functions.runtime.kubernetes;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.google.protobuf.util.JsonFormat;
@@ -59,6 +60,7 @@ import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.spy;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import static org.testng.Assert.assertThrows;
 
 /**
@@ -179,7 +181,9 @@ public class KubernetesRuntimeTest {
 
     KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
                                                             double cpuOverCommitRatio, double memoryOverCommitRatio,
-                                                            Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
+                                                            Optional<RuntimeCustomizer> manifestCustomizer,
+                                                            String downloadDirectory) throws Exception {
+
         KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
 
         WorkerConfig workerConfig = new WorkerConfig();
@@ -213,6 +217,7 @@ public class KubernetesRuntimeTest {
         workerConfig.setFunctionInstanceMinResources(null);
         workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
         workerConfig.setAuthenticationEnabled(false);
+        workerConfig.setDownloadDirectory(downloadDirectory);
 
         factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
         doNothing().when(factory).setupClient();
@@ -221,6 +226,12 @@ public class KubernetesRuntimeTest {
     }
 
     KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+                                                            double cpuOverCommitRatio, double memoryOverCommitRatio,
+                                                            Optional<RuntimeCustomizer> manifestCustomizer) throws Exception {
+        return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, manifestCustomizer, null);
+    }
+
+    KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
                                                             double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
         return createKubernetesRuntimeFactory(extraDepsDir, percentMemoryPadding, cpuOverCommitRatio, memoryOverCommitRatio, Optional.empty());
     }
@@ -352,6 +363,75 @@ public class KubernetesRuntimeTest {
         assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), roundDecimal(resources.getCpu(), 3));
     }
 
+    private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached, String downloadDirectory) throws Exception {
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        List<String> args = container.getProcessArgs();
+
+        String classpath = javaInstanceJarFile;
+        String extraDepsEnv;
+        String jarLocation;
+        int portArg;
+        int metricsPortArg;
+        int totalArgs;
+        if (null != depsDir) {
+            extraDepsEnv = " -Dpulsar.functions.extra.dependencies.dir=" + depsDir;
+            classpath = classpath + ":" + depsDir + "/*";
+            totalArgs = 37;
+            portArg = 26;
+            metricsPortArg = 28;
+        } else {
+            extraDepsEnv = "";
+            portArg = 25;
+            metricsPortArg = 27;
+            totalArgs = 36;
+        }
+        if (secretsAttached) {
+            totalArgs += 4;
+        }
+        if (StringUtils.isNotEmpty(downloadDirectory)){
+            jarLocation = downloadDirectory + "/" + userJarFile;
+        } else {
+            jarLocation = pulsarRootDir + "/" + userJarFile;
+        }
+
+        assertEquals(args.size(), totalArgs,
+                "Actual args : " + StringUtils.join(args, " "));
+
+        String expectedArgs = "exec java -cp " + classpath
+                + extraDepsEnv
+                + " -Dpulsar.functions.instance.classpath=/pulsar/lib/*"
+                + " -Dlog4j.configurationFile=kubernetes_instance_log4j2.xml "
+                + "-Dpulsar.function.log.dir=" + logDirectory + "/" + FunctionCommon.getFullyQualifiedName(config.getFunctionDetails())
+                + " -Dpulsar.function.log.file=" + config.getFunctionDetails().getName() + "-$SHARD_ID"
+                + " -Xmx" + String.valueOf(RESOURCES.getRam())
+                + " org.apache.pulsar.functions.instance.JavaInstanceMain"
+                + " --jar " + jarLocation + " --instance_id "
+                + "$SHARD_ID" + " --function_id " + config.getFunctionId()
+                + " --function_version " + config.getFunctionVersion()
+                + " --function_details '" + JsonFormat.printer().omittingInsignificantWhitespace().print(config.getFunctionDetails())
+                + "' --pulsar_serviceurl " + pulsarServiceUrl
+                + " --max_buffered_tuples 1024 --port " + args.get(portArg) + " --metrics_port " + args.get(metricsPortArg)
+                + " --state_storage_serviceurl " + stateStorageServiceUrl
+                + " --expected_healthcheck_interval -1";
+        if (secretsAttached) {
+            expectedArgs += " --secrets_provider org.apache.pulsar.functions.secretsprovider.ClearTextSecretsProvider"
+                    + " --secrets_provider_config '{\"Somevalue\":\"myvalue\"}'";
+        }
+        expectedArgs += " --cluster_name standalone --nar_extraction_directory " + narExtractionDirectory;
+
+        assertEquals(String.join(" ", args), expectedArgs);
+
+        // check padding and xmx
+        long heap = Long.parseLong(args.stream().filter(s -> s.startsWith("-Xmx")).collect(Collectors.toList()).get(0).replace("-Xmx", ""));
+        V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), RESOURCES);
+        assertEquals(heap, RESOURCES.getRam());
+        assertEquals(containerSpec.getResources().getLimits().get("memory").getNumber().longValue(), Math.round(heap + (heap * 0.1)));
+
+        // check cpu
+        assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
+        assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
+    }
+
     private void verifyJavaInstance(InstanceConfig config, String depsDir, boolean secretsAttached) throws Exception {
         KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
         List<String> args = container.getProcessArgs();
@@ -867,4 +947,123 @@ public class KubernetesRuntimeTest {
         assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
     }
 
+    KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
+                                                            double cpuOverCommitRatio, double memoryOverCommitRatio,
+                                                            String manifestCustomizerClassName,
+                                                            Map<String, Object> runtimeCustomizerConfig) throws Exception {
+        KubernetesRuntimeFactory factory = spy(new KubernetesRuntimeFactory());
+        doNothing().when(factory).setupClient();
+
+        WorkerConfig workerConfig = new WorkerConfig();
+        KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
+        kubernetesRuntimeFactoryConfig.setK8Uri(null);
+        kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+        kubernetesRuntimeFactoryConfig.setJobName(null);
+        kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
+        kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
+        kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
+        kubernetesRuntimeFactoryConfig.setPulsarRootDir(pulsarRootDir);
+        kubernetesRuntimeFactoryConfig.setSubmittingInsidePod(false);
+        kubernetesRuntimeFactoryConfig.setInstallUserCodeDependencies(true);
+        kubernetesRuntimeFactoryConfig.setPythonDependencyRepository("myrepo");
+        kubernetesRuntimeFactoryConfig.setPythonExtraDependencyRepository("anotherrepo");
+        kubernetesRuntimeFactoryConfig.setExtraFunctionDependenciesDir(extraDepsDir);
+        kubernetesRuntimeFactoryConfig.setCustomLabels(null);
+        kubernetesRuntimeFactoryConfig.setPercentMemoryPadding(percentMemoryPadding);
+        kubernetesRuntimeFactoryConfig.setCpuOverCommitRatio(cpuOverCommitRatio);
+        kubernetesRuntimeFactoryConfig.setMemoryOverCommitRatio(memoryOverCommitRatio);
+        kubernetesRuntimeFactoryConfig.setPulsarServiceUrl(pulsarServiceUrl);
+        kubernetesRuntimeFactoryConfig.setPulsarAdminUrl(pulsarAdminUrl);
+        kubernetesRuntimeFactoryConfig.setChangeConfigMapNamespace(null);
+        kubernetesRuntimeFactoryConfig.setChangeConfigMap(null);
+        kubernetesRuntimeFactoryConfig.setGrpcPort(4332);
+        kubernetesRuntimeFactoryConfig.setMetricsPort(4331);
+        kubernetesRuntimeFactoryConfig.setNarExtractionDirectory(narExtractionDirectory);
+        workerConfig.setFunctionRuntimeFactoryClassName(KubernetesRuntimeFactory.class.getName());
+        workerConfig.setFunctionRuntimeFactoryConfigs(
+                ObjectMapperFactory.getThreadLocal().convertValue(kubernetesRuntimeFactoryConfig, Map.class));
+        workerConfig.setFunctionInstanceMinResources(null);
+        workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
+        workerConfig.setAuthenticationEnabled(false);
+        workerConfig.setRuntimeCustomizerConfig(runtimeCustomizerConfig);
+        workerConfig.setRuntimeCustomizerClassName(manifestCustomizerClassName);
+
+        Optional<RuntimeCustomizer> manifestCustomizer = Optional.empty();
+        if (!org.apache.commons.lang3.StringUtils.isEmpty(workerConfig.getRuntimeCustomizerClassName())) {
+            manifestCustomizer = Optional.of(RuntimeCustomizer.getRuntimeCustomizer(workerConfig.getRuntimeCustomizerClassName()));
+            manifestCustomizer.get().initialize(Optional.ofNullable(workerConfig.getRuntimeCustomizerConfig()).orElse(Collections.emptyMap()));
+        }
+
+        factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(),
+                Optional.empty(), manifestCustomizer);
+        return factory;
+    }
+
+    public static JsonObject createRuntimeCustomizerConfig() {
+        JsonObject configObj = new JsonObject();
+        configObj.addProperty("jobNamespace", "custom-ns");
+        configObj.addProperty("jobName", "custom-name");
+
+        JsonObject extraAnn = new JsonObject();
+        extraAnn.addProperty("annotation", "test");
+        configObj.add("extraAnnotations", extraAnn);
+
+        JsonObject extraLabel = new JsonObject();
+        extraLabel.addProperty("label", "test");
+        configObj.add("extraLabels", extraLabel);
+
+        JsonObject nodeLabels = new JsonObject();
+        nodeLabels.addProperty("selector", "test");
+        configObj.add("nodeSelectorLabels", nodeLabels);
+
+        JsonArray tolerations = new JsonArray();
+        JsonObject toleration = new JsonObject();
+        toleration.addProperty("key", "test");
+        toleration.addProperty("value", "test");
+        toleration.addProperty("effect", "test");
+        tolerations.add(toleration);
+        configObj.add("tolerations", tolerations);
+
+        JsonObject resourceRequirements = new JsonObject();
+        JsonObject requests = new JsonObject();
+        JsonObject limits = new JsonObject();
+        requests.addProperty("cpu", "1");
+        requests.addProperty("memory", "4G");
+        limits.addProperty("cpu", "2");
+        limits.addProperty("memory", "8G");
+        resourceRequirements.add("requests", requests);
+        resourceRequirements.add("limits", limits);
+        configObj.add("resourceRequirements", resourceRequirements);
+        return configObj;
+    }
+
+    @Test
+    public void testJavaConstructorWithoutDownloadDirectoryDefined() throws Exception {
+        InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), null);
+
+        verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
+    }
+
+    @Test
+    public void testJavaConstructorWithDownloadDirectoryDefined() throws Exception {
+        String downloadDirectory = "download/pulsar_functions";
+        InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), downloadDirectory);
+
+        verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
+    }
+
+    @Test
+    public void testJavaConstructorWithAbsolutDownloadDirectoryDefined() throws Exception {
+        String downloadDirectory = "/functions/download/pulsar_functions";
+        InstanceConfig config = createJavaInstanceConfig(FunctionDetails.Runtime.JAVA, false);
+
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0, Optional.empty(), downloadDirectory);
+
+        verifyJavaInstance(config, pulsarRootDir + "/instances/deps", false, factory.getDownloadDirectory());
+    }
+
 }