You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by si...@apache.org on 2019/08/29 21:43:55 UTC
[pulsar] branch master updated: [k8s] convert to valid pod name
part with k8s function runtime (#4996)
This is an automated email from the ASF dual-hosted git repository.
sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new e231d22 [k8s] convert to valid pod name part with k8s function runtime (#4996)
e231d22 is described below
commit e231d22e5b04266348b5146c869a0b019a71725a
Author: Rui Fu <fr...@users.noreply.github.com>
AuthorDate: Fri Aug 30 05:43:50 2019 +0800
[k8s] convert to valid pod name part with k8s function runtime (#4996)
### Motivation
The k8s runtime uses the tenant, namespace, and functionName to create the k8s pod name, but functionName might not satisfy the k8s pod name rule `"[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*"`, which causes pod creation to fail.
### Modifications
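This PR creates a new function called `toValidPodName`, which converts a string into a valid pod name part by converting it to lower case and replacing every character other than lowercase letters, digits, "-" and "." with "-". When the conversion changes the job name, a short hash of the original name is appended to avoid naming collisions.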
---
.../functions/runtime/KubernetesRuntime.java | 17 +++-
.../functions/runtime/KubernetesRuntimeTest.java | 97 ++++++++++++++++++++++
2 files changed, 112 insertions(+), 2 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
index 4f9b77a..a5c6d1a 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/KubernetesRuntime.java
@@ -52,6 +52,7 @@ import io.kubernetes.client.models.V1StatefulSetSpec;
import io.kubernetes.client.models.V1Toleration;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.digest.DigestUtils;
import org.apache.pulsar.functions.auth.KubernetesFunctionAuthProvider;
import org.apache.pulsar.functions.instance.AuthenticationConfig;
import org.apache.pulsar.functions.instance.InstanceConfig;
@@ -1015,14 +1016,26 @@ public class KubernetesRuntime implements Runtime {
return ports;
}
- private static String createJobName(Function.FunctionDetails functionDetails) {
+ public static String createJobName(Function.FunctionDetails functionDetails) {
return createJobName(functionDetails.getTenant(),
functionDetails.getNamespace(),
functionDetails.getName());
}
+ private static String toValidPodName(String ori) {
+ return ori.toLowerCase().replaceAll("[^a-z0-9-\\.]", "-");
+ }
+
private static String createJobName(String tenant, String namespace, String functionName) {
- return "pf-" + tenant + "-" + namespace + "-" + functionName;
+ final String jobNameContent = String.format("%s-%s-%s", tenant, namespace,functionName);
+ final String jobName = "pf-" + jobNameContent;
+ final String convertedJobName = toValidPodName(jobName);
+ if (jobName.equals(convertedJobName)) {
+ return jobName;
+ }
+ // toValidPodName may cause naming collisions, add a short hash here to avoid it
+ final String shortHash = DigestUtils.sha1Hex(jobNameContent).toLowerCase().substring(0, 8);
+ return convertedJobName + "-" + shortHash;
}
private static String getServiceUrl(String jobName, String jobNamespace, int instanceId) {
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
index e0214b2..812c003 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/KubernetesRuntimeTest.java
@@ -49,6 +49,7 @@ import static org.apache.pulsar.functions.utils.FunctionCommon.roundDecimal;
import static org.powermock.api.mockito.PowerMockito.doNothing;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotEquals;
/**
* Unit test of {@link ThreadRuntime}.
@@ -448,4 +449,100 @@ public class KubernetesRuntimeTest {
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
}
+ @Test
+ public void testCreateJobName() throws Exception {
+ verifyCreateJobNameWithBackwardCompatibility();
+ verifyCreateJobNameWithUpperCaseFunctionName();
+ verifyCreateJobNameWithDotFunctionName();
+ verifyCreateJobNameWithDotAndUpperCaseFunctionName();
+ verifyCreateJobNameWithInvalidMarksFunctionName();
+ verifyCreateJobNameWithCollisionalFunctionName();
+ verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName();
+ }
+
+ FunctionDetails createFunctionDetails(final String functionName) {
+ FunctionDetails.Builder functionDetailsBuilder = FunctionDetails.newBuilder();
+ functionDetailsBuilder.setRuntime(FunctionDetails.Runtime.JAVA);
+ functionDetailsBuilder.setTenant(TEST_TENANT);
+ functionDetailsBuilder.setNamespace(TEST_NAMESPACE);
+ functionDetailsBuilder.setName(functionName);
+ functionDetailsBuilder.setClassName("org.apache.pulsar.functions.utils.functioncache.AddFunction");
+ functionDetailsBuilder.setSink(Function.SinkSpec.newBuilder()
+ .setTopic(TEST_NAME + "-output")
+ .setSerDeClassName("org.apache.pulsar.functions.runtime.serde.Utf8Serializer")
+ .setClassName("org.pulsar.pulsar.TestSink")
+ .setTypeClassName(String.class.getName())
+ .build());
+ functionDetailsBuilder.setLogTopic(TEST_NAME + "-log");
+ functionDetailsBuilder.setSource(Function.SourceSpec.newBuilder()
+ .setSubscriptionType(Function.SubscriptionType.FAILOVER)
+ .putAllInputSpecs(topicsToSchema)
+ .setClassName("org.pulsar.pulsar.TestSource")
+ .setTypeClassName(String.class.getName()));
+ functionDetailsBuilder.setSecretsMap("SomeMap");
+ functionDetailsBuilder.setResources(RESOURCES);
+ return functionDetailsBuilder.build();
+ }
+
+ // used for backward compatibility test
+ private String bcCreateJobName(String tenant, String namespace, String functionName) {
+ return "pf-" + tenant + "-" + namespace + "-" + functionName;
+ }
+
+ private void verifyCreateJobNameWithBackwardCompatibility() throws Exception {
+ final FunctionDetails functionDetails = createFunctionDetails(TEST_NAME);
+ final String bcJobName = bcCreateJobName(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
+ final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(bcJobName, jobName);
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithUpperCaseFunctionName() throws Exception {
+ FunctionDetails functionDetails = createFunctionDetails("UpperCaseFunction");
+ final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(jobName, "pf-tenant-namespace-uppercasefunction-f0c5ca9a");
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithDotFunctionName() throws Exception {
+ final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction");
+ final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(jobName, "pf-tenant-namespace-clazz.testfunction");
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithDotAndUpperCaseFunctionName() throws Exception {
+ final FunctionDetails functionDetails = createFunctionDetails("Clazz.TestFunction");
+ final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(jobName, "pf-tenant-namespace-clazz.testfunction-92ec5bf6");
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithInvalidMarksFunctionName() throws Exception {
+ final FunctionDetails functionDetails = createFunctionDetails("test_function*name");
+ final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ assertEquals(jobName, "pf-tenant-namespace-test-function-name-b5a215ad");
+ KubernetesRuntime.doChecks(functionDetails);
+ }
+
+ private void verifyCreateJobNameWithCollisionalFunctionName() throws Exception {
+ final FunctionDetails functionDetail1 = createFunctionDetails("testfunction");
+ final FunctionDetails functionDetail2 = createFunctionDetails("testFunction");
+ final String jobName1 = KubernetesRuntime.createJobName(functionDetail1);
+ final String jobName2 = KubernetesRuntime.createJobName(functionDetail2);
+ assertNotEquals(jobName1, jobName2);
+ KubernetesRuntime.doChecks(functionDetail1);
+ KubernetesRuntime.doChecks(functionDetail2);
+ }
+
+ private void verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName() throws Exception {
+ final FunctionDetails functionDetail1 = createFunctionDetails("test_function*name");
+ final FunctionDetails functionDetail2 = createFunctionDetails("test+function*name");
+ final String jobName1 = KubernetesRuntime.createJobName(functionDetail1);
+ final String jobName2 = KubernetesRuntime.createJobName(functionDetail2);
+ assertNotEquals(jobName1, jobName2);
+ KubernetesRuntime.doChecks(functionDetail1);
+ KubernetesRuntime.doChecks(functionDetail2);
+ }
+
}