You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/29 21:43:55 UTC

[pulsar] branch master updated: [k8s] convert to valid pod name part with k8s function runtime (#4996)

This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new e231d22  [k8s] convert to valid pod name part with k8s function runtime (#4996)
e231d22 is described below

commit e231d22e5b04266348b5146c869a0b019a71725a
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Aug 30 05:43:50 2019 +0800

    [k8s] convert to valid pod name part with k8s function runtime (#4996)
    
    ### Motivation
    
    The k8s runtime uses the tenant, namespace, and functionName to create the k8s pod name, but functionName might not satisfy the k8s pod name rule `"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"`, which causes pod creation to fail.
    
    ### Modifications
    
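    This PR creates a new function called `toValidPodName` that converts a string into a valid pod name part by converting it to lower case and replacing every character other than lower-case letters, digits, "-", and "." with "-". When the conversion changes the job name, a short hash of the original name is appended to avoid naming collisions.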
---
 .../functions/runtime/KubernetesRuntime.java       | 17 +++-
 .../functions/runtime/KubernetesRuntimeTest.java   | 97 ++++++++++++++++++++++
 2 files changed, 112 insertions(+), 2 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 4f9b77a..a5c6d1a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -52,6 +52,7 @@ import io.kubernetes.client.models.V1StatefulSetSpec;
 import io.kubernetes.client.models.V1Toleration;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
 import org.apache.pulsar.functions.instance.AuthenticationConfig;
 import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -1015,14 +1016,26 @@ public class KubernetesRuntime implements Runtime {
         return ports;
     }
 
-    private static String createJobName(Function.FunctionDetails functionDetails) {
+    public static String createJobName(Function.FunctionDetails functionDetails) {
         return createJobName(functionDetails.getTenant(),
                 functionDetails.getNamespace(),
                 functionDetails.getName());
     }
 
+    private static String toValidPodName(String ori) {
+        return ori.toLowerCase().replaceAll("[^a-z0-9-\\.]", "-");
+    }
+
     private static String createJobName(String tenant, String namespace, String functionName) {
-        return "pf-" + tenant + "-" + namespace + "-" + functionName;
+        final String jobNameContent = String.format("%s-%s-%s", tenant, namespace,functionName);
+        final String jobName = "pf-" + jobNameContent;
+        final String convertedJobName = toValidPodName(jobName);
+        if (jobName.equals(convertedJobName)) {
+            return jobName;
+        }
+        // toValidPodName may cause naming collisions, add a short hash here to avoid it
+        final String shortHash = DigestUtils.sha1Hex(jobNameContent).toLowerCase().substring(0, 8);
+        return convertedJobName + "-" + shortHash;
     }
 
     private static String getServiceUrl(String jobName, String jobNamespace, int instanceId) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index e0214b2..812c003 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -49,6 +49,7 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
 import static org.powermock.api.mockito.PowerMockito.doNothing;
 import static org.powermock.api.mockito.PowerMockito.spy;
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
 
 /**
  * Unit test of {@link ThreadRuntime}.
@@ -448,4 +449,100 @@ public class KubernetesRuntimeTest {
         assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
     }
 
+    @Test
+    public void testCreateJobName() throws Exception {
+        verifyCreateJobNameWithBackwardCompatibility();
+        verifyCreateJobNameWithUpperCaseFunctionName();
+        verifyCreateJobNameWithDotFunctionName();
+        verifyCreateJobNameWithDotAndUpperCaseFunctionName();
+        verifyCreateJobNameWithInvalidMarksFunctionName();
+        verifyCreateJobNameWithCollisionalFunctionName();
+        verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName();
+    }
+
+    FunctionDetails createFunctionDetails(final String functionName) {
+        FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+        functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+        functionDetailsBuilder.setTenant(TEST_TENANT);
+        functionDetailsBuilder.setNamespace(TEST_NAMESPACE);
+        functionDetailsBuilder.setName(functionName);
+        functionDetailsBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction");
+        functionDetailsBuilder.setSink(Function.SinkSpec.newBuilder()
+                .setTopic(TEST_NAME + "-output")
+                .setSerDeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer")
+                .setClassName("org.pulsar.pulsar.TestSink")
+                .setTypeClassName(String.class.getName())
+                .build());
+        functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
+        functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder()
+                .setSubscriptionType(Function.SubscriptionType.FAILOVER)
+                .putAllInputSpecs(topicsToSchema)
+                .setClassName("org.pulsar.pulsar.TestSource")
+                .setTypeClassName(String.class.getName()));
+        functionDetailsBuilder.setSecretsMap("SomeMap");
+        functionDetailsBuilder.setResources(RESOURCES);
+        return functionDetailsBuilder.build();
+    }
+
+    // used for backward compatibility test
+    private String bcCreateJobName(String tenant, String namespace, String functionName) {
+        return "pf-" + tenant + "-" + namespace + "-" + functionName;
+    }
+
+    private void verifyCreateJobNameWithBackwardCompatibility() throws Exception {
+        final FunctionDetails functionDetails = createFunctionDetails(TEST_NAME);
+        final String bcJobName = bcCreateJobName(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
+        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        assertEquals(bcJobName, jobName);
+        KubernetesRuntime.doChecks(functionDetails);
+    }
+
+    private void verifyCreateJobNameWithUpperCaseFunctionName() throws Exception {
+        FunctionDetails functionDetails = createFunctionDetails("UpperCaseFunction");
+        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        assertEquals(jobName, "pf-tenant-namespace-uppercasefunction-f0c5ca9a");
+        KubernetesRuntime.doChecks(functionDetails);
+    }
+
+    private void verifyCreateJobNameWithDotFunctionName() throws Exception {
+        final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction");
+        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        assertEquals(jobName, "pf-tenant-namespace-clazz.testfunction");
+        KubernetesRuntime.doChecks(functionDetails);
+    }
+
+    private void verifyCreateJobNameWithDotAndUpperCaseFunctionName() throws Exception {
+        final FunctionDetails functionDetails = createFunctionDetails("Clazz.TestFunction");
+        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        assertEquals(jobName, "pf-tenant-namespace-clazz.testfunction-92ec5bf6");
+        KubernetesRuntime.doChecks(functionDetails);
+    }
+
+    private void verifyCreateJobNameWithInvalidMarksFunctionName() throws Exception {
+        final FunctionDetails functionDetails = createFunctionDetails("test_function*name");
+        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        assertEquals(jobName, "pf-tenant-namespace-test-function-name-b5a215ad");
+        KubernetesRuntime.doChecks(functionDetails);
+    }
+
+    private void verifyCreateJobNameWithCollisionalFunctionName() throws Exception {
+        final FunctionDetails functionDetail1 = createFunctionDetails("testfunction");
+        final FunctionDetails functionDetail2 = createFunctionDetails("testFunction");
+        final String jobName1 = KubernetesRuntime.createJobName(functionDetail1);
+        final String jobName2 = KubernetesRuntime.createJobName(functionDetail2);
+        assertNotEquals(jobName1, jobName2);
+        KubernetesRuntime.doChecks(functionDetail1);
+        KubernetesRuntime.doChecks(functionDetail2);
+    }
+
+    private void verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName() throws Exception {
+        final FunctionDetails functionDetail1 = createFunctionDetails("test_function*name");
+        final FunctionDetails functionDetail2 = createFunctionDetails("test+function*name");
+        final String jobName1 = KubernetesRuntime.createJobName(functionDetail1);
+        final String jobName2 = KubernetesRuntime.createJobName(functionDetail2);
+        assertNotEquals(jobName1, jobName2);
+        KubernetesRuntime.doChecks(functionDetail1);
+        KubernetesRuntime.doChecks(functionDetail2);
+    }
+
 }