You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/07 01:56:59 UTC
[pulsar] branch master updated: [Issue 8012][pulsar_functions] For
the Kubernetes function worker runtime,
allow customization of the pulsar function "jobName" (#8452)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new db6afd5 [Issue 8012][pulsar_functions] For the Kubernetes function worker runtime, allow customization of the pulsar function "jobName" (#8452)
db6afd5 is described below
commit db6afd52841108b0c35f0a0bdd0759f5243d93e0
Author: jdbeck <73...@users.noreply.github.com>
AuthorDate: Fri Nov 6 20:56:43 2020 -0500
[Issue 8012][pulsar_functions] For the Kubernetes function worker runtime, allow customization of the pulsar function "jobName" (#8452)
Fixes #8012
### Motivation
Currently (as of 2.6.x), the Pulsar KubernetesRuntime class hardcodes the jobName (the name assigned to the StatefulSet used to create the function pods) to the format "pf-[tenant]-[namespace]-[function][-optional 8 char hash]." While the intent of this name format was no doubt both to provide a human readable name for the k8s objects and ensure uniqueness within k8s, we've found it -- when combined with the 55 character size restriction imposed by KubernetesRuntime -- to be unnecessa [...]
---
.../BasicKubernetesManifestCustomizer.java | 13 +++-
.../kubernetes/KubernetesManifestCustomizer.java | 4 ++
.../runtime/kubernetes/KubernetesRuntime.java | 47 +++++++-----
.../kubernetes/KubernetesRuntimeFactory.java | 19 ++++-
.../kubernetes/KubernetesRuntimeFactoryConfig.java | 5 ++
.../kubernetes/KubernetesRuntimeFactoryTest.java | 4 +-
.../runtime/kubernetes/KubernetesRuntimeTest.java | 84 ++++++++++++++--------
.../runtime/process/ProcessRuntimeTest.java | 2 +-
.../KubernetesSecretsProviderConfigurator.java | 2 +-
.../SecretsProviderConfigurator.java | 2 +-
.../KubernetesSecretsProviderConfiguratorTest.java | 6 +-
.../worker/FunctionRuntimeManagerTest.java | 1 +
12 files changed, 130 insertions(+), 59 deletions(-)
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
index dc14b6a..a9a6929 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
@@ -50,6 +50,7 @@ public class BasicKubernetesManifestCustomizer implements KubernetesManifestCust
@NoArgsConstructor
static private class RuntimeOpts {
private String jobNamespace;
+ private String jobName;
private Map<String, String> extraLabels;
private Map<String, String> extraAnnotations;
private Map<String, String> nodeSelectorLabels;
@@ -70,7 +71,17 @@ public class BasicKubernetesManifestCustomizer implements KubernetesManifestCust
return currentNamespace;
}
}
-
+
+ @Override
+ public String customizeName(Function.FunctionDetails funcDetails, String currentName) {
+ RuntimeOpts opts = getOptsFromDetails(funcDetails);
+ if (!StringUtils.isEmpty(opts.getJobName())) {
+ return opts.getJobName();
+ } else {
+ return currentName;
+ }
+ }
+
@Override
public V1Service customizeService(Function.FunctionDetails funcDetails, V1Service service) {
RuntimeOpts opts = getOptsFromDetails(funcDetails);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java
index fb69dfa..be273e7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java
@@ -36,4 +36,8 @@ public interface KubernetesManifestCustomizer extends RuntimeCustomizer {
default String customizeNamespace(Function.FunctionDetails funcDetails, String currentNamespace) {
return currentNamespace;
}
+
+ default String customizeName(Function.FunctionDetails funcDetails, String currentName) {
+ return currentName;
+ }
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 6ae3d3b..e12cb6b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -130,6 +130,7 @@ public class KubernetesRuntime implements Runtime {
private InstanceControlGrpc.InstanceControlFutureStub[] stub;
private InstanceConfig instanceConfig;
private final String jobNamespace;
+ private final String jobName;
private final Map<String, String> customLabels;
private final Map<String, String> functionDockerImages;
private final String pulsarDockerImageName;
@@ -154,6 +155,7 @@ public class KubernetesRuntime implements Runtime {
KubernetesRuntime(AppsV1Api appsClient,
CoreV1Api coreClient,
String jobNamespace,
+ String jobName,
Map<String, String> customLabels,
Boolean installUserCodeDependencies,
String pythonDependencyRepository,
@@ -189,6 +191,7 @@ public class KubernetesRuntime implements Runtime {
this.coreClient = coreClient;
this.instanceConfig = instanceConfig;
this.jobNamespace = jobNamespace;
+ this.jobName = jobName;
this.customLabels = customLabels;
this.functionDockerImages = functionDockerImages;
this.pulsarDockerImageName = pulsarDockerImageName;
@@ -259,7 +262,7 @@ public class KubernetesRuntime implements Runtime {
narExtractionDirectory,
functinoInstanceClassPath));
- doChecks(instanceConfig.getFunctionDetails());
+ doChecks(instanceConfig.getFunctionDetails(), this.jobName);
}
/**
@@ -294,7 +297,7 @@ public class KubernetesRuntime implements Runtime {
channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];
- String jobName = createJobName(instanceConfig.getFunctionDetails());
+ String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) {
String address = getServiceUrl(jobName, jobNamespace, i);
channel[i] = ManagedChannelBuilder.forAddress(address, grpcPort)
@@ -460,7 +463,7 @@ public class KubernetesRuntime implements Runtime {
@VisibleForTesting
V1Service createService() {
- final String jobName = createJobName(instanceConfig.getFunctionDetails());
+ final String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
final V1Service service = new V1Service();
@@ -545,7 +548,7 @@ public class KubernetesRuntime implements Runtime {
public void deleteStatefulSet() throws InterruptedException {
- String statefulSetName = createJobName(instanceConfig.getFunctionDetails());
+ String statefulSetName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
final V1DeleteOptions options = new V1DeleteOptions();
options.setGracePeriodSeconds(5L);
options.setPropagationPolicy("Foreground");
@@ -700,7 +703,7 @@ public class KubernetesRuntime implements Runtime {
options.setGracePeriodSeconds(0L);
options.setPropagationPolicy("Foreground");
String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
- String serviceName = createJobName(instanceConfig.getFunctionDetails());
+ String serviceName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
Actions.Action deleteService = Actions.Action.builder()
.actionName(String.format("Deleting service for function %s", fqfn))
@@ -861,7 +864,7 @@ public class KubernetesRuntime implements Runtime {
@VisibleForTesting
V1StatefulSet createStatefulSet() {
- final String jobName = createJobName(instanceConfig.getFunctionDetails());
+ final String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
final V1StatefulSet statefulSet = new V1StatefulSet();
@@ -1082,40 +1085,48 @@ public class KubernetesRuntime implements Runtime {
return ports;
}
- public static String createJobName(Function.FunctionDetails functionDetails) {
- return createJobName(functionDetails.getTenant(),
+ public static String createJobName(Function.FunctionDetails functionDetails, String jobName) {
+ return jobName == null ? createJobName(functionDetails.getTenant(),
functionDetails.getNamespace(),
- functionDetails.getName());
+ functionDetails.getName()) : createJobName(jobName);
}
private static String toValidPodName(String ori) {
return ori.toLowerCase().replaceAll("[^a-z0-9-\\.]", "-");
}
-
- private static String createJobName(String tenant, String namespace, String functionName) {
- final String jobNameContent = String.format("%s-%s-%s", tenant, namespace,functionName);
- final String jobName = "pf-" + jobNameContent;
- final String convertedJobName = toValidPodName(jobName);
+
+ private static String validateName(String jobName) {
+ final String convertedJobName = toValidPodName(jobName);
if (jobName.equals(convertedJobName)) {
return jobName;
}
// toValidPodName may cause naming collisions, add a short hash here to avoid it
- final String shortHash = DigestUtils.sha1Hex(jobNameContent).toLowerCase().substring(0, 8);
+ final String shortHash = DigestUtils.sha1Hex(jobName.replaceFirst("pf-", "")).toLowerCase().substring(0, 8);
return convertedJobName + "-" + shortHash;
}
+
+ private static String createJobName(String jobName) {
+ return validateName(jobName);
+ }
+
+ private static String createJobName(String tenant, String namespace, String functionName) {
+ final String jobName = "pf-" + String.format("%s-%s-%s", tenant, namespace, functionName);
+ return validateName(jobName);
+ }
private static String getServiceUrl(String jobName, String jobNamespace, int instanceId) {
return String.format("%s-%d.%s.%s.svc.cluster.local", jobName, instanceId, jobName, jobNamespace);
}
- public static void doChecks(Function.FunctionDetails functionDetails) {
- final String jobName = createJobName(functionDetails);
+ public static void doChecks(Function.FunctionDetails functionDetails, String overridenJobName) {
+ final String jobName = createJobName(functionDetails, overridenJobName);
if (!jobName.equals(jobName.toLowerCase())) {
throw new RuntimeException("Kubernetes does not allow upper case jobNames.");
}
final Matcher matcher = VALID_POD_NAME_REGEX.matcher(jobName);
if (!matcher.matches()) {
- throw new RuntimeException("Kubernetes only admits lower case and numbers.");
+ throw new RuntimeException("Kubernetes only admits lower case and numbers. " +
+ "(jobName=" + jobName + ")");
}
if (jobName.length() > maxJobNameSize) {
throw new RuntimeException("Kubernetes job name size should be less than " + maxJobNameSize);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 31d8d39..98e5655 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -66,6 +66,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
private String k8Uri;
private String jobNamespace;
+ private String jobName;
private String pulsarDockerImageName;
private Map<String, String> functionDockerImages;
private String imagePullPolicy;
@@ -140,6 +141,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
} else {
this.jobNamespace = "default";
}
+ if (!isEmpty(factoryConfig.getJobName())) {
+ this.jobName = factoryConfig.getJobName();
+ } else {
+ this.jobName = null;
+ }
if (!isEmpty(factoryConfig.getPulsarDockerImageName())) {
this.pulsarDockerImageName = factoryConfig.getPulsarDockerImageName();
} else {
@@ -265,12 +271,14 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
Optional<KubernetesManifestCustomizer> manifestCustomizer = getRuntimeCustomizer();
String overriddenNamespace = manifestCustomizer.map((customizer) -> customizer.customizeNamespace(instanceConfig.getFunctionDetails(), jobNamespace)).orElse(jobNamespace);
+ String overriddenName = manifestCustomizer.map((customizer) -> customizer.customizeName(instanceConfig.getFunctionDetails(), jobName)).orElse(jobName);
return new KubernetesRuntime(
appsClient,
coreClient,
// get the namespace for this function
overriddenNamespace,
+ overriddenName,
customLabels,
installUserCodeDependencies,
pythonDependencyRepository,
@@ -310,9 +318,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
@Override
public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
- KubernetesRuntime.doChecks(functionDetails);
+ final String overriddenJobName = getOverriddenName(functionDetails);
+ KubernetesRuntime.doChecks(functionDetails, overriddenJobName);
validateMinResourcesRequired(functionDetails);
- secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, getOverriddenNamespace(functionDetails), functionDetails);
+ secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient,
+ getOverriddenNamespace(functionDetails), overriddenJobName, functionDetails);
}
@VisibleForTesting
@@ -420,4 +430,9 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
Optional<KubernetesManifestCustomizer> manifestCustomizer = getRuntimeCustomizer();
return manifestCustomizer.map((customizer) -> customizer.customizeNamespace(funcDetails, jobNamespace)).orElse(jobNamespace);
}
+
+ private String getOverriddenName(Function.FunctionDetails funcDetails) {
+ Optional<KubernetesManifestCustomizer> manifestCustomizer = getRuntimeCustomizer();
+ return manifestCustomizer.map((customizer) -> customizer.customizeName(funcDetails, jobName)).orElse(jobName);
+ }
}
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index fafdaa3..95340a5 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -39,6 +39,11 @@ public class KubernetesRuntimeFactoryConfig {
)
protected String jobNamespace;
@FieldContext(
+ doc = "The Kubernetes pod name to run the function instances. It is set to"
+ + "`pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>` if this setting is left to be empty"
+ )
+ protected String jobName;
+ @FieldContext(
doc = "The docker image used to run function instance. By default it is `apachepulsar/pulsar`"
)
protected String pulsarDockerImageName;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index 04cb7e0..8e207db 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -105,7 +105,7 @@ public class KubernetesRuntimeFactoryTest {
}
@Override
- public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, FunctionDetails functionDetails) {
+ public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, FunctionDetails functionDetails) {
}
}
@@ -152,6 +152,7 @@ public class KubernetesRuntimeFactoryTest {
KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
kubernetesRuntimeFactoryConfig.setK8Uri(null);
kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+ kubernetesRuntimeFactoryConfig.setJobName(null);
kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
@@ -375,6 +376,7 @@ public class KubernetesRuntimeFactoryTest {
KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
kubernetesRuntimeFactoryConfig.setK8Uri("test_k8uri");
kubernetesRuntimeFactoryConfig.setJobNamespace("test_jobNamespace");
+ kubernetesRuntimeFactoryConfig.setJobName("test_jobName");
kubernetesRuntimeFactoryConfig.setPulsarDockerImageName("test_dockerImage");
kubernetesRuntimeFactoryConfig.setFunctionDockerImages(imageNames);
kubernetesRuntimeFactoryConfig.setImagePullPolicy("test_imagePullPolicy");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index f2ae5d1..a06ba89 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -137,7 +137,7 @@ public class KubernetesRuntimeTest {
}
@Override
- public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, FunctionDetails functionDetails) {
+ public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, FunctionDetails functionDetails) {
}
}
@@ -167,7 +167,7 @@ public class KubernetesRuntimeTest {
public void setup() {
System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
}
-
+
@AfterMethod
public void tearDown() {
if (null != this.factory) {
@@ -184,6 +184,7 @@ public class KubernetesRuntimeTest {
KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
kubernetesRuntimeFactoryConfig.setK8Uri(null);
kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+ kubernetesRuntimeFactoryConfig.setJobName(null);
kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
@@ -211,10 +212,10 @@ public class KubernetesRuntimeTest {
workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
workerConfig.setAuthenticationEnabled(false);
- factory.initialize(workerConfig,null, new TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
+ factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
doNothing().when(factory).setupClient();
+
return factory;
-
}
KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
@@ -318,17 +319,17 @@ public class KubernetesRuntimeTest {
public void testResources() throws Exception {
// test overcommit
- testResouces(1, 1000, 1.0, 1.0);
- testResouces(1, 1000, 2.0, 1.0);
- testResouces(1, 1000, 1.0, 2.0);
- testResouces(1, 1000, 1.5, 1.5);
- testResouces(1, 1000, 1.3, 1.0);
+ testResources(1, 1000, 1.0, 1.0);
+ testResources(1, 1000, 2.0, 1.0);
+ testResources(1, 1000, 1.0, 2.0);
+ testResources(1, 1000, 1.5, 1.5);
+ testResources(1, 1000, 1.3, 1.0);
// test cpu rounding
- testResouces(1.0 / 1.5, 1000, 1.3, 1.0);
+ testResources(1.0 / 1.5, 1000, 1.3, 1.0);
}
- public void testResouces(double userCpuRequest, long userMemoryRequest, double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
+ public void testResources(double userCpuRequest, long userMemoryRequest, double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
Function.Resources resources = Function.Resources.newBuilder()
.setRam(userMemoryRequest).setCpu(userCpuRequest).setDisk(10000L).build();
@@ -492,9 +493,9 @@ public class KubernetesRuntimeTest {
assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
}
-
+
@Test
- public void testCreateJobName() throws Exception {
+ public void testCreateJobName() throws Exception {
verifyCreateJobNameWithBackwardCompatibility();
verifyCreateJobNameWithUpperCaseFunctionName();
verifyCreateJobNameWithDotFunctionName();
@@ -502,6 +503,8 @@ public class KubernetesRuntimeTest {
verifyCreateJobNameWithInvalidMarksFunctionName();
verifyCreateJobNameWithCollisionalFunctionName();
verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName();
+ verifyCreateJobNameWithOverriddenK8sPodName();
+ verifyCreateJobNameWithOverriddenK8sPodNameWithInvalidMarks();
}
FunctionDetails createFunctionDetails(final String functionName) {
@@ -536,57 +539,71 @@ public class KubernetesRuntimeTest {
private void verifyCreateJobNameWithBackwardCompatibility() throws Exception {
final FunctionDetails functionDetails = createFunctionDetails(TEST_NAME);
final String bcJobName = bcCreateJobName(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
- final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
assertEquals(bcJobName, jobName);
- KubernetesRuntime.doChecks(functionDetails);
+ KubernetesRuntime.doChecks(functionDetails, null);
}
private void verifyCreateJobNameWithUpperCaseFunctionName() throws Exception {
FunctionDetails functionDetails = createFunctionDetails("UpperCaseFunction");
- final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
assertEquals(jobName, "pf-tenant-namespace-uppercasefunction-f0c5ca9a");
- KubernetesRuntime.doChecks(functionDetails);
+ KubernetesRuntime.doChecks(functionDetails, null);
}
private void verifyCreateJobNameWithDotFunctionName() throws Exception {
final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction");
- final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
assertEquals(jobName, "pf-tenant-namespace-clazz.testfunction");
- KubernetesRuntime.doChecks(functionDetails);
+ KubernetesRuntime.doChecks(functionDetails, null);
}
private void verifyCreateJobNameWithDotAndUpperCaseFunctionName() throws Exception {
final FunctionDetails functionDetails = createFunctionDetails("Clazz.TestFunction");
- final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
assertEquals(jobName, "pf-tenant-namespace-clazz.testfunction-92ec5bf6");
- KubernetesRuntime.doChecks(functionDetails);
+ KubernetesRuntime.doChecks(functionDetails, null);
}
private void verifyCreateJobNameWithInvalidMarksFunctionName() throws Exception {
final FunctionDetails functionDetails = createFunctionDetails("test_function*name");
- final String jobName = KubernetesRuntime.createJobName(functionDetails);
+ final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
assertEquals(jobName, "pf-tenant-namespace-test-function-name-b5a215ad");
- KubernetesRuntime.doChecks(functionDetails);
+ KubernetesRuntime.doChecks(functionDetails, null);
+ }
+
+ private void verifyCreateJobNameWithOverriddenK8sPodName() throws Exception {
+ final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction");
+ final String jobName = KubernetesRuntime.createJobName(functionDetails, "custom-k8s-pod-name");
+ assertEquals(jobName, "custom-k8s-pod-name");
+ KubernetesRuntime.doChecks(functionDetails, "custom-k8s-pod-name");
+ }
+
+ private void verifyCreateJobNameWithOverriddenK8sPodNameWithInvalidMarks() throws Exception {
+ final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction");
+ final String jobName = KubernetesRuntime.createJobName(functionDetails, "invalid_pod*name");
+ assertEquals(jobName, "invalid-pod-name-04d0e74a");
+ KubernetesRuntime.doChecks(functionDetails, "invalid_pod*name");
}
private void verifyCreateJobNameWithCollisionalFunctionName() throws Exception {
final FunctionDetails functionDetail1 = createFunctionDetails("testfunction");
final FunctionDetails functionDetail2 = createFunctionDetails("testFunction");
- final String jobName1 = KubernetesRuntime.createJobName(functionDetail1);
- final String jobName2 = KubernetesRuntime.createJobName(functionDetail2);
+ final String jobName1 = KubernetesRuntime.createJobName(functionDetail1, null);
+ final String jobName2 = KubernetesRuntime.createJobName(functionDetail2, null);
assertNotEquals(jobName1, jobName2);
- KubernetesRuntime.doChecks(functionDetail1);
- KubernetesRuntime.doChecks(functionDetail2);
+ KubernetesRuntime.doChecks(functionDetail1, null);
+ KubernetesRuntime.doChecks(functionDetail2, null);
}
private void verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName() throws Exception {
final FunctionDetails functionDetail1 = createFunctionDetails("test_function*name");
final FunctionDetails functionDetail2 = createFunctionDetails("test+function*name");
- final String jobName1 = KubernetesRuntime.createJobName(functionDetail1);
- final String jobName2 = KubernetesRuntime.createJobName(functionDetail2);
+ final String jobName1 = KubernetesRuntime.createJobName(functionDetail1, null);
+ final String jobName2 = KubernetesRuntime.createJobName(functionDetail2, null);
assertNotEquals(jobName1, jobName2);
- KubernetesRuntime.doChecks(functionDetail1);
- KubernetesRuntime.doChecks(functionDetail2);
+ KubernetesRuntime.doChecks(functionDetail1, null);
+ KubernetesRuntime.doChecks(functionDetail2, null);
}
@Test
@@ -595,6 +612,7 @@ public class KubernetesRuntimeTest {
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> {
JsonObject configObj = new JsonObject();
configObj.addProperty("jobNamespace", "custom-ns");
+ configObj.addProperty("jobName", "custom-name");
return fb.setCustomRuntimeOptions(configObj.toString());
}));
@@ -606,6 +624,8 @@ public class KubernetesRuntimeTest {
V1Service serviceSpec = container.createService();
assertEquals(serviceSpec.getMetadata().getNamespace(), "default");
+ assertEquals(serviceSpec.getMetadata().getName(), "pf-" + TEST_TENANT + "-" +
+ TEST_NAMESPACE + "-" + TEST_NAME);
}
@Test
@@ -614,6 +634,7 @@ public class KubernetesRuntimeTest {
config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> {
JsonObject configObj = new JsonObject();
configObj.addProperty("jobNamespace", "custom-ns");
+ configObj.addProperty("jobName", "custom-name");
JsonObject extraAnn = new JsonObject();
extraAnn.addProperty("annotation", "test");
@@ -666,6 +687,7 @@ public class KubernetesRuntimeTest {
V1Service serviceSpec = container.createService();
assertEquals(serviceSpec.getMetadata().getNamespace(), "custom-ns");
+ assertEquals(serviceSpec.getMetadata().getName(), "custom-name");
assertEquals(serviceSpec.getMetadata().getAnnotations().get("annotation"), "test");
assertEquals(serviceSpec.getMetadata().getLabels().get("label"), "test");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 638780c..8a71a7c 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -97,7 +97,7 @@ public class ProcessRuntimeTest {
}
@Override
- public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, FunctionDetails functionDetails) {
+ public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, FunctionDetails functionDetails) {
}
}
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
index 81e50fd..10988c7 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
@@ -107,7 +107,7 @@ public class KubernetesSecretsProviderConfigurator implements SecretsProviderCon
// The secret object should be of type Map<String, String> and it should contain "id" and "key"
@Override
- public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, Function.FunctionDetails functionDetails) {
+ public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, Function.FunctionDetails functionDetails) {
if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) {
Type type = new TypeToken<Map<String, Object>>() {
}.getType();
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
index e51242e..d65de8d 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
@@ -69,6 +69,6 @@ public interface SecretsProviderConfigurator {
/**
* Do config checks to see whether the secrets provided are conforming.
*/
- default void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, Function.FunctionDetails functionDetails) {}
+ default void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, Function.FunctionDetails functionDetails) {}
}
\ No newline at end of file
diff --git a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java
index fc764cf..c2db339 100644
--- a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java
+++ b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java
@@ -38,7 +38,7 @@ public class KubernetesSecretsProviderConfiguratorTest {
HashMap<String, Object> map = new HashMap<String, Object>();
map.put("secretname", "randomsecret");
Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setSecretsMap(new Gson().toJson(map)).build();
- provider.doAdmissionChecks(null, null, null, functionDetails);
+ provider.doAdmissionChecks(null, null, null, null, functionDetails);
Assert.fail("Non conforming secret object should not validate");
} catch (Exception e) {
}
@@ -48,7 +48,7 @@ public class KubernetesSecretsProviderConfiguratorTest {
map1.put("secretname", "secretvalue");
map.put("secretname", map1);
Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setSecretsMap(new Gson().toJson(map)).build();
- provider.doAdmissionChecks(null, null, null, functionDetails);
+ provider.doAdmissionChecks(null, null, null, null, functionDetails);
Assert.fail("Non conforming secret object should not validate");
} catch (Exception e) {
}
@@ -59,7 +59,7 @@ public class KubernetesSecretsProviderConfiguratorTest {
map1.put("key", "secretvalue");
map.put("secretname", map1);
Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setSecretsMap(new Gson().toJson(map)).build();
- provider.doAdmissionChecks(null, null, null, functionDetails);
+ provider.doAdmissionChecks(null, null, null, null, functionDetails);
} catch (Exception e) {
Assert.fail("Conforming secret object should validate");
}
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index be2cfa7..1d6dd21 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -871,6 +871,7 @@ public class FunctionRuntimeManagerTest {
= new WorkerConfig.KubernetesContainerFactory();
kubernetesContainerFactory.setK8Uri("k8Uri");
kubernetesContainerFactory.setJobNamespace("jobNamespace");
+ kubernetesContainerFactory.setJobName("jobName");
kubernetesContainerFactory.setPulsarDockerImageName("pulsarDockerImageName");
kubernetesContainerFactory.setImagePullPolicy("imagePullPolicy");
kubernetesContainerFactory.setPulsarRootDir("pulsarRootDir");