You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/07 03:22:28 UTC

[pulsar] 02/05: [Issue #8268][Pulsar Function] k8s runtime with go functions support (#8352)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit f43c1ee0da64070c1a1769e78bcafded4d6d61f3
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Tue Nov 17 11:42:56 2020 +0800

    [Issue #8268][Pulsar Function] k8s runtime with go functions support (#8352)
    
    Fixes #8268
    
    ### Motivation
    
    Currently, the k8s runtime supports only Java and Python functions; Go functions do not work with it. This PR adds Go function support to the k8s runtime.
    
    ### Modifications
    
    Removed the `UnsupportedOperationException` thrown for GO functions, fixed the Go function executable's permissions, and fixed the format of the arguments passed to the Go function.
    
    This PR makes Go functions workable with the k8s runtime, but it may not cover all k8s scenarios, so comments are welcome. I also need some help with tests; I haven't added any yet.
    
    (cherry picked from commit bd475c28d34ac9d28530c7b8a8efe66261b03ef6)
---
 .../pulsar/functions/runtime/RuntimeUtils.java     |  2 +-
 .../runtime/kubernetes/KubernetesRuntime.java      | 11 ++-
 .../kubernetes/KubernetesRuntimeFactory.java       |  2 +-
 .../pulsar/functions/runtime/RuntimeUtilsTest.java |  2 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 83 ++++++++++++++++++++++
 .../KubernetesSecretsProviderConfigurator.java     |  3 +-
 6 files changed, 98 insertions(+), 5 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
index 4108f78..89827b9 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/RuntimeUtils.java
@@ -228,7 +228,7 @@ public class RuntimeUtils {
         // pulsar-client-go uses cgo, so the currently uploaded executable doesn't support cross-compilation.
         args.add(originalCodeFileName);
         args.add("-instance-conf");
-        args.add(configContent);
+        args.add("'" + configContent + "'");
         return args;
     }
 
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index c6dcee4..c8807a2 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -216,7 +216,7 @@ public class KubernetesRuntime implements Runtime {
                 logConfigFile = pulsarRootDir + "/conf/functions-logging/console_logging_config.ini";
                 break;
             case GO:
-                throw new UnsupportedOperationException();
+                break;
         }
 
         this.authConfig = authConfig;
@@ -229,6 +229,15 @@ public class KubernetesRuntime implements Runtime {
 
         this.processArgs = new LinkedList<>();
         this.processArgs.addAll(RuntimeUtils.getArgsBeforeCmd(instanceConfig, extraDependenciesDir));
+
+        if (instanceConfig.getFunctionDetails().getRuntime() == Function.FunctionDetails.Runtime.GO) {
+            // before we run the command, make sure the go executable has the correct permissions
+            this.processArgs.add("chmod");
+            this.processArgs.add("777");
+            this.processArgs.add(this.originalCodeFileName);
+            this.processArgs.add("&&");
+        }
+
         // use exec to to launch function so that it gets launched in the foreground with the same PID as shell
         // so that when we kill the pod, the signal will get propagated to the function code
         this.processArgs.add("exec");
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index f5113e4..ecb59a6 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -248,7 +248,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
                 instanceFile = pythonInstanceFile;
                 break;
             case GO:
-                throw new UnsupportedOperationException();
+                break;
             default:
                 throw new RuntimeException("Unsupported Runtime " + instanceConfig.getFunctionDetails().getRuntime());
         }
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
index 479ddbe..6de84a0 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/RuntimeUtilsTest.java
@@ -106,7 +106,7 @@ public class RuntimeUtilsTest {
 
         List<String> commands = RuntimeUtils.getGoInstanceCmd(instanceConfig, "config", "pulsar://localhost:6650");
 
-        HashMap goInstanceConfig = new ObjectMapper().readValue(commands.get(2), HashMap.class);
+        HashMap goInstanceConfig = new ObjectMapper().readValue(commands.get(2).replaceAll("^\'|\'$", ""), HashMap.class);
 
         Assert.assertEquals(commands.toArray().length, 3);
         Assert.assertEquals(commands.get(0), "config");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index 9a725a9..d9d1a97 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -19,6 +19,7 @@
 
 package org.apache.pulsar.functions.runtime.kubernetes;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.gson.JsonArray;
 import com.google.gson.JsonObject;
 import com.google.protobuf.util.JsonFormat;
@@ -695,4 +696,86 @@ public class KubernetesRuntimeTest {
         V1StatefulSet spec = container.createStatefulSet();
         assertEquals(spec.getSpec().getTemplate().getSpec().getServiceAccountName(), "my-service-account");
     }
+
+    InstanceConfig createGolangInstanceConfig() {
+        InstanceConfig config = new InstanceConfig();
+
+        config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.GO, false));
+        config.setFunctionId(java.util.UUID.randomUUID().toString());
+        config.setFunctionVersion("1.0");
+        config.setInstanceId(0);
+        config.setMaxBufferedTuples(1024);
+        config.setClusterName("standalone");
+
+        return config;
+    }
+
+    @Test
+    public void testGolangConstructor() throws Exception {
+        InstanceConfig config = createGolangInstanceConfig();
+
+        factory = createKubernetesRuntimeFactory(null, 10, 1.0, 1.0);
+
+        verifyGolangInstance(config);
+    }
+
+    private void verifyGolangInstance(InstanceConfig config) throws Exception {
+        KubernetesRuntime container = factory.createContainer(config, userJarFile, userJarFile, 30l);
+        List<String> args = container.getProcessArgs();
+
+        int totalArgs = 8;
+
+        assertEquals(args.size(), totalArgs,
+                "Actual args : " + StringUtils.join(args, " "));
+
+        HashMap goInstanceConfig = new ObjectMapper().readValue(args.get(7).replaceAll("^\'|\'$", ""), HashMap.class);
+
+        assertEquals(args.get(0), "chmod");
+        assertEquals(args.get(1), "777");
+        assertEquals(args.get(2), pulsarRootDir + "/" + userJarFile);
+        assertEquals(args.get(3), "&&");
+        assertEquals(args.get(4), "exec");
+        assertEquals(args.get(5), pulsarRootDir + "/" + userJarFile);
+        assertEquals(args.get(6), "-instance-conf");
+        assertEquals(goInstanceConfig.get("maxBufTuples"), 1024);
+        assertEquals(goInstanceConfig.get("maxMessageRetries"), 0);
+        assertEquals(goInstanceConfig.get("killAfterIdleMs"), 0);
+        assertEquals(goInstanceConfig.get("parallelism"), 0);
+        assertEquals(goInstanceConfig.get("className"), "");
+        assertEquals(goInstanceConfig.get("sourceSpecsTopic"), "persistent://sample/standalone/ns1/test_src");
+        assertEquals(goInstanceConfig.get("sourceSchemaType"), "");
+        assertEquals(goInstanceConfig.get("sinkSpecsTopic"), TEST_NAME + "-output");
+        assertEquals(goInstanceConfig.get("clusterName"), "standalone");
+        assertEquals(goInstanceConfig.get("nameSpace"), TEST_NAMESPACE);
+        assertEquals(goInstanceConfig.get("receiverQueueSize"), 0);
+        assertEquals(goInstanceConfig.get("tenant"), TEST_TENANT);
+        assertEquals(goInstanceConfig.get("logTopic"), TEST_NAME + "-log");
+        assertEquals(goInstanceConfig.get("processingGuarantees"), 0);
+        assertEquals(goInstanceConfig.get("autoAck"), false);
+        assertEquals(goInstanceConfig.get("regexPatternSubscription"), false);
+        assertEquals(goInstanceConfig.get("pulsarServiceURL"), pulsarServiceUrl);
+        assertEquals(goInstanceConfig.get("runtime"), 0);
+        assertEquals(goInstanceConfig.get("cpu"), 1.0);
+        assertEquals(goInstanceConfig.get("funcVersion"), "1.0");
+        assertEquals(goInstanceConfig.get("disk"), 10000);
+        assertEquals(goInstanceConfig.get("instanceID"), 0);
+        assertEquals(goInstanceConfig.get("cleanupSubscription"), false);
+        assertEquals(goInstanceConfig.get("port"), 0);
+        assertEquals(goInstanceConfig.get("subscriptionType"), 0);
+        assertEquals(goInstanceConfig.get("timeoutMs"), 0);
+        assertEquals(goInstanceConfig.get("subscriptionName"), "");
+        assertEquals(goInstanceConfig.get("name"), TEST_NAME);
+        assertEquals(goInstanceConfig.get("expectedHealthCheckInterval"), 0);
+        assertEquals(goInstanceConfig.get("deadLetterTopic"), "");
+
+        // check padding and xmx
+        V1Container containerSpec = container.getFunctionContainer(Collections.emptyList(), RESOURCES);
+        assertEquals(containerSpec.getResources().getLimits().get("memory").getNumber().longValue(),
+                Math.round(RESOURCES.getRam() + (RESOURCES.getRam() * 0.1)));
+
+        // check cpu
+        assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
+        assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
+    }
+
 }
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
index 81e50fd..593d76d 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
@@ -52,7 +52,8 @@ public class KubernetesSecretsProviderConfigurator implements SecretsProviderCon
             case PYTHON:
                 return "secretsprovider.EnvironmentBasedSecretsProvider";
             case GO:
-                throw new UnsupportedOperationException();
+                // [TODO] See GH issue #8425, we should finish this part once the issue is resolved.
+                return "";
             default:
                 throw new RuntimeException("Unknown function runtime " + functionDetails.getRuntime());
         }