You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/11/07 01:56:59 UTC

[pulsar] branch master updated: [Issue 8012][pulsar_functions] For the Kubernetes function worker runtime, allow customization of the pulsar function "jobName" (#8452)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new db6afd5  [Issue 8012][pulsar_functions] For the Kubernetes function worker runtime, allow customization of the pulsar function "jobName" (#8452)
db6afd5 is described below

commit db6afd52841108b0c35f0a0bdd0759f5243d93e0
Author: jdbeck <73...@users.noreply.github.com>
AuthorDate: Fri Nov 6 20:56:43 2020 -0500

    [Issue 8012][pulsar_functions] For the Kubernetes function worker runtime, allow customization of the pulsar function "jobName" (#8452)
    
    Fixes #8012
    
    ### Motivation
    
    Currently (as of 2.6.x), the Pulsar KubernetesRuntime class hardcodes the jobName (the name assigned to the StatefulSet used to create the function pods) to the format "pf-[tenant]-[namespace]-[function][-optional 8 char hash]". While the intent of this name format was no doubt both to provide a human-readable name for the k8s objects and to ensure uniqueness within k8s, we've found it -- when combined with the 55-character size restriction imposed by KubernetesRuntime -- to be unnecessa [...]
---
 .../BasicKubernetesManifestCustomizer.java         | 13 +++-
 .../kubernetes/KubernetesManifestCustomizer.java   |  4 ++
 .../runtime/kubernetes/KubernetesRuntime.java      | 47 +++++++-----
 .../kubernetes/KubernetesRuntimeFactory.java       | 19 ++++-
 .../kubernetes/KubernetesRuntimeFactoryConfig.java |  5 ++
 .../kubernetes/KubernetesRuntimeFactoryTest.java   |  4 +-
 .../runtime/kubernetes/KubernetesRuntimeTest.java  | 84 ++++++++++++++--------
 .../runtime/process/ProcessRuntimeTest.java        |  2 +-
 .../KubernetesSecretsProviderConfigurator.java     |  2 +-
 .../SecretsProviderConfigurator.java               |  2 +-
 .../KubernetesSecretsProviderConfiguratorTest.java |  6 +-
 .../worker/FunctionRuntimeManagerTest.java         |  1 +
 12 files changed, 130 insertions(+), 59 deletions(-)

diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
index dc14b6a..a9a6929 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/BasicKubernetesManifestCustomizer.java
@@ -50,6 +50,7 @@ public class BasicKubernetesManifestCustomizer implements KubernetesManifestCust
     @NoArgsConstructor
     static private class RuntimeOpts {
         private String jobNamespace;
+        private String jobName;
         private Map<String, String> extraLabels;
         private Map<String, String> extraAnnotations;
         private Map<String, String> nodeSelectorLabels;
@@ -70,7 +71,17 @@ public class BasicKubernetesManifestCustomizer implements KubernetesManifestCust
             return currentNamespace;
         }
     }
-
+    
+    @Override
+    public String customizeName(Function.FunctionDetails funcDetails, String currentName) {
+        RuntimeOpts opts = getOptsFromDetails(funcDetails);
+        if (!StringUtils.isEmpty(opts.getJobName())) {
+            return opts.getJobName();
+        } else {
+            return currentName;
+        }
+    }
+    
     @Override
     public V1Service customizeService(Function.FunctionDetails funcDetails, V1Service service) {
         RuntimeOpts opts = getOptsFromDetails(funcDetails);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java
index fb69dfa..be273e7 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesManifestCustomizer.java
@@ -36,4 +36,8 @@ public interface KubernetesManifestCustomizer extends RuntimeCustomizer {
     default String customizeNamespace(Function.FunctionDetails funcDetails, String currentNamespace) {
         return currentNamespace;
     }
+    
+    default String customizeName(Function.FunctionDetails funcDetails, String currentName) {
+        return currentName;
+    }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
index 6ae3d3b..e12cb6b 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntime.java
@@ -130,6 +130,7 @@ public class KubernetesRuntime implements Runtime {
     private InstanceControlGrpc.InstanceControlFutureStub[] stub;
     private InstanceConfig instanceConfig;
     private final String jobNamespace;
+    private final String jobName;
     private final Map<String, String> customLabels;
     private final Map<String, String> functionDockerImages;
     private final String pulsarDockerImageName;
@@ -154,6 +155,7 @@ public class KubernetesRuntime implements Runtime {
     KubernetesRuntime(AppsV1Api appsClient,
                       CoreV1Api coreClient,
                       String jobNamespace,
+                      String jobName,
                       Map<String, String> customLabels,
                       Boolean installUserCodeDependencies,
                       String pythonDependencyRepository,
@@ -189,6 +191,7 @@ public class KubernetesRuntime implements Runtime {
         this.coreClient = coreClient;
         this.instanceConfig = instanceConfig;
         this.jobNamespace = jobNamespace;
+        this.jobName = jobName;
         this.customLabels = customLabels;
         this.functionDockerImages = functionDockerImages;
         this.pulsarDockerImageName = pulsarDockerImageName;
@@ -259,7 +262,7 @@ public class KubernetesRuntime implements Runtime {
                         narExtractionDirectory,
                         functinoInstanceClassPath));
 
-        doChecks(instanceConfig.getFunctionDetails());
+        doChecks(instanceConfig.getFunctionDetails(), this.jobName);
     }
 
     /**
@@ -294,7 +297,7 @@ public class KubernetesRuntime implements Runtime {
             channel = new ManagedChannel[instanceConfig.getFunctionDetails().getParallelism()];
             stub = new InstanceControlGrpc.InstanceControlFutureStub[instanceConfig.getFunctionDetails().getParallelism()];
 
-            String jobName = createJobName(instanceConfig.getFunctionDetails());
+            String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
             for (int i = 0; i < instanceConfig.getFunctionDetails().getParallelism(); ++i) {
                 String address = getServiceUrl(jobName, jobNamespace, i);
                 channel[i] = ManagedChannelBuilder.forAddress(address, grpcPort)
@@ -460,7 +463,7 @@ public class KubernetesRuntime implements Runtime {
 
     @VisibleForTesting
     V1Service createService() {
-        final String jobName = createJobName(instanceConfig.getFunctionDetails());
+        final String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
 
         final V1Service service = new V1Service();
 
@@ -545,7 +548,7 @@ public class KubernetesRuntime implements Runtime {
 
 
     public void deleteStatefulSet() throws InterruptedException {
-        String statefulSetName = createJobName(instanceConfig.getFunctionDetails());
+        String statefulSetName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
         final V1DeleteOptions options = new V1DeleteOptions();
         options.setGracePeriodSeconds(5L);
         options.setPropagationPolicy("Foreground");
@@ -700,7 +703,7 @@ public class KubernetesRuntime implements Runtime {
         options.setGracePeriodSeconds(0L);
         options.setPropagationPolicy("Foreground");
         String fqfn = FunctionCommon.getFullyQualifiedName(instanceConfig.getFunctionDetails());
-        String serviceName = createJobName(instanceConfig.getFunctionDetails());
+        String serviceName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
 
         Actions.Action deleteService = Actions.Action.builder()
                 .actionName(String.format("Deleting service for function %s", fqfn))
@@ -861,7 +864,7 @@ public class KubernetesRuntime implements Runtime {
 
     @VisibleForTesting
     V1StatefulSet createStatefulSet() {
-        final String jobName = createJobName(instanceConfig.getFunctionDetails());
+        final String jobName = createJobName(instanceConfig.getFunctionDetails(), this.jobName);
 
         final V1StatefulSet statefulSet = new V1StatefulSet();
 
@@ -1082,40 +1085,48 @@ public class KubernetesRuntime implements Runtime {
         return ports;
     }
 
-    public static String createJobName(Function.FunctionDetails functionDetails) {
-        return createJobName(functionDetails.getTenant(),
+    public static String createJobName(Function.FunctionDetails functionDetails, String jobName) {
+        return jobName == null ? createJobName(functionDetails.getTenant(),
                 functionDetails.getNamespace(),
-                functionDetails.getName());
+                functionDetails.getName()) : createJobName(jobName);
     }
 
     private static String toValidPodName(String ori) {
         return ori.toLowerCase().replaceAll("[^a-z0-9-\\.]", "-");
     }
-
-    private static String createJobName(String tenant, String namespace, String functionName) {
-        final String jobNameContent = String.format("%s-%s-%s", tenant, namespace,functionName);
-        final String jobName = "pf-" + jobNameContent;
-        final String convertedJobName = toValidPodName(jobName);
+    
+    private static String validateName(String jobName) {
+    	final String convertedJobName = toValidPodName(jobName);
         if (jobName.equals(convertedJobName)) {
             return jobName;
         }
         // toValidPodName may cause naming collisions, add a short hash here to avoid it
-        final String shortHash = DigestUtils.sha1Hex(jobNameContent).toLowerCase().substring(0, 8);
+        final String shortHash = DigestUtils.sha1Hex(jobName.replaceFirst("pf-", "")).toLowerCase().substring(0, 8);
         return convertedJobName + "-" + shortHash;
     }
+    
+    private static String createJobName(String jobName) {
+    	return validateName(jobName);
+    }
+    
+    private static String createJobName(String tenant, String namespace, String functionName) {
+        final String jobName = "pf-" + String.format("%s-%s-%s", tenant, namespace, functionName);
+        return validateName(jobName);
+    }
 
     private static String getServiceUrl(String jobName, String jobNamespace, int instanceId) {
         return String.format("%s-%d.%s.%s.svc.cluster.local", jobName, instanceId, jobName, jobNamespace);
     }
 
-    public static void doChecks(Function.FunctionDetails functionDetails) {
-        final String jobName = createJobName(functionDetails);
+    public static void doChecks(Function.FunctionDetails functionDetails, String overridenJobName) {
+        final String jobName = createJobName(functionDetails, overridenJobName);
         if (!jobName.equals(jobName.toLowerCase())) {
             throw new RuntimeException("Kubernetes does not allow upper case jobNames.");
         }
         final Matcher matcher = VALID_POD_NAME_REGEX.matcher(jobName);
         if (!matcher.matches()) {
-            throw new RuntimeException("Kubernetes only admits lower case and numbers.");
+            throw new RuntimeException("Kubernetes only admits lower case and numbers. " +
+            		"(jobName=" + jobName + ")");
         }
         if (jobName.length() > maxJobNameSize) {
             throw new RuntimeException("Kubernetes job name size should be less than " + maxJobNameSize);
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
index 31d8d39..98e5655 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactory.java
@@ -66,6 +66,7 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
 
     private String k8Uri;
     private String jobNamespace;
+    private String jobName;
     private String pulsarDockerImageName;
     private Map<String, String> functionDockerImages;
     private String imagePullPolicy;
@@ -140,6 +141,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         } else {
             this.jobNamespace = "default";
         }
+        if (!isEmpty(factoryConfig.getJobName())) {
+            this.jobName = factoryConfig.getJobName();
+        } else {
+            this.jobName = null;
+        }
         if (!isEmpty(factoryConfig.getPulsarDockerImageName())) {
             this.pulsarDockerImageName = factoryConfig.getPulsarDockerImageName();
         } else {
@@ -265,12 +271,14 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
 
         Optional<KubernetesManifestCustomizer> manifestCustomizer = getRuntimeCustomizer();
         String overriddenNamespace = manifestCustomizer.map((customizer) -> customizer.customizeNamespace(instanceConfig.getFunctionDetails(), jobNamespace)).orElse(jobNamespace);
+        String overriddenName = manifestCustomizer.map((customizer) -> customizer.customizeName(instanceConfig.getFunctionDetails(), jobName)).orElse(jobName);
 
         return new KubernetesRuntime(
             appsClient,
             coreClient,
             // get the namespace for this function
             overriddenNamespace,
+            overriddenName,
             customLabels,
             installUserCodeDependencies,
             pythonDependencyRepository,
@@ -310,9 +318,11 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
 
     @Override
     public void doAdmissionChecks(Function.FunctionDetails functionDetails) {
-        KubernetesRuntime.doChecks(functionDetails);
+    	final String overriddenJobName = getOverriddenName(functionDetails);
+        KubernetesRuntime.doChecks(functionDetails, overriddenJobName);
         validateMinResourcesRequired(functionDetails);
-        secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, getOverriddenNamespace(functionDetails), functionDetails);
+        secretsProviderConfigurator.doAdmissionChecks(appsClient, coreClient, 
+        		getOverriddenNamespace(functionDetails), overriddenJobName, functionDetails);
     }
 
     @VisibleForTesting
@@ -420,4 +430,9 @@ public class KubernetesRuntimeFactory implements RuntimeFactory {
         Optional<KubernetesManifestCustomizer> manifestCustomizer = getRuntimeCustomizer();
         return manifestCustomizer.map((customizer) -> customizer.customizeNamespace(funcDetails, jobNamespace)).orElse(jobNamespace);
     }
+    
+    private String getOverriddenName(Function.FunctionDetails funcDetails) {
+        Optional<KubernetesManifestCustomizer> manifestCustomizer = getRuntimeCustomizer();
+        return manifestCustomizer.map((customizer) -> customizer.customizeName(funcDetails, jobName)).orElse(jobName);
+    }
 }
diff --git a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
index fafdaa3..95340a5 100644
--- a/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
+++ b/pulsar-functions/runtime/src/main/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryConfig.java
@@ -39,6 +39,11 @@ public class KubernetesRuntimeFactoryConfig {
     )
     protected String jobNamespace;
     @FieldContext(
+            doc = "The Kubernetes pod name to run the function instances. It is set to"
+                + "`pf-<tenant>-<namespace>-<function_name>-<random_uuid(8)>` if this setting is left to be empty"
+        )
+    protected String jobName;
+    @FieldContext(
         doc = "The docker image used to run function instance. By default it is `apachepulsar/pulsar`"
     )
     protected String pulsarDockerImageName;
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
index 04cb7e0..8e207db 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeFactoryTest.java
@@ -105,7 +105,7 @@ public class KubernetesRuntimeFactoryTest {
         }
 
         @Override
-        public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, FunctionDetails functionDetails) {
+        public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, FunctionDetails functionDetails) {
 
         }
     }
@@ -152,6 +152,7 @@ public class KubernetesRuntimeFactoryTest {
         KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
         kubernetesRuntimeFactoryConfig.setK8Uri(null);
         kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+        kubernetesRuntimeFactoryConfig.setJobName(null);
         kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
         kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
         kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
@@ -375,6 +376,7 @@ public class KubernetesRuntimeFactoryTest {
         KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
         kubernetesRuntimeFactoryConfig.setK8Uri("test_k8uri");
         kubernetesRuntimeFactoryConfig.setJobNamespace("test_jobNamespace");
+        kubernetesRuntimeFactoryConfig.setJobName("test_jobName");
         kubernetesRuntimeFactoryConfig.setPulsarDockerImageName("test_dockerImage");
         kubernetesRuntimeFactoryConfig.setFunctionDockerImages(imageNames);
         kubernetesRuntimeFactoryConfig.setImagePullPolicy("test_imagePullPolicy");
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
index f2ae5d1..a06ba89 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/kubernetes/KubernetesRuntimeTest.java
@@ -137,7 +137,7 @@ public class KubernetesRuntimeTest {
         }
 
         @Override
-        public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, FunctionDetails functionDetails) {
+        public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, FunctionDetails functionDetails) {
 
         }
     }
@@ -167,7 +167,7 @@ public class KubernetesRuntimeTest {
     public void setup() {
         System.setProperty(FUNCTIONS_INSTANCE_CLASSPATH, "/pulsar/lib/*");
     }
-
+    
     @AfterMethod
     public void tearDown() {
         if (null != this.factory) {
@@ -184,6 +184,7 @@ public class KubernetesRuntimeTest {
         KubernetesRuntimeFactoryConfig kubernetesRuntimeFactoryConfig = new KubernetesRuntimeFactoryConfig();
         kubernetesRuntimeFactoryConfig.setK8Uri(null);
         kubernetesRuntimeFactoryConfig.setJobNamespace(null);
+        kubernetesRuntimeFactoryConfig.setJobName(null);
         kubernetesRuntimeFactoryConfig.setPulsarDockerImageName(null);
         kubernetesRuntimeFactoryConfig.setFunctionDockerImages(null);
         kubernetesRuntimeFactoryConfig.setImagePullPolicy(null);
@@ -211,10 +212,10 @@ public class KubernetesRuntimeTest {
         workerConfig.setStateStorageServiceUrl(stateStorageServiceUrl);
         workerConfig.setAuthenticationEnabled(false);
 
-        factory.initialize(workerConfig,null, new TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
+        factory.initialize(workerConfig, null, new TestSecretProviderConfigurator(), Optional.empty(), manifestCustomizer);
         doNothing().when(factory).setupClient();
+        
         return factory;
-
     }
 
     KubernetesRuntimeFactory createKubernetesRuntimeFactory(String extraDepsDir, int percentMemoryPadding,
@@ -318,17 +319,17 @@ public class KubernetesRuntimeTest {
     public void testResources() throws Exception {
 
         // test overcommit
-        testResouces(1, 1000, 1.0, 1.0);
-        testResouces(1, 1000, 2.0, 1.0);
-        testResouces(1, 1000, 1.0, 2.0);
-        testResouces(1, 1000, 1.5, 1.5);
-        testResouces(1, 1000, 1.3, 1.0);
+    	testResources(1, 1000, 1.0, 1.0);
+    	testResources(1, 1000, 2.0, 1.0);
+    	testResources(1, 1000, 1.0, 2.0);
+    	testResources(1, 1000, 1.5, 1.5);
+    	testResources(1, 1000, 1.3, 1.0);
 
         // test cpu rounding
-        testResouces(1.0 / 1.5, 1000, 1.3, 1.0);
+    	testResources(1.0 / 1.5, 1000, 1.3, 1.0);
     }
 
-    public void testResouces(double userCpuRequest, long userMemoryRequest, double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
+    public void testResources(double userCpuRequest, long userMemoryRequest, double cpuOverCommitRatio, double memoryOverCommitRatio) throws Exception {
 
         Function.Resources resources = Function.Resources.newBuilder()
                 .setRam(userMemoryRequest).setCpu(userCpuRequest).setDisk(10000L).build();
@@ -492,9 +493,9 @@ public class KubernetesRuntimeTest {
         assertEquals(containerSpec.getResources().getRequests().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
         assertEquals(containerSpec.getResources().getLimits().get("cpu").getNumber().doubleValue(), RESOURCES.getCpu());
     }
-
+    
     @Test
-    public void testCreateJobName() throws Exception {
+    public void testCreateJobName() throws Exception {    
         verifyCreateJobNameWithBackwardCompatibility();
         verifyCreateJobNameWithUpperCaseFunctionName();
         verifyCreateJobNameWithDotFunctionName();
@@ -502,6 +503,8 @@ public class KubernetesRuntimeTest {
         verifyCreateJobNameWithInvalidMarksFunctionName();
         verifyCreateJobNameWithCollisionalFunctionName();
         verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName();
+        verifyCreateJobNameWithOverriddenK8sPodName();
+        verifyCreateJobNameWithOverriddenK8sPodNameWithInvalidMarks();
     }
 
     FunctionDetails createFunctionDetails(final String functionName) {
@@ -536,57 +539,71 @@ public class KubernetesRuntimeTest {
     private void verifyCreateJobNameWithBackwardCompatibility() throws Exception {
         final FunctionDetails functionDetails = createFunctionDetails(TEST_NAME);
         final String bcJobName = bcCreateJobName(functionDetails.getTenant(), functionDetails.getNamespace(), functionDetails.getName());
-        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
         assertEquals(bcJobName, jobName);
-        KubernetesRuntime.doChecks(functionDetails);
+        KubernetesRuntime.doChecks(functionDetails, null);
     }
 
     private void verifyCreateJobNameWithUpperCaseFunctionName() throws Exception {
         FunctionDetails functionDetails = createFunctionDetails("UpperCaseFunction");
-        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
         assertEquals(jobName, "pf-tenant-namespace-uppercasefunction-f0c5ca9a");
-        KubernetesRuntime.doChecks(functionDetails);
+        KubernetesRuntime.doChecks(functionDetails, null);
     }
 
     private void verifyCreateJobNameWithDotFunctionName() throws Exception {
         final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction");
-        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
         assertEquals(jobName, "pf-tenant-namespace-clazz.testfunction");
-        KubernetesRuntime.doChecks(functionDetails);
+        KubernetesRuntime.doChecks(functionDetails, null);
     }
 
     private void verifyCreateJobNameWithDotAndUpperCaseFunctionName() throws Exception {
         final FunctionDetails functionDetails = createFunctionDetails("Clazz.TestFunction");
-        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
         assertEquals(jobName, "pf-tenant-namespace-clazz.testfunction-92ec5bf6");
-        KubernetesRuntime.doChecks(functionDetails);
+        KubernetesRuntime.doChecks(functionDetails, null);
     }
 
     private void verifyCreateJobNameWithInvalidMarksFunctionName() throws Exception {
         final FunctionDetails functionDetails = createFunctionDetails("test_function*name");
-        final String jobName = KubernetesRuntime.createJobName(functionDetails);
+        final String jobName = KubernetesRuntime.createJobName(functionDetails, null);
         assertEquals(jobName, "pf-tenant-namespace-test-function-name-b5a215ad");
-        KubernetesRuntime.doChecks(functionDetails);
+        KubernetesRuntime.doChecks(functionDetails, null);
+    }
+    
+    private void verifyCreateJobNameWithOverriddenK8sPodName() throws Exception {
+        final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction");
+        final String jobName = KubernetesRuntime.createJobName(functionDetails, "custom-k8s-pod-name");
+        assertEquals(jobName, "custom-k8s-pod-name");
+        KubernetesRuntime.doChecks(functionDetails, "custom-k8s-pod-name");
+    }
+    
+    private void verifyCreateJobNameWithOverriddenK8sPodNameWithInvalidMarks() throws Exception {
+        final FunctionDetails functionDetails = createFunctionDetails("clazz.testfunction");
+        final String jobName = KubernetesRuntime.createJobName(functionDetails, "invalid_pod*name");
+        assertEquals(jobName, "invalid-pod-name-04d0e74a");
+        KubernetesRuntime.doChecks(functionDetails, "invalid_pod*name");
     }
 
     private void verifyCreateJobNameWithCollisionalFunctionName() throws Exception {
         final FunctionDetails functionDetail1 = createFunctionDetails("testfunction");
         final FunctionDetails functionDetail2 = createFunctionDetails("testFunction");
-        final String jobName1 = KubernetesRuntime.createJobName(functionDetail1);
-        final String jobName2 = KubernetesRuntime.createJobName(functionDetail2);
+        final String jobName1 = KubernetesRuntime.createJobName(functionDetail1, null);
+        final String jobName2 = KubernetesRuntime.createJobName(functionDetail2, null);
         assertNotEquals(jobName1, jobName2);
-        KubernetesRuntime.doChecks(functionDetail1);
-        KubernetesRuntime.doChecks(functionDetail2);
+        KubernetesRuntime.doChecks(functionDetail1, null);
+        KubernetesRuntime.doChecks(functionDetail2, null);
     }
 
     private void verifyCreateJobNameWithCollisionalAndInvalidMarksFunctionName() throws Exception {
         final FunctionDetails functionDetail1 = createFunctionDetails("test_function*name");
         final FunctionDetails functionDetail2 = createFunctionDetails("test+function*name");
-        final String jobName1 = KubernetesRuntime.createJobName(functionDetail1);
-        final String jobName2 = KubernetesRuntime.createJobName(functionDetail2);
+        final String jobName1 = KubernetesRuntime.createJobName(functionDetail1, null);
+        final String jobName2 = KubernetesRuntime.createJobName(functionDetail2, null);
         assertNotEquals(jobName1, jobName2);
-        KubernetesRuntime.doChecks(functionDetail1);
-        KubernetesRuntime.doChecks(functionDetail2);
+        KubernetesRuntime.doChecks(functionDetail1, null);
+        KubernetesRuntime.doChecks(functionDetail2, null);
     }
 
     @Test
@@ -595,6 +612,7 @@ public class KubernetesRuntimeTest {
         config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> {
             JsonObject configObj = new JsonObject();
             configObj.addProperty("jobNamespace", "custom-ns");
+            configObj.addProperty("jobName", "custom-name");
 
             return fb.setCustomRuntimeOptions(configObj.toString());
         }));
@@ -606,6 +624,8 @@ public class KubernetesRuntimeTest {
 
         V1Service serviceSpec = container.createService();
         assertEquals(serviceSpec.getMetadata().getNamespace(), "default");
+        assertEquals(serviceSpec.getMetadata().getName(), "pf-" + TEST_TENANT + "-" + 
+        		TEST_NAMESPACE + "-" + TEST_NAME);
     }
 
     @Test
@@ -614,6 +634,7 @@ public class KubernetesRuntimeTest {
         config.setFunctionDetails(createFunctionDetails(FunctionDetails.Runtime.JAVA, false, (fb) -> {
             JsonObject configObj = new JsonObject();
             configObj.addProperty("jobNamespace", "custom-ns");
+            configObj.addProperty("jobName", "custom-name");
 
             JsonObject extraAnn = new JsonObject();
             extraAnn.addProperty("annotation", "test");
@@ -666,6 +687,7 @@ public class KubernetesRuntimeTest {
 
         V1Service serviceSpec = container.createService();
         assertEquals(serviceSpec.getMetadata().getNamespace(), "custom-ns");
+        assertEquals(serviceSpec.getMetadata().getName(), "custom-name");
         assertEquals(serviceSpec.getMetadata().getAnnotations().get("annotation"), "test");
         assertEquals(serviceSpec.getMetadata().getLabels().get("label"), "test");
 
diff --git a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
index 638780c..8a71a7c 100644
--- a/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
+++ b/pulsar-functions/runtime/src/test/java/org/apache/pulsar/functions/runtime/process/ProcessRuntimeTest.java
@@ -97,7 +97,7 @@ public class ProcessRuntimeTest {
         }
 
         @Override
-        public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, FunctionDetails functionDetails) {
+        public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, FunctionDetails functionDetails) {
 
         }
     }
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
index 81e50fd..10988c7 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfigurator.java
@@ -107,7 +107,7 @@ public class KubernetesSecretsProviderConfigurator implements SecretsProviderCon
 
     // The secret object should be of type Map<String, String> and it should contain "id" and "key"
     @Override
-    public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, Function.FunctionDetails functionDetails) {
+    public void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, Function.FunctionDetails functionDetails) {
         if (!StringUtils.isEmpty(functionDetails.getSecretsMap())) {
             Type type = new TypeToken<Map<String, Object>>() {
             }.getType();
diff --git a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
index e51242e..d65de8d 100644
--- a/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
+++ b/pulsar-functions/secrets/src/main/java/org/apache/pulsar/functions/secretsproviderconfigurator/SecretsProviderConfigurator.java
@@ -69,6 +69,6 @@ public interface SecretsProviderConfigurator {
     /**
      * Do config checks to see whether the secrets provided are conforming.
      */
-    default void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, Function.FunctionDetails functionDetails) {}
+    default void doAdmissionChecks(AppsV1Api appsV1Api, CoreV1Api coreV1Api, String jobNamespace, String jobName, Function.FunctionDetails functionDetails) {}
 
 }
\ No newline at end of file
diff --git a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java
index fc764cf..c2db339 100644
--- a/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java
+++ b/pulsar-functions/secrets/src/test/java/org/apache/pulsar/functions/secretsproviderconfigurator/KubernetesSecretsProviderConfiguratorTest.java
@@ -38,7 +38,7 @@ public class KubernetesSecretsProviderConfiguratorTest {
             HashMap<String, Object> map = new HashMap<String, Object>();
             map.put("secretname", "randomsecret");
             Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setSecretsMap(new Gson().toJson(map)).build();
-            provider.doAdmissionChecks(null, null, null, functionDetails);
+            provider.doAdmissionChecks(null, null, null, null, functionDetails);
             Assert.fail("Non conforming secret object should not validate");
         } catch (Exception e) {
         }
@@ -48,7 +48,7 @@ public class KubernetesSecretsProviderConfiguratorTest {
             map1.put("secretname", "secretvalue");
             map.put("secretname", map1);
             Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setSecretsMap(new Gson().toJson(map)).build();
-            provider.doAdmissionChecks(null, null, null, functionDetails);
+            provider.doAdmissionChecks(null, null, null, null, functionDetails);
             Assert.fail("Non conforming secret object should not validate");
         } catch (Exception e) {
         }
@@ -59,7 +59,7 @@ public class KubernetesSecretsProviderConfiguratorTest {
             map1.put("key", "secretvalue");
             map.put("secretname", map1);
             Function.FunctionDetails functionDetails = Function.FunctionDetails.newBuilder().setSecretsMap(new Gson().toJson(map)).build();
-            provider.doAdmissionChecks(null, null, null, functionDetails);
+            provider.doAdmissionChecks(null, null, null, null, functionDetails);
         } catch (Exception e) {
             Assert.fail("Conforming secret object should validate");
         }
diff --git a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
index be2cfa7..1d6dd21 100644
--- a/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
+++ b/pulsar-functions/worker/src/test/java/org/apache/pulsar/functions/worker/FunctionRuntimeManagerTest.java
@@ -871,6 +871,7 @@ public class FunctionRuntimeManagerTest {
                 = new WorkerConfig.KubernetesContainerFactory();
         kubernetesContainerFactory.setK8Uri("k8Uri");
         kubernetesContainerFactory.setJobNamespace("jobNamespace");
+        kubernetesContainerFactory.setJobName("jobName");
         kubernetesContainerFactory.setPulsarDockerImageName("pulsarDockerImageName");
         kubernetesContainerFactory.setImagePullPolicy("imagePullPolicy");
         kubernetesContainerFactory.setPulsarRootDir("pulsarRootDir");