Posted to commits@spark.apache.org by do...@apache.org on 2022/09/29 19:19:49 UTC

[spark] branch master updated: [SPARK-40458][K8S] Bump Kubernetes Client Version to 6.1.1

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fa88651fee4 [SPARK-40458][K8S] Bump Kubernetes Client Version to 6.1.1
fa88651fee4 is described below

commit fa88651fee4d65b7a84ce19b936055f74f1ccf72
Author: attilapiros <pi...@gmail.com>
AuthorDate: Thu Sep 29 12:19:17 2022 -0700

    [SPARK-40458][K8S] Bump Kubernetes Client Version to 6.1.1
    
    ### What changes were proposed in this pull request?
    
    Bump kubernetes-client version from 5.12.3 to 6.1.1 and clean up all the deprecations.
    
    ### Why are the changes needed?
    
    To keep up with kubernetes-client [changes](https://github.com/fabric8io/kubernetes-client/compare/v5.12.3...v6.1.1).
    Because this upgrade changes the major version, all usages of deprecated APIs are cleaned up as well.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    #### Unit tests
    
    #### Manual tests for submit and application management
    
    Started an application in a non-default namespace (`bla`):
    
    ```
    ➜  spark git:(SPARK-40458) ✗ ./bin/spark-submit \
        --master k8s://http://127.0.0.1:8001 \
        --deploy-mode cluster \
        --name spark-pi \
        --class org.apache.spark.examples.SparkPi \
        --conf spark.executor.instances=5 \
        --conf spark.kubernetes.namespace=bla \
        --conf spark.kubernetes.container.image=docker.io/kubespark/spark:3.4.0-SNAPSHOT_064A99CC-57AF-46D5-B743-5B12692C260D \
        local:///opt/spark/examples/jars/spark-examples_2.12-3.4.0-SNAPSHOT.jar 200000
    ```
    
    Check that the application cannot be found from the default namespace, even with a glob, when no namespace is specified:
    
    ```
    ➜  spark git:(SPARK-40458) ✗ minikube kubectl -- config set-context --current --namespace=default
    Context "minikube" modified.
    ➜  spark git:(SPARK-40458) ✗ ./bin/spark-submit --status "spark-pi-*" --master k8s://http://127.0.0.1:8001
    Submitting a request for the status of submission spark-pi-* in k8s://http://127.0.0.1:8001.
    No applications found.
    ```
    
    Then check we can find it by specifying the namespace:
    ```
    ➜  spark git:(SPARK-40458) ✗ ./bin/spark-submit --status "bla:spark-pi-*" --master k8s://http://127.0.0.1:8001
    Submitting a request for the status of submission bla:spark-pi-* in k8s://http://127.0.0.1:8001.
    Application status (driver):
             pod name: spark-pi-4c4e70837c86ae1a-driver
             namespace: bla
             labels: spark-app-name -> spark-pi, spark-app-selector -> spark-c95a9a0888214c01a286eb7ba23980a0, spark-role -> driver, spark-version -> 3.4.0-SNAPSHOT
             pod uid: 0be8952e-3e00-47a3-9082-9cb45278ed6d
             creation time: 2022-09-27T01:19:06Z
             service account name: default
             volumes: spark-local-dir-1, spark-conf-volume-driver, kube-api-access-wxnqw
             node name: minikube
             start time: 2022-09-27T01:19:06Z
             phase: Running
             container status:
                     container name: spark-kubernetes-driver
                     container image: kubespark/spark:3.4.0-SNAPSHOT_064A99CC-57AF-46D5-B743-5B12692C260D
                     container state: running
                     container started at: 2022-09-27T01:19:07Z
    ```
    
    Changing the namespace to `bla` with `kubectl`:
    
    ```
    ➜  spark git:(SPARK-40458) ✗  minikube kubectl -- config set-context --current --namespace=bla
    Context "minikube" modified.
    ```
    
    Checking that we can now find it with a glob and without specifying the namespace:
    ```
    ➜  spark git:(SPARK-40458) ✗  ./bin/spark-submit --status "spark-pi-*" --master k8s://http://127.0.0.1:8001
    Submitting a request for the status of submission spark-pi-* in k8s://http://127.0.0.1:8001.
    Application status (driver):
             pod name: spark-pi-4c4e70837c86ae1a-driver
             namespace: bla
             labels: spark-app-name -> spark-pi, spark-app-selector -> spark-c95a9a0888214c01a286eb7ba23980a0, spark-role -> driver, spark-version -> 3.4.0-SNAPSHOT
             pod uid: 0be8952e-3e00-47a3-9082-9cb45278ed6d
             creation time: 2022-09-27T01:19:06Z
             service account name: default
             volumes: spark-local-dir-1, spark-conf-volume-driver, kube-api-access-wxnqw
             node name: minikube
             start time: 2022-09-27T01:19:06Z
             phase: Running
             container status:
                     container name: spark-kubernetes-driver
                     container image: kubespark/spark:3.4.0-SNAPSHOT_064A99CC-57AF-46D5-B743-5B12692C260D
                     container state: running
                     container started at: 2022-09-27T01:19:07Z
    ```
    
    Killing the app:
    ```
    ➜  spark git:(SPARK-40458) ✗  ./bin/spark-submit --kill "spark-pi-*" --master k8s://http://127.0.0.1:8001
    Submitting a request to kill submission spark-pi-* in k8s://http://127.0.0.1:8001. Grace period in secs: not set.
    Deleting driver pod: spark-pi-4c4e70837c86ae1a-driver.
    ```
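    
    The namespace and glob handling exercised by the manual tests above can be sketched roughly as follows. This is a simplified, self-contained illustration and not the code in `K8sSubmitOps.scala`: `PodInfo`, `SubmissionGlob`, `parse` and `select` are hypothetical names, the `namespace:name` split is inferred from the `bla:spark-pi-*` submission ID used above, and the real implementation works on fabric8 `Pod` objects and applies the driver-role label on the server side via `withLabel`.
    
    ```scala
    // Hypothetical stand-in for a pod; the real code uses io.fabric8.kubernetes.api.model.Pod.
    final case class PodInfo(namespace: String, name: String, role: String)
    
    object SubmissionGlob {
      // Split "ns:name" into (Some(ns), name); a bare name carries no namespace.
      // Assumption: the separator is a single ':' as in "bla:spark-pi-*".
      def parse(submissionId: String): (Option[String], String) =
        submissionId.split(":", 2) match {
          case Array(ns, name) => (Some(ns), name)
          case _ => (None, submissionId)
        }
    
      // Select driver pods whose name starts with the glob prefix. When the
      // submission ID has no namespace, fall back to the current context's namespace.
      def select(
          pods: Seq[PodInfo],
          submissionId: String,
          currentNamespace: String): Seq[PodInfo] = {
        val (ns, pattern) = parse(submissionId)
        val targetNamespace = ns.getOrElse(currentNamespace)
        val prefix = pattern.stripSuffix("*")
        pods.filter { p =>
          p.namespace == targetNamespace && p.role == "driver" && p.name.startsWith(prefix)
        }
      }
    }
    ```
    
    With a driver pod in namespace `bla`, `select(pods, "spark-pi-*", "default")` returns nothing, while `select(pods, "bla:spark-pi-*", "default")` and `select(pods, "spark-pi-*", "bla")` both return the driver, mirroring the three status checks above.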
    
    Closes #37990 from attilapiros/SPARK-40458.
    
    Authored-by: attilapiros <pi...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 dev/deps/spark-deps-hadoop-2-hive-2.3              |  47 +--
 dev/deps/spark-deps-hadoop-3-hive-2.3              |  47 +--
 pom.xml                                            |   2 +-
 resource-managers/kubernetes/core/pom.xml          |   5 +
 .../deploy/k8s/SparkKubernetesClientFactory.scala  |   7 +-
 .../spark/deploy/k8s/submit/K8sSubmitOps.scala     |  37 +--
 .../k8s/submit/KubernetesClientApplication.scala   |   8 +-
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  16 +-
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala |  18 +-
 .../k8s/ExecutorPodsWatchSnapshotSource.scala      |   4 +
 .../k8s/KubernetesClusterSchedulerBackend.scala    |  35 ++-
 .../cluster/k8s/StatefulSetPodsAllocator.scala     |  15 +-
 .../apache/spark/deploy/k8s/Fabric8Aliases.scala   |  26 +-
 .../apache/spark/deploy/k8s/PodBuilderSuite.scala  |   8 +-
 .../spark/deploy/k8s/submit/ClientSuite.scala      |  19 +-
 .../spark/deploy/k8s/submit/K8sSubmitOpSuite.scala |  42 ++-
 .../cluster/k8s/ExecutorPodsAllocatorSuite.scala   | 331 +++++++++++----------
 .../k8s/ExecutorPodsLifecycleManagerSuite.scala    |  33 +-
 .../k8s/ExecutorPodsWatchSnapshotSourceSuite.scala |   6 +-
 .../KubernetesClusterSchedulerBackendSuite.scala   |  62 ++--
 .../cluster/k8s/StatefulSetAllocatorSuite.scala    |  31 +-
 .../k8s/integrationtest/ClientModeTestsSuite.scala |   3 +-
 .../integrationtest/SparkReadinessWatcher.scala    |   2 +-
 23 files changed, 455 insertions(+), 349 deletions(-)

diff --git a/dev/deps/spark-deps-hadoop-2-hive-2.3 b/dev/deps/spark-deps-hadoop-2-hive-2.3
index 3d21dfd3a68..8a9621e5364 100644
--- a/dev/deps/spark-deps-hadoop-2-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-2-hive-2.3
@@ -24,7 +24,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
 arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
 arrow-vector/9.0.0//arrow-vector-9.0.0.jar
 audience-annotations/0.5.0//audience-annotations-0.5.0.jar
-automaton/1.11-8//automaton-1.11-8.jar
 avro-ipc/1.11.1//avro-ipc-1.11.1.jar
 avro-mapred/1.11.1//avro-mapred-1.11.1.jar
 avro/1.11.1//avro-1.11.1.jar
@@ -69,7 +68,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
 failureaccess/1.0.1//failureaccess-1.0.1.jar
 flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
 gcs-connector/hadoop2-2.2.7/shaded/gcs-connector-hadoop2-2.2.7-shaded.jar
-generex/1.0.2//generex-1.0.2.jar
 gmetric4j/1.0.10//gmetric4j-1.0.10.jar
 grpc-api/1.47.0//grpc-api-1.47.0.jar
 grpc-context/1.47.0//grpc-context-1.47.0.jar
@@ -175,27 +173,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
 jta/1.1//jta-1.1.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
-kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
-kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
-kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
-kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
-kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
-kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
-kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
-kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
-kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
-kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
-kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
-kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
-kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
-kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
-kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
-kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
-kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
-kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
-kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
-kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
+kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
+kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
+kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
+kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
+kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
+kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
+kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
+kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
+kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
+kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
+kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
+kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
+kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
+kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
+kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
+kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
+kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
+kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
+kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
+kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
+kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
+kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
+kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
+kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
 lapack/3.0.2//lapack-3.0.2.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 libfb303/0.9.3//libfb303-0.9.3.jar
diff --git a/dev/deps/spark-deps-hadoop-3-hive-2.3 b/dev/deps/spark-deps-hadoop-3-hive-2.3
index 1fab542bad9..c26dfa3f9ce 100644
--- a/dev/deps/spark-deps-hadoop-3-hive-2.3
+++ b/dev/deps/spark-deps-hadoop-3-hive-2.3
@@ -23,7 +23,6 @@ arrow-memory-core/9.0.0//arrow-memory-core-9.0.0.jar
 arrow-memory-netty/9.0.0//arrow-memory-netty-9.0.0.jar
 arrow-vector/9.0.0//arrow-vector-9.0.0.jar
 audience-annotations/0.5.0//audience-annotations-0.5.0.jar
-automaton/1.11-8//automaton-1.11-8.jar
 avro-ipc/1.11.1//avro-ipc-1.11.1.jar
 avro-mapred/1.11.1//avro-mapred-1.11.1.jar
 avro/1.11.1//avro-1.11.1.jar
@@ -66,7 +65,6 @@ error_prone_annotations/2.10.0//error_prone_annotations-2.10.0.jar
 failureaccess/1.0.1//failureaccess-1.0.1.jar
 flatbuffers-java/1.12.0//flatbuffers-java-1.12.0.jar
 gcs-connector/hadoop3-2.2.7/shaded/gcs-connector-hadoop3-2.2.7-shaded.jar
-generex/1.0.2//generex-1.0.2.jar
 gmetric4j/1.0.10//gmetric4j-1.0.10.jar
 grpc-api/1.47.0//grpc-api-1.47.0.jar
 grpc-context/1.47.0//grpc-context-1.47.0.jar
@@ -159,27 +157,30 @@ jsr305/3.0.0//jsr305-3.0.0.jar
 jta/1.1//jta-1.1.jar
 jul-to-slf4j/1.7.36//jul-to-slf4j-1.7.36.jar
 kryo-shaded/4.0.2//kryo-shaded-4.0.2.jar
-kubernetes-client/5.12.3//kubernetes-client-5.12.3.jar
-kubernetes-model-admissionregistration/5.12.3//kubernetes-model-admissionregistration-5.12.3.jar
-kubernetes-model-apiextensions/5.12.3//kubernetes-model-apiextensions-5.12.3.jar
-kubernetes-model-apps/5.12.3//kubernetes-model-apps-5.12.3.jar
-kubernetes-model-autoscaling/5.12.3//kubernetes-model-autoscaling-5.12.3.jar
-kubernetes-model-batch/5.12.3//kubernetes-model-batch-5.12.3.jar
-kubernetes-model-certificates/5.12.3//kubernetes-model-certificates-5.12.3.jar
-kubernetes-model-common/5.12.3//kubernetes-model-common-5.12.3.jar
-kubernetes-model-coordination/5.12.3//kubernetes-model-coordination-5.12.3.jar
-kubernetes-model-core/5.12.3//kubernetes-model-core-5.12.3.jar
-kubernetes-model-discovery/5.12.3//kubernetes-model-discovery-5.12.3.jar
-kubernetes-model-events/5.12.3//kubernetes-model-events-5.12.3.jar
-kubernetes-model-extensions/5.12.3//kubernetes-model-extensions-5.12.3.jar
-kubernetes-model-flowcontrol/5.12.3//kubernetes-model-flowcontrol-5.12.3.jar
-kubernetes-model-metrics/5.12.3//kubernetes-model-metrics-5.12.3.jar
-kubernetes-model-networking/5.12.3//kubernetes-model-networking-5.12.3.jar
-kubernetes-model-node/5.12.3//kubernetes-model-node-5.12.3.jar
-kubernetes-model-policy/5.12.3//kubernetes-model-policy-5.12.3.jar
-kubernetes-model-rbac/5.12.3//kubernetes-model-rbac-5.12.3.jar
-kubernetes-model-scheduling/5.12.3//kubernetes-model-scheduling-5.12.3.jar
-kubernetes-model-storageclass/5.12.3//kubernetes-model-storageclass-5.12.3.jar
+kubernetes-client-api/6.1.1//kubernetes-client-api-6.1.1.jar
+kubernetes-client/6.1.1//kubernetes-client-6.1.1.jar
+kubernetes-httpclient-okhttp/6.1.1//kubernetes-httpclient-okhttp-6.1.1.jar
+kubernetes-model-admissionregistration/6.1.1//kubernetes-model-admissionregistration-6.1.1.jar
+kubernetes-model-apiextensions/6.1.1//kubernetes-model-apiextensions-6.1.1.jar
+kubernetes-model-apps/6.1.1//kubernetes-model-apps-6.1.1.jar
+kubernetes-model-autoscaling/6.1.1//kubernetes-model-autoscaling-6.1.1.jar
+kubernetes-model-batch/6.1.1//kubernetes-model-batch-6.1.1.jar
+kubernetes-model-certificates/6.1.1//kubernetes-model-certificates-6.1.1.jar
+kubernetes-model-common/6.1.1//kubernetes-model-common-6.1.1.jar
+kubernetes-model-coordination/6.1.1//kubernetes-model-coordination-6.1.1.jar
+kubernetes-model-core/6.1.1//kubernetes-model-core-6.1.1.jar
+kubernetes-model-discovery/6.1.1//kubernetes-model-discovery-6.1.1.jar
+kubernetes-model-events/6.1.1//kubernetes-model-events-6.1.1.jar
+kubernetes-model-extensions/6.1.1//kubernetes-model-extensions-6.1.1.jar
+kubernetes-model-flowcontrol/6.1.1//kubernetes-model-flowcontrol-6.1.1.jar
+kubernetes-model-gatewayapi/6.1.1//kubernetes-model-gatewayapi-6.1.1.jar
+kubernetes-model-metrics/6.1.1//kubernetes-model-metrics-6.1.1.jar
+kubernetes-model-networking/6.1.1//kubernetes-model-networking-6.1.1.jar
+kubernetes-model-node/6.1.1//kubernetes-model-node-6.1.1.jar
+kubernetes-model-policy/6.1.1//kubernetes-model-policy-6.1.1.jar
+kubernetes-model-rbac/6.1.1//kubernetes-model-rbac-6.1.1.jar
+kubernetes-model-scheduling/6.1.1//kubernetes-model-scheduling-6.1.1.jar
+kubernetes-model-storageclass/6.1.1//kubernetes-model-storageclass-6.1.1.jar
 lapack/3.0.2//lapack-3.0.2.jar
 leveldbjni-all/1.8//leveldbjni-all-1.8.jar
 libfb303/0.9.3//libfb303-0.9.3.jar
diff --git a/pom.xml b/pom.xml
index 5fbd82ad57a..22abe0f7d3c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -216,7 +216,7 @@
     <arrow.version>9.0.0</arrow.version>
     <!-- org.fusesource.leveldbjni will be used except on arm64 platform. -->
     <leveldbjni.group>org.fusesource.leveldbjni</leveldbjni.group>
-    <kubernetes-client.version>5.12.3</kubernetes-client.version>
+    <kubernetes-client.version>6.1.1</kubernetes-client.version>
 
     <test.java.home>${java.home}</test.java.home>
 
diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml
index 1c729cc441e..30d5f7d2bb8 100644
--- a/resource-managers/kubernetes/core/pom.xml
+++ b/resource-managers/kubernetes/core/pom.xml
@@ -75,6 +75,11 @@
       <scope>test</scope>
     </dependency>
 
+    <dependency>
+      <groupId>io.fabric8</groupId>
+      <artifactId>kubernetes-httpclient-okhttp</artifactId>
+      <version>${kubernetes-client.version}</version>
+    </dependency>
     <dependency>
       <groupId>io.fabric8</groupId>
       <artifactId>kubernetes-client</artifactId>
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
index 51040857c64..0b806f04640 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala
@@ -21,7 +21,7 @@ import java.io.File
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.google.common.base.Charsets
 import com.google.common.io.Files
-import io.fabric8.kubernetes.client.{ConfigBuilder, DefaultKubernetesClient, KubernetesClient}
+import io.fabric8.kubernetes.client.{ConfigBuilder, KubernetesClient, KubernetesClientBuilder}
 import io.fabric8.kubernetes.client.Config.KUBERNETES_REQUEST_RETRY_BACKOFFLIMIT_SYSTEM_PROPERTY
 import io.fabric8.kubernetes.client.Config.autoConfigure
 import io.fabric8.kubernetes.client.okhttp.OkHttpClientFactory
@@ -115,7 +115,10 @@ private[spark] object SparkKubernetesClientFactory extends Logging {
     }
     logDebug("Kubernetes client config: " +
       new ObjectMapper().writerWithDefaultPrettyPrinter().writeValueAsString(config))
-    new DefaultKubernetesClient(factoryWithCustomDispatcher.createHttpClient(config), config)
+    new KubernetesClientBuilder()
+      .withHttpClientFactory(factoryWithCustomDispatcher)
+      .withConfig(config)
+      .build()
   }
 
   private implicit class OptionConfigurableConfigBuilder(val configBuilder: ConfigBuilder)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
index 0238d5eafde..2ce6181a2fe 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOps.scala
@@ -19,9 +19,9 @@ package org.apache.spark.deploy.k8s.submit
 import scala.collection.JavaConverters._
 
 import K8SSparkSubmitOperation.getGracePeriod
-import io.fabric8.kubernetes.api.model.{Pod, PodList}
+import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource
 
 import org.apache.spark.SparkConf
 import org.apache.spark.deploy.SparkSubmitOperation
@@ -32,17 +32,15 @@ import org.apache.spark.deploy.k8s.KubernetesUtils.formatPodState
 import org.apache.spark.util.{CommandLineLoggingUtils, Utils}
 
 private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
-  type NON_NAMESPACED_PODS =
-    NonNamespaceOperation[Pod, PodList, PodResource[Pod]]
   def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
       (implicit client: KubernetesClient): Unit
   def executeOnGlob(pods: List[Pod], ns: Option[String], sparkConf: SparkConf)
       (implicit client: KubernetesClient): Unit
-  def listPodsInNameSpace(namespace: Option[String])
-      (implicit client: KubernetesClient): NON_NAMESPACED_PODS = {
+  def getPod(namespace: Option[String], name: String)
+      (implicit client: KubernetesClient): PodResource = {
     namespace match {
-      case Some(ns) => client.pods.inNamespace(ns)
-      case None => client.pods
+      case Some(ns) => client.pods.inNamespace(ns).withName(name)
+      case None => client.pods.withName(name)
     }
   }
 }
@@ -50,7 +48,7 @@ private sealed trait K8sSubmitOp extends CommandLineLoggingUtils {
 private class KillApplication extends K8sSubmitOp  {
   override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
       (implicit client: KubernetesClient): Unit = {
-    val podToDelete = listPodsInNameSpace(namespace).withName(pName)
+    val podToDelete = getPod(namespace, pName)
 
     if (Option(podToDelete).isDefined) {
       getGracePeriod(sparkConf) match {
@@ -66,19 +64,11 @@ private class KillApplication extends K8sSubmitOp  {
       (implicit client: KubernetesClient): Unit = {
     if (pods.nonEmpty) {
       pods.foreach { pod => printMessage(s"Deleting driver pod: ${pod.getMetadata.getName}.") }
-      val listedPods = listPodsInNameSpace(namespace)
-
       getGracePeriod(sparkConf) match {
         case Some(period) =>
-          // this is not using the batch api because no option is provided
-          // when using the grace period.
-          pods.foreach { pod =>
-            listedPods
-              .withName(pod.getMetadata.getName)
-              .withGracePeriod(period)
-              .delete()
-          }
-        case _ => listedPods.delete(pods.asJava)
+          client.resourceList(pods.asJava).withGracePeriod(period).delete()
+        case _ =>
+          client.resourceList(pods.asJava).delete()
       }
     } else {
       printMessage("No applications found.")
@@ -89,7 +79,7 @@ private class KillApplication extends K8sSubmitOp  {
 private class ListStatus extends K8sSubmitOp {
   override def executeOnPod(pName: String, namespace: Option[String], sparkConf: SparkConf)
       (implicit client: KubernetesClient): Unit = {
-    val pod = listPodsInNameSpace(namespace).withName(pName).get()
+    val pod = getPod(namespace, pName).get()
     if (Option(pod).isDefined) {
       printMessage("Application status (driver): " +
         Option(pod).map(formatPodState).getOrElse("unknown."))
@@ -145,13 +135,12 @@ private[spark] class K8SSparkSubmitOperation extends SparkSubmitOperation
                   .pods
             }
             val pods = ops
+              .withLabel(SPARK_ROLE_LABEL, SPARK_POD_DRIVER_ROLE)
               .list()
               .getItems
               .asScala
               .filter { pod =>
-                val meta = pod.getMetadata
-                meta.getName.startsWith(pName.stripSuffix("*")) &&
-                  meta.getLabels.get(SPARK_ROLE_LABEL) == SPARK_POD_DRIVER_ROLE
+                pod.getMetadata.getName.startsWith(pName.stripSuffix("*"))
               }.toList
             op.executeOnGlob(pods, namespace, sparkConf)
           } else {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
index 3a3ab081fe8..14d3c4d1f42 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesClientApplication.scala
@@ -149,7 +149,8 @@ private[spark] class Client(
     var watch: Watch = null
     var createdDriverPod: Pod = null
     try {
-      createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
+      createdDriverPod =
+        kubernetesClient.pods().inNamespace(conf.namespace).resource(resolvedDriverPod).create()
     } catch {
       case NonFatal(e) =>
         kubernetesClient.resourceList(preKubernetesResources: _*).delete()
@@ -163,7 +164,7 @@ private[spark] class Client(
       kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
     } catch {
       case NonFatal(e) =>
-        kubernetesClient.pods().delete(createdDriverPod)
+        kubernetesClient.pods().resource(createdDriverPod).delete()
         kubernetesClient.resourceList(preKubernetesResources: _*).delete()
         throw e
     }
@@ -175,7 +176,7 @@ private[spark] class Client(
       kubernetesClient.resourceList(otherKubernetesResources: _*).createOrReplace()
     } catch {
       case NonFatal(e) =>
-        kubernetesClient.pods().delete(createdDriverPod)
+        kubernetesClient.pods().resource(createdDriverPod).delete()
         throw e
     }
 
@@ -185,6 +186,7 @@ private[spark] class Client(
         while (true) {
           val podWithName = kubernetesClient
             .pods()
+            .inNamespace(conf.namespace)
             .withName(driverPodName)
           // Reset resource to old before we start the watch, this is important for race conditions
           watcher.reset()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 9bdc30e4466..524ab0c845c 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -76,6 +76,7 @@ class ExecutorPodsAllocator(
 
   val driverPod = kubernetesDriverPodName
     .map(name => Option(kubernetesClient.pods()
+      .inNamespace(namespace)
       .withName(name)
       .get())
       .getOrElse(throw new SparkException(
@@ -112,6 +113,7 @@ class ExecutorPodsAllocator(
       Utils.tryLogNonFatalError {
         kubernetesClient
           .pods()
+          .inNamespace(namespace)
           .withName(pod.getMetadata.getName)
           .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
       }
@@ -185,6 +187,7 @@ class ExecutorPodsAllocator(
         Utils.tryLogNonFatalError {
           kubernetesClient
             .pods()
+            .inNamespace(namespace)
             .withLabel(SPARK_APP_ID_LABEL, applicationId)
             .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
             .withLabelIn(SPARK_EXECUTOR_ID_LABEL, timedOut.toSeq.map(_.toString): _*)
@@ -299,6 +302,7 @@ class ExecutorPodsAllocator(
           Utils.tryLogNonFatalError {
             kubernetesClient
               .pods()
+              .inNamespace(namespace)
               .withField("status.phase", "Pending")
               .withLabel(SPARK_APP_ID_LABEL, applicationId)
               .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
@@ -363,6 +367,7 @@ class ExecutorPodsAllocator(
       try {
         val createdPVCs = kubernetesClient
           .persistentVolumeClaims
+          .inNamespace(namespace)
           .withLabel("spark-app-selector", applicationId)
           .list()
           .getItems
@@ -406,7 +411,8 @@ class ExecutorPodsAllocator(
         .build()
       val resources = replacePVCsIfNeeded(
         podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
-      val createdExecutorPod = kubernetesClient.pods().create(podWithAttachedContainer)
+      val createdExecutorPod =
+        kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
       try {
         addOwnerReference(createdExecutorPod, resources)
         resources
@@ -418,13 +424,16 @@ class ExecutorPodsAllocator(
             val pvc = resource.asInstanceOf[PersistentVolumeClaim]
             logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
               s"StorageClass ${pvc.getSpec.getStorageClassName}")
-            kubernetesClient.persistentVolumeClaims().create(pvc)
+            kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
           }
         newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
         logDebug(s"Requested executor with id $newExecutorId from Kubernetes.")
       } catch {
         case NonFatal(e) =>
-          kubernetesClient.pods().delete(createdExecutorPod)
+          kubernetesClient.pods()
+            .inNamespace(namespace)
+            .resource(createdExecutorPod)
+            .delete()
           throw e
       }
     }
@@ -475,6 +484,7 @@ class ExecutorPodsAllocator(
     Utils.tryLogNonFatalError {
       kubernetesClient
         .pods()
+        .inNamespace(namespace)
         .withLabel(SPARK_APP_ID_LABEL, applicationId)
         .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
         .delete()
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index e255de4d2dd..7d32b35eab9 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.scheduler.cluster.k8s
 
 import java.util.concurrent.TimeUnit
+import java.util.function.UnaryOperator
 
 import com.google.common.cache.CacheBuilder
 import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
@@ -57,6 +58,8 @@ private[spark] class ExecutorPodsLifecycleManager(
   // This set is cleaned up when a snapshot containing the updated pod is processed.
   private val inactivatedPods = mutable.HashSet.empty[Long]
 
+  private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
   def start(schedulerBackend: KubernetesClusterSchedulerBackend): Unit = {
     val eventProcessingInterval = conf.get(KUBERNETES_EXECUTOR_EVENT_PROCESSING_INTERVAL)
     snapshotsStore.addSubscriber(eventProcessingInterval) {
@@ -168,6 +171,7 @@ private[spark] class ExecutorPodsLifecycleManager(
         // of getting rid of the pod is what matters.
         kubernetesClient
           .pods()
+          .inNamespace(namespace)
           .withName(updatedPod.getMetadata.getName)
           .delete()
       } else if (!inactivatedPods.contains(execId) && !isPodInactive(updatedPod)) {
@@ -175,16 +179,11 @@ private[spark] class ExecutorPodsLifecycleManager(
         // can be ignored in future updates from the API server.
         logDebug(s"Marking executor ${updatedPod.getMetadata.getName} as inactive since " +
           "deletion is disabled.")
-        val inactivatedPod = new PodBuilder(updatedPod)
-          .editMetadata()
-            .addToLabels(Map(SPARK_EXECUTOR_INACTIVE_LABEL -> "true").asJava)
-            .endMetadata()
-          .build()
-
         kubernetesClient
           .pods()
+          .inNamespace(namespace)
           .withName(updatedPod.getMetadata.getName)
-          .patch(inactivatedPod)
+          .edit(executorInactivationFn)
 
         inactivatedPods += execId
       }
@@ -274,4 +273,9 @@ private object ExecutorPodsLifecycleManager {
     s"${code}${humanStr}"
   }
 
+  def executorInactivationFn: UnaryOperator[Pod] = (p: Pod) => new PodBuilder(p)
+    .editOrNewMetadata()
+    .addToLabels(SPARK_EXECUTOR_INACTIVE_LABEL, "true")
+    .endMetadata()
+    .build()
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
index a334ece5653..4809222650d 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala
@@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.Watcher.Action
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_EXECUTOR_ENABLE_API_WATCHER
+import org.apache.spark.deploy.k8s.Config.KUBERNETES_NAMESPACE
 import org.apache.spark.deploy.k8s.Constants._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.Utils
@@ -46,6 +47,8 @@ class ExecutorPodsWatchSnapshotSource(
   private var watchConnection: Closeable = _
   private val enableWatching = conf.get(KUBERNETES_EXECUTOR_ENABLE_API_WATCHER)
 
+  private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
   // If we're constructed with the old API get the SparkConf from the running SparkContext.
   def this(snapshotsStore: ExecutorPodsSnapshotsStore, kubernetesClient: KubernetesClient) = {
     this(snapshotsStore, kubernetesClient, SparkContext.getOrCreate().conf)
@@ -58,6 +61,7 @@ class ExecutorPodsWatchSnapshotSource(
       logDebug(s"Starting to watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
         s" $SPARK_ROLE_LABEL=$SPARK_POD_EXECUTOR_ROLE.")
       watchConnection = kubernetesClient.pods()
+        .inNamespace(namespace)
         .withLabel(SPARK_APP_ID_LABEL, applicationId)
         .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
         .watch(new ExecutorPodsWatcher())
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
index 985b8b7bef0..4a8cb6d7050 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala
@@ -19,7 +19,6 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 
-import scala.collection.JavaConverters._
 import scala.concurrent.Future
 
 import io.fabric8.kubernetes.api.model.Pod
@@ -69,6 +68,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private val defaultProfile = scheduler.sc.resourceProfileManager.defaultResourceProfile
 
+  private val namespace = conf.get(KUBERNETES_NAMESPACE)
+
   // Allow removeExecutor to be accessible by ExecutorPodsLifecycleEventHandler
   private[k8s] def doRemoveExecutor(executorId: String, reason: ExecutorLossReason): Unit = {
     removeExecutor(executorId, reason)
@@ -77,7 +78,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
   private def setUpExecutorConfigMap(driverPod: Option[Pod]): Unit = {
     val configMapName = KubernetesClientUtils.configMapNameExecutor
     val resolvedExecutorProperties =
-      Map(KUBERNETES_NAMESPACE.key -> conf.get(KUBERNETES_NAMESPACE))
+      Map(KUBERNETES_NAMESPACE.key -> namespace)
     val confFilesMap = KubernetesClientUtils
       .buildSparkConfDirFilesMap(configMapName, conf, resolvedExecutorProperties) ++
       resolvedExecutorProperties
@@ -85,7 +86,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       Map(SPARK_APP_ID_LABEL -> applicationId(), SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE)
     val configMap = KubernetesClientUtils.buildConfigMap(configMapName, confFilesMap, labels)
     KubernetesUtils.addOwnerReference(driverPod.orNull, Seq(configMap))
-    kubernetesClient.configMaps().create(configMap)
+    kubernetesClient.configMaps().inNamespace(namespace).resource(configMap).create()
   }
 
   /**
@@ -136,6 +137,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       Utils.tryLogNonFatalError {
         kubernetesClient
           .services()
+          .inNamespace(namespace)
           .withLabel(SPARK_APP_ID_LABEL, applicationId())
           .delete()
       }
@@ -145,6 +147,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       Utils.tryLogNonFatalError {
         kubernetesClient
           .persistentVolumeClaims()
+          .inNamespace(namespace)
           .withLabel(SPARK_APP_ID_LABEL, applicationId())
           .delete()
       }
@@ -158,6 +161,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
         Utils.tryLogNonFatalError {
           kubernetesClient
             .configMaps()
+            .inNamespace(namespace)
             .withLabel(SPARK_APP_ID_LABEL, applicationId())
             .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
             .delete()
@@ -193,22 +197,19 @@ private[spark] class KubernetesClusterSchedulerBackend(
     conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL).foreach { label =>
       val labelTask = new Runnable() {
         override def run(): Unit = Utils.tryLogNonFatalError {
-
-          val podsToLabel = kubernetesClient.pods()
+          kubernetesClient.pods()
+            .inNamespace(namespace)
             .withLabel(SPARK_APP_ID_LABEL, applicationId())
             .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
             .withLabelIn(SPARK_EXECUTOR_ID_LABEL, execIds: _*)
-            .list().getItems().asScala
-
-          podsToLabel.foreach { pod =>
-            kubernetesClient.pods()
-              .inNamespace(pod.getMetadata.getNamespace)
-              .withName(pod.getMetadata.getName)
-              .edit({p: Pod => new PodBuilder(p).editMetadata()
-                .addToLabels(label,
-                  conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse(""))
-                .endMetadata()
-                .build()})
+            .resources()
+            .forEach { podResource =>
+              podResource.edit({ p: Pod =>
+                new PodBuilder(p).editOrNewMetadata()
+                  .addToLabels(label,
+                    conf.get(KUBERNETES_EXECUTOR_DECOMMISSION_LABEL_VALUE).getOrElse(""))
+                  .endMetadata()
+                  .build()})
           }
         }
       }
@@ -246,6 +247,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
       override def run(): Unit = Utils.tryLogNonFatalError {
         val running = kubernetesClient
           .pods()
+          .inNamespace(namespace)
           .withField("status.phase", "Running")
           .withLabel(SPARK_APP_ID_LABEL, applicationId())
           .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)
@@ -302,6 +304,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
           override def run(): Unit = Utils.tryLogNonFatalError {
             // Label the pod with it's exec ID
             kubernetesClient.pods()
+              .inNamespace(namespace)
               .withName(x.podName)
               .edit({p: Pod => new PodBuilder(p).editMetadata()
                 .addToLabels(SPARK_EXECUTOR_ID_LABEL, newId)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala
index 294ee70168b..5eeab5501e4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetPodsAllocator.scala
@@ -53,6 +53,7 @@ class StatefulSetPodsAllocator(
 
   val driverPod = kubernetesDriverPodName
     .map(name => Option(kubernetesClient.pods()
+      .inNamespace(namespace)
       .withName(name)
       .get())
       .getOrElse(throw new SparkException(
@@ -69,6 +70,7 @@ class StatefulSetPodsAllocator(
       Utils.tryLogNonFatalError {
         kubernetesClient
           .pods()
+          .inNamespace(namespace)
           .withName(pod.getMetadata.getName)
           .waitUntilReady(driverPodReadinessTimeout, TimeUnit.SECONDS)
       }
@@ -99,7 +101,7 @@ class StatefulSetPodsAllocator(
       applicationId: String,
       resourceProfileId: Int): Unit = {
     if (setsCreated.contains(resourceProfileId)) {
-      val statefulset = kubernetesClient.apps().statefulSets().withName(
+      val statefulset = kubernetesClient.apps().statefulSets().inNamespace(namespace).withName(
         setName(applicationId, resourceProfileId: Int))
       statefulset.scale(expected, false /* wait */)
     } else {
@@ -169,7 +171,7 @@ class StatefulSetPodsAllocator(
       val statefulSet = new io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder()
         .withNewMetadata()
           .withName(setName(applicationId, resourceProfileId))
-          .withNamespace(conf.get(KUBERNETES_NAMESPACE))
+          .withNamespace(namespace)
         .endMetadata()
         .withNewSpec()
           .withPodManagementPolicy("Parallel")
@@ -185,7 +187,7 @@ class StatefulSetPodsAllocator(
         .build()
 
       addOwnerReference(driverPod.get, Seq(statefulSet))
-      kubernetesClient.apps().statefulSets().create(statefulSet)
+      kubernetesClient.apps().statefulSets().inNamespace(namespace).resource(statefulSet).create()
       setsCreated += (resourceProfileId)
     }
   }
@@ -194,7 +196,12 @@ class StatefulSetPodsAllocator(
     // Cleanup the statefulsets when we stop
     setsCreated.foreach { rpid =>
       Utils.tryLogNonFatalError {
-        kubernetesClient.apps().statefulSets().withName(setName(applicationId, rpid)).delete()
+        kubernetesClient
+          .apps()
+          .statefulSets()
+          .inNamespace(namespace)
+          .withName(setName(applicationId, rpid))
+          .delete()
       }
     }
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
index 14405da7281..1a4bc9781da 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/Fabric8Aliases.scala
@@ -17,19 +17,31 @@
 package org.apache.spark.deploy.k8s
 
 import io.fabric8.kubernetes.api.model.{ConfigMap, ConfigMapList, HasMetadata, PersistentVolumeClaim, PersistentVolumeClaimList, Pod, PodList}
-import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource, Resource}
+import io.fabric8.kubernetes.api.model.apps.StatefulSet
+import io.fabric8.kubernetes.api.model.apps.StatefulSetList
+import io.fabric8.kubernetes.client.dsl.{FilterWatchListDeletable, MixedOperation, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, NonNamespaceOperation, PodResource, Resource, RollableScalableResource}
 
 object Fabric8Aliases {
-  type PODS = MixedOperation[Pod, PodList, PodResource[Pod]]
+  type PODS = MixedOperation[Pod, PodList, PodResource]
+  type PODS_WITH_NAMESPACE = NonNamespaceOperation[Pod, PodList, PodResource]
   type CONFIG_MAPS = MixedOperation[
     ConfigMap, ConfigMapList, Resource[ConfigMap]]
-  type LABELED_PODS = FilterWatchListDeletable[Pod, PodList]
-  type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList]
-  type SINGLE_POD = PodResource[Pod]
+  type CONFIG_MAPS_WITH_NAMESPACE =
+    NonNamespaceOperation[ConfigMap, ConfigMapList, Resource[ConfigMap]]
+  type CONFIG_MAPS_RESOURCE = Resource[ConfigMap]
+  type LABELED_PODS = FilterWatchListDeletable[Pod, PodList, PodResource]
+  type LABELED_CONFIG_MAPS = FilterWatchListDeletable[ConfigMap, ConfigMapList, Resource[ConfigMap]]
+  type SINGLE_POD = PodResource
   type RESOURCE_LIST = NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[
     HasMetadata]
+  type STATEFUL_SET_RES = RollableScalableResource[StatefulSet]
+  type STATEFUL_SETS = MixedOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
+  type STATEFUL_SETS_NAMESPACED =
+    NonNamespaceOperation[StatefulSet, StatefulSetList, STATEFUL_SET_RES]
   type PERSISTENT_VOLUME_CLAIMS = MixedOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
     Resource[PersistentVolumeClaim]]
-  type LABELED_PERSISTENT_VOLUME_CLAIMS =
-    FilterWatchListDeletable[PersistentVolumeClaim, PersistentVolumeClaimList]
+  type PVC_WITH_NAMESPACE = NonNamespaceOperation[PersistentVolumeClaim, PersistentVolumeClaimList,
+    Resource[PersistentVolumeClaim]]
+  type LABELED_PERSISTENT_VOLUME_CLAIMS = FilterWatchListDeletable[PersistentVolumeClaim,
+    PersistentVolumeClaimList, Resource[PersistentVolumeClaim]]
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
index 642c18db541..dc2d354af98 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/PodBuilderSuite.scala
@@ -20,12 +20,13 @@ import java.io.File
 
 import io.fabric8.kubernetes.api.model.{Config => _, _}
 import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{MixedOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{mock, never, verify, when}
 import scala.collection.JavaConverters._
 
 import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Fabric8Aliases._
 import org.apache.spark.deploy.k8s.features.{KubernetesDriverCustomFeatureConfigStep, KubernetesExecutorCustomFeatureConfigStep, KubernetesFeatureConfigStep}
 import org.apache.spark.internal.config.ConfigEntry
 
@@ -156,9 +157,8 @@ abstract class PodBuilderSuite extends SparkFunSuite {
 
   protected def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
     val kubernetesClient = mock(classOf[KubernetesClient])
-    val pods =
-      mock(classOf[MixedOperation[Pod, PodList, PodResource[Pod]]])
-    val podResource = mock(classOf[PodResource[Pod]])
+    val pods = mock(classOf[PODS])
+    val podResource = mock(classOf[PodResource])
     when(kubernetesClient.pods()).thenReturn(pods)
     when(pods.load(any(classOf[File]))).thenReturn(podResource)
     when(podResource.get()).thenReturn(pod)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
index 12a5202b9d0..a8c25ab5002 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/ClientSuite.scala
@@ -149,7 +149,10 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
   private var podOperations: PODS = _
 
   @Mock
-  private var namedPods: PodResource[Pod] = _
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
+  @Mock
+  private var namedPods: PodResource = _
 
   @Mock
   private var loggingPodStatusWatcher: LoggingPodStatusWatcher = _
@@ -170,11 +173,13 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
     when(driverBuilder.buildFromFeatures(kconf, kubernetesClient)).thenReturn(BUILT_KUBERNETES_SPEC)
     when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withName(POD_NAME)).thenReturn(namedPods)
+    when(podOperations.inNamespace(kconf.namespace)).thenReturn(podsWithNamespace)
+    when(podsWithNamespace.withName(POD_NAME)).thenReturn(namedPods)
 
     createdPodArgumentCaptor = ArgumentCaptor.forClass(classOf[Pod])
     createdResourcesArgumentCaptor = ArgumentCaptor.forClass(classOf[HasMetadata])
-    when(podOperations.create(fullExpectedPod())).thenReturn(podWithOwnerReference())
+    when(podsWithNamespace.resource(fullExpectedPod())).thenReturn(namedPods)
+    when(namedPods.create()).thenReturn(podWithOwnerReference())
     when(namedPods.watch(loggingPodStatusWatcher)).thenReturn(mock[Watch])
     when(loggingPodStatusWatcher.watchOrStop(kconf.namespace + ":" + POD_NAME)).thenReturn(true)
     doReturn(resourceList)
@@ -189,7 +194,8 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
       kubernetesClient,
       loggingPodStatusWatcher)
     submissionClient.run()
-    verify(podOperations).create(fullExpectedPod())
+    verify(podsWithNamespace).resource(fullExpectedPod())
+    verify(namedPods).create()
   }
 
   test("The client should create Kubernetes resources") {
@@ -298,8 +304,9 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
     val expectedKeyToPaths = (expectedConfFiles.map(x => new KeyToPath(x, 420, x)).toList ++
       List(KEY_TO_PATH)).sortBy(x => x.getKey)
 
-    when(podOperations.create(fullExpectedPod(expectedKeyToPaths)))
-      .thenReturn(podWithOwnerReference(expectedKeyToPaths))
+    when(podsWithNamespace.resource(fullExpectedPod(expectedKeyToPaths)))
+      .thenReturn(namedPods)
+    when(namedPods.create()).thenReturn(podWithOwnerReference(expectedKeyToPaths))
 
     kconf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf,
       resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala
index 142d3fe112d..3d30fb320d6 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/K8sSubmitOpSuite.scala
@@ -17,12 +17,13 @@
 package org.apache.spark.deploy.k8s.submit
 
 import java.io.PrintStream
+import java.util.Arrays
 
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.api.model._
-import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.PodResource
+import io.fabric8.kubernetes.client.{KubernetesClient, PropagationPolicyConfigurable}
+import io.fabric8.kubernetes.client.dsl.{Deletable, NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable, PodResource}
 import org.mockito.{ArgumentMatchers, Mock, MockitoAnnotations}
 import org.mockito.Mockito.{times, verify, when}
 import org.scalatest.BeforeAndAfter
@@ -30,9 +31,10 @@ import org.scalatest.BeforeAndAfter
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_SUBMIT_GRACE_PERIOD
 import org.apache.spark.deploy.k8s.Constants.{SPARK_APP_ID_LABEL, SPARK_POD_DRIVER_ROLE, SPARK_ROLE_LABEL}
-import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS
+import org.apache.spark.deploy.k8s.Fabric8Aliases.{PODS, PODS_WITH_NAMESPACE}
 import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils.TEST_SPARK_APP_ID
 
+
 class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter {
   private val driverPodName1 = "driver1"
   private val driverPodName2 = "driver2"
@@ -45,28 +47,39 @@ class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter {
   private var podOperations: PODS = _
 
   @Mock
-  private var driverPodOperations1: PodResource[Pod] = _
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
+  @Mock
+  private var driverPodOperations1: PodResource = _
 
   @Mock
-  private var driverPodOperations2: PodResource[Pod] = _
+  private var driverPodOperations2: PodResource = _
 
   @Mock
   private var kubernetesClient: KubernetesClient = _
 
+  @Mock
+  private var deletable: PropagationPolicyConfigurable[_ <: Deletable] = _
+
+  @Mock
+  private var deletableList:
+    NamespaceListVisitFromServerGetDeleteRecreateWaitApplicable[HasMetadata] = _
+
   @Mock
   private var err: PrintStream = _
 
+  private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*)
+
   before {
     MockitoAnnotations.openMocks(this).close()
     when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.inNamespace(namespace)).thenReturn(podOperations)
-    when(podOperations.delete(podList.asJava)).thenReturn(true)
-    when(podOperations.withName(driverPodName1)).thenReturn(driverPodOperations1)
-    when(podOperations.withName(driverPodName2)).thenReturn(driverPodOperations2)
+    when(podOperations.inNamespace(namespace)).thenReturn(podsWithNamespace)
+    when(podsWithNamespace.withName(driverPodName1)).thenReturn(driverPodOperations1)
+    when(podsWithNamespace.withName(driverPodName2)).thenReturn(driverPodOperations2)
     when(driverPodOperations1.get).thenReturn(driverPod1)
-    when(driverPodOperations1.delete()).thenReturn(true)
+    when(driverPodOperations1.delete()).thenReturn(Arrays.asList(new StatusDetails))
     when(driverPodOperations2.get).thenReturn(driverPod2)
-    when(driverPodOperations2.delete()).thenReturn(true)
+    when(driverPodOperations2.delete()).thenReturn(Arrays.asList(new StatusDetails))
   }
 
   test("List app status") {
@@ -101,18 +114,19 @@ class K8sSubmitOpSuite extends SparkFunSuite with BeforeAndAfter {
     implicit val kubeClient: KubernetesClient = kubernetesClient
     val killApp = new KillApplication
     val conf = new SparkConf().set(KUBERNETES_SUBMIT_GRACE_PERIOD, 1L)
-    when(driverPodOperations1.withGracePeriod(1L)).thenReturn(driverPodOperations1)
+    doReturn(deletable).when(driverPodOperations1).withGracePeriod(1L)
     killApp.executeOnPod(driverPodName1, Option(namespace), conf)
     verify(driverPodOperations1, times(1)).withGracePeriod(1L)
-    verify(driverPodOperations1, times(1)).delete()
+    verify(deletable, times(1)).delete()
   }
 
   test("Kill multiple apps with glob without gracePeriod") {
     implicit val kubeClient: KubernetesClient = kubernetesClient
     val killApp = new KillApplication
     killApp.printStream = err
+    doReturn(deletableList).when(kubernetesClient).resourceList(podList.asJava)
     killApp.executeOnGlob(podList, Option(namespace), new SparkConf())
-    verify(podOperations, times(1)).delete(podList.asJava)
+    verify(deletableList, times(1)).delete()
     // scalastyle:off
     verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName1."))
     verify(err).println(ArgumentMatchers.eq(s"Deleting driver pod: $driverPodName2."))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index 7ce0b57d1e9..caec9ef9201 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -26,7 +26,7 @@ import io.fabric8.kubernetes.api.model._
 import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException}
 import io.fabric8.kubernetes.client.dsl.PodResource
 import org.mockito.{Mock, MockitoAnnotations}
-import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.ArgumentMatchers.{any, anyString, eq => meq}
 import org.mockito.Mockito.{never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
@@ -77,9 +77,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   @Mock
   private var podOperations: PODS = _
 
+  @Mock
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
+  @Mock
+  private var podResource: PodResource = _
+
   @Mock
   private var persistentVolumeClaims: PERSISTENT_VOLUME_CLAIMS = _
 
+  @Mock
+  private var pvcWithNamespace: PVC_WITH_NAMESPACE = _
+
   @Mock
   private var labeledPersistentVolumeClaims: LABELED_PERSISTENT_VOLUME_CLAIMS = _
 
@@ -90,7 +99,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   private var labeledPods: LABELED_PODS = _
 
   @Mock
-  private var driverPodOperations: PodResource[Pod] = _
+  private var driverPodOperations: PodResource = _
 
   @Mock
   private var executorBuilder: KubernetesExecutorBuilder = _
@@ -107,7 +116,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   before {
     MockitoAnnotations.openMocks(this).close()
     when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
+    when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
+    when(podsWithNamespace.withName(driverPodName)).thenReturn(driverPodOperations)
+    when(podsWithNamespace.resource(any())).thenReturn(podResource)
+    when(podsWithNamespace.withLabel(anyString(), anyString())).thenReturn(labeledPods)
+    when(podsWithNamespace.withLabelIn(anyString(), any())).thenReturn(labeledPods)
+    when(podsWithNamespace.withField(anyString(), anyString())).thenReturn(labeledPods)
+    when(labeledPods.withLabel(anyString(), anyString())).thenReturn(labeledPods)
+    when(labeledPods.withLabelIn(anyString(), any())).thenReturn(labeledPods)
+    when(labeledPods.withField(anyString(), anyString())).thenReturn(labeledPods)
     when(driverPodOperations.get).thenReturn(driverPod)
     when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod)
     when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
@@ -119,7 +136,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     when(schedulerBackend.getExecutorIds).thenReturn(Seq.empty)
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
     when(kubernetesClient.persistentVolumeClaims()).thenReturn(persistentVolumeClaims)
-    when(persistentVolumeClaims.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
+    when(persistentVolumeClaims.inNamespace("default")).thenReturn(pvcWithNamespace)
+    when(pvcWithNamespace.withLabel(any(), any())).thenReturn(labeledPersistentVolumeClaims)
     when(labeledPersistentVolumeClaims.list()).thenReturn(persistentVolumeClaimList)
     when(persistentVolumeClaimList.getItems).thenReturn(Seq.empty[PersistentVolumeClaim].asJava)
   }
@@ -170,9 +188,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 3))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
-    verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id))
+    verify(podResource, times(3)).create()
 
     // Mark executor 2 and 3 as pending, leave 1 as newly created but this does not free up
     // any pending pod slot so no new pod is requested
@@ -180,8 +199,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(pendingExecutor(3, rp.id))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
-    verify(podOperations, times(3)).create(any())
-    verify(podOperations, never()).delete()
+    verify(podResource, times(3)).create()
+    verify(labeledPods, never()).delete()
 
     // Downscaling for defaultProfile resource ID with 1 executor to make one free slot
     // for pendings pods
@@ -189,16 +208,16 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 3))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
-    verify(podOperations).create(podWithAttachedContainerForId(4, rp.id))
-    verify(podOperations, times(1)).delete()
+    verify(labeledPods, times(1)).delete()
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id))
 
     // Make one pod running this way we have one more free slot for pending pods
     snapshotsStore.updatePod(runningExecutor(3, rp.id))
     snapshotsStore.updatePod(pendingExecutor(4, rp.id))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
-    verify(podOperations).create(podWithAttachedContainerForId(5, rp.id))
-    verify(podOperations, times(1)).delete()
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id))
+    verify(labeledPods, times(1)).delete()
   }
 
   test("Initially request executors in batches. Do not request another batch if the" +
@@ -206,9 +225,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> (podAllocationSize + 1)))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     for (nextId <- 1 to podAllocationSize) {
-      verify(podOperations).create(podWithAttachedContainerForId(nextId))
+      verify(podsWithNamespace).resource(podWithAttachedContainerForId(nextId))
     }
-    verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    verify(podsWithNamespace, never())
+      .resource(podWithAttachedContainerForId(podAllocationSize + 1))
   }
 
   test("Request executors in batches. Allow another batch to be requested if" +
@@ -225,15 +245,17 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 5)
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
-    verify(podOperations, never()).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    verify(podsWithNamespace, never())
+      .resource(podWithAttachedContainerForId(podAllocationSize + 1))
+    verify(podResource, times(podAllocationSize)).create()
     snapshotsStore.updatePod(runningExecutor(podAllocationSize))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
-    verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(podAllocationSize + 1))
     snapshotsStore.updatePod(runningExecutor(podAllocationSize))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
-    verify(podOperations, times(podAllocationSize + 1)).create(any(classOf[Pod]))
+    verify(podResource, times(podAllocationSize + 1)).create()
   }
 
   test("When a current batch reaches error states immediately, re-request" +
@@ -248,14 +270,14 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(failedPod)
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
-    verify(podOperations).create(podWithAttachedContainerForId(podAllocationSize + 1))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(podAllocationSize + 1))
   }
 
   test("Verify stopping deletes the labeled pods") {
-    when(podOperations
+    when(podsWithNamespace
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
       .thenReturn(labeledPods)
     podsAllocatorUnderTest.stop(TEST_SPARK_APP_ID)
@@ -264,39 +286,39 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
   test("When an executor is requested but the API does not report it in a reasonable time, retry" +
     " requesting that executor.") {
-    when(podOperations
+    when(podsWithNamespace
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
       .thenReturn(labeledPods)
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> 1))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
-    verify(podOperations).create(podWithAttachedContainerForId(1))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
     waitForExecutorPodsClock.setTime(podCreationTimeout + 1)
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
     verify(labeledPods).delete()
-    verify(podOperations).create(podWithAttachedContainerForId(2))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
   }
 
   test("SPARK-28487: scale up and down on target executor count changes") {
-    when(podOperations
+    when(podsWithNamespace
       .withField("status.phase", "Pending"))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
-      .thenReturn(podOperations)
+      .thenReturn(labeledPods)
 
     val startTime = Instant.now.toEpochMilli
     waitForExecutorPodsClock.setTime(startTime)
@@ -305,31 +327,31 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> 1))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
-    verify(podOperations).create(podWithAttachedContainerForId(1))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
 
     // Mark executor as running, verify that subsequent allocation cycle is a no-op.
     snapshotsStore.updatePod(runningExecutor(1))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
-    verify(podOperations, times(1)).create(any())
-    verify(podOperations, never()).delete()
+    verify(podResource, times(1)).create()
+    verify(labeledPods, never()).delete()
 
     // Request 3 more executors, make sure all are requested.
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> 4))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
-    verify(podOperations).create(podWithAttachedContainerForId(2))
-    verify(podOperations).create(podWithAttachedContainerForId(3))
-    verify(podOperations).create(podWithAttachedContainerForId(4))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(3))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(4))
 
     // Mark 2 as running, 3 as pending. Allocation cycle should do nothing.
     snapshotsStore.updatePod(runningExecutor(2))
     snapshotsStore.updatePod(pendingExecutor(3))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
-    verify(podOperations, times(4)).create(any())
-    verify(podOperations, never()).delete()
+    verify(podResource, times(4)).create()
+    verify(labeledPods, never()).delete()
 
     // Scale down to 1. Pending executors (both acknowledged and not) should be deleted.
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
@@ -337,9 +359,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       Map(defaultProfile -> 1))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
-    verify(podOperations, times(4)).create(any())
-    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
-    verify(podOperations).delete()
+    verify(podResource, times(4)).create()
+    verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "3", "4")
+    verify(labeledPods).delete()
     assert(podsAllocatorUnderTest.isDeleted("3"))
     assert(podsAllocatorUnderTest.isDeleted("4"))
 
@@ -355,25 +377,25 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("SPARK-34334: correctly identify timed out pending pod requests as excess") {
-    when(podOperations
+    when(podsWithNamespace
       .withField("status.phase", "Pending"))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
-      .thenReturn(podOperations)
+      .thenReturn(labeledPods)
 
     val startTime = Instant.now.toEpochMilli
     waitForExecutorPodsClock.setTime(startTime)
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
-    verify(podOperations).create(podWithAttachedContainerForId(1))
-    verify(podOperations).create(any())
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
+    verify(podResource).create()
 
     snapshotsStore.updatePod(pendingExecutor(1))
     snapshotsStore.notifySubscribers()
@@ -382,48 +404,48 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
     snapshotsStore.notifySubscribers()
-    verify(podOperations).create(podWithAttachedContainerForId(2))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
     snapshotsStore.notifySubscribers()
 
-    verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
-    verify(podOperations, never()).delete()
+    verify(labeledPods, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
+    verify(labeledPods, never()).delete()
 
     waitForExecutorPodsClock.advance(executorIdleTimeout)
     snapshotsStore.notifySubscribers()
 
     // before SPARK-34334 this verify() call failed as the non-timed out newly created request
     // decreased the number of requests taken from timed out pending pod requests
-    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
-    verify(podOperations).delete()
+    verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1")
+    verify(labeledPods).delete()
   }
 
   test("SPARK-33099: Respect executor idle timeout configuration") {
-    when(podOperations
+    when(podsWithNamespace
       .withField("status.phase", "Pending"))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
-      .thenReturn(podOperations)
+      .thenReturn(labeledPods)
 
     val startTime = Instant.now.toEpochMilli
     waitForExecutorPodsClock.setTime(startTime)
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 5))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
-    verify(podOperations).create(podWithAttachedContainerForId(1))
-    verify(podOperations).create(podWithAttachedContainerForId(2))
-    verify(podOperations).create(podWithAttachedContainerForId(3))
-    verify(podOperations).create(podWithAttachedContainerForId(4))
-    verify(podOperations).create(podWithAttachedContainerForId(5))
-    verify(podOperations, times(5)).create(any())
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(3))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(4))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(5))
+    verify(podResource, times(5)).create()
 
     snapshotsStore.updatePod(pendingExecutor(1))
     snapshotsStore.updatePod(pendingExecutor(2))
@@ -433,7 +455,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     verify(podOperations, never()).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2", "3", "4", "5")
-    verify(podOperations, never()).delete()
+    verify(podResource, never()).delete()
 
     // Newly created executors (both acknowledged and not) are cleaned up.
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
@@ -444,8 +466,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // though executor 1 is still in pending state and executor 3 and 4 are new request without
     // any state reported by kubernetes and all the three are already timed out
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
-    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5")
-    verify(podOperations).delete()
+    verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "5")
+    verify(labeledPods).delete()
   }
 
   /**
@@ -483,18 +505,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
    *     PODs: 8 and 9
    */
   test("SPARK-34361: scheduler backend known pods with multiple resource profiles at downscaling") {
-    when(podOperations
+    when(podsWithNamespace
       .withField("status.phase", "Pending"))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
-      .thenReturn(podOperations)
+      .thenReturn(labeledPods)
 
     val startTime = Instant.now.toEpochMilli
     waitForExecutorPodsClock.setTime(startTime)
@@ -510,20 +532,20 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // 0) request 3 PODs for the default and 4 PODs for the other resource profile
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 4))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 7)
-    verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(2, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(3, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(4, rp.id))
-    verify(podOperations).create(podWithAttachedContainerForId(5, rp.id))
-    verify(podOperations).create(podWithAttachedContainerForId(6, rp.id))
-    verify(podOperations).create(podWithAttachedContainerForId(7, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(6, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(7, rp.id))
 
     // 1) make 1 POD known by the scheduler backend for each resource profile
     when(schedulerBackend.getExecutorIds).thenReturn(Seq("1", "4"))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5,
       "scheduler backend known PODs are not outstanding")
-    verify(podOperations, times(7)).create(any())
+    verify(podResource, times(7)).create()
 
     // 2) make 1 extra POD known by the scheduler backend for each resource profile
     // and make some to pending
@@ -534,15 +556,15 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(pendingExecutor(6, rp.id))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
-    verify(podOperations, times(7)).create(any())
+    verify(podResource, times(7)).create()
 
     // 3) downscale to 1 POD for default and 1 POD for the other resource profile
     waitForExecutorPodsClock.advance(executorIdleTimeout * 2)
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
-    verify(podOperations, times(7)).create(any())
-    verify(podOperations, times(2)).delete()
+    verify(podResource, times(7)).create()
+    verify(labeledPods, times(2)).delete()
     assert(podsAllocatorUnderTest.isDeleted("3"))
     assert(podsAllocatorUnderTest.isDeleted("6"))
     assert(podsAllocatorUnderTest.isDeleted("7"))
@@ -551,32 +573,32 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // 2 PODs known by the scheduler backend there must be no new POD requested to be created
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2, rp -> 2))
     snapshotsStore.notifySubscribers()
-    verify(podOperations, times(7)).create(any())
+    verify(podResource, times(7)).create()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
-    verify(podOperations, times(7)).create(any())
+    verify(podResource, times(7)).create()
 
     // 5) requesting 1 more executor for each resource
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 3, rp -> 3))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
-    verify(podOperations, times(9)).create(any())
-    verify(podOperations).create(podWithAttachedContainerForId(8, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(9, rp.id))
+    verify(podResource, times(9)).create()
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(8, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(9, rp.id))
   }
 
   test("SPARK-33288: multiple resource profiles") {
-    when(podOperations
+    when(podsWithNamespace
       .withField("status.phase", "Pending"))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
-      .thenReturn(podOperations)
+      .thenReturn(labeledPods)
 
     val startTime = Instant.now.toEpochMilli
     waitForExecutorPodsClock.setTime(startTime)
@@ -593,9 +615,9 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // make sure it's requested, even with an empty initial snapshot.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 2))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
-    verify(podOperations).create(podWithAttachedContainerForId(1, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(2, rp.id))
-    verify(podOperations).create(podWithAttachedContainerForId(3, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(3, rp.id))
 
     // Mark executor as running, verify that subsequent allocation cycle is a no-op.
     snapshotsStore.updatePod(runningExecutor(1, defaultProfile.id))
@@ -603,18 +625,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(runningExecutor(3, rp.id))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
-    verify(podOperations, times(3)).create(any())
-    verify(podOperations, never()).delete()
+    verify(podResource, times(3)).create()
+    verify(podResource, never()).delete()
 
     // Request 3 more executors for default profile and 1 more for other profile,
     // make sure all are requested.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 4, rp -> 3))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 4)
-    verify(podOperations).create(podWithAttachedContainerForId(4, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(5, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(6, defaultProfile.id))
-    verify(podOperations).create(podWithAttachedContainerForId(7, rp.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(4, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(5, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(6, defaultProfile.id))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(7, rp.id))
 
     // Mark 4 as running, 5 and 7 as pending. Allocation cycle should do nothing.
     snapshotsStore.updatePod(runningExecutor(4, defaultProfile.id))
@@ -622,8 +644,8 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.updatePod(pendingExecutor(7, rp.id))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 3)
-    verify(podOperations, times(7)).create(any())
-    verify(podOperations, never()).delete()
+    verify(podResource, times(7)).create()
+    verify(podResource, never()).delete()
 
     // Scale down to 1 for both resource profiles. Pending executors
     // (both acknowledged and not) should be deleted.
@@ -631,10 +653,10 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1, rp -> 1))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
-    verify(podOperations, times(7)).create(any())
-    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6")
-    verify(podOperations).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7")
-    verify(podOperations, times(2)).delete()
+    verify(podResource, times(7)).create()
+    verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "5", "6")
+    verify(labeledPods).withLabelIn(SPARK_EXECUTOR_ID_LABEL, "7")
+    verify(labeledPods, times(2)).delete()
     assert(podsAllocatorUnderTest.isDeleted("5"))
     assert(podsAllocatorUnderTest.isDeleted("6"))
     assert(podsAllocatorUnderTest.isDeleted("7"))
@@ -653,27 +675,27 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   }
 
   test("SPARK-33262: pod allocator does not stall with pending pods") {
-    when(podOperations
+    when(podsWithNamespace
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1"))
       .thenReturn(labeledPods)
-    when(podOperations
+    when(labeledPods
       .withLabelIn(SPARK_EXECUTOR_ID_LABEL, "2", "3", "4", "5", "6"))
-      .thenReturn(podOperations)
+      .thenReturn(labeledPods)
 
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 6))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 5)
     // Initial request of pods
-    verify(podOperations).create(podWithAttachedContainerForId(1))
-    verify(podOperations).create(podWithAttachedContainerForId(2))
-    verify(podOperations).create(podWithAttachedContainerForId(3))
-    verify(podOperations).create(podWithAttachedContainerForId(4))
-    verify(podOperations).create(podWithAttachedContainerForId(5))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(1))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(2))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(3))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(4))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(5))
     // 4 come up, 1 pending
     snapshotsStore.updatePod(pendingExecutor(1))
     snapshotsStore.updatePod(runningExecutor(2))
@@ -685,7 +707,7 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 2)
     // We request pod 6
-    verify(podOperations).create(podWithAttachedContainerForId(6))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForId(6))
   }
 
   test("SPARK-35416: Support PersistentVolumeClaim Reuse") {
@@ -715,18 +737,18 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
     podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
 
-    when(podOperations
+    when(podsWithNamespace
       .withField("status.phase", "Pending"))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
-      .thenReturn(podOperations)
-    when(podOperations
+      .thenReturn(labeledPods)
+    when(labeledPods
       .withLabelIn(meq(SPARK_EXECUTOR_ID_LABEL), any()))
-      .thenReturn(podOperations)
+      .thenReturn(labeledPods)
 
     val startTime = Instant.now.toEpochMilli
     waitForExecutorPodsClock.setTime(startTime)
@@ -734,28 +756,27 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     // Target 1 executor, make sure it's requested, even with an empty initial snapshot.
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
-    verify(podOperations).create(podWithAttachedContainerForIdAndVolume(1))
+    verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(1))
 
     // Mark executor as running, verify that subsequent allocation cycle is a no-op.
     snapshotsStore.updatePod(runningExecutor(1))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
-    verify(podOperations, times(1)).create(any())
-    verify(podOperations, never()).delete()
+    verify(podResource, times(1)).create()
+    verify(podResource, never()).delete()
 
     // Request a new executor, make sure it's using reused PVC
     podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 2))
     snapshotsStore.notifySubscribers()
     assert(podsAllocatorUnderTest.numOutstandingPods.get() == 1)
-    verify(podOperations).create(podWithAttachedContainerForIdAndVolume(2))
-    verify(persistentVolumeClaims, never()).create(any())
+    verify(podsWithNamespace).resource(podWithAttachedContainerForIdAndVolume(2))
+    verify(pvcWithNamespace, never()).resource(any())
   }
 
   test("print the pod name instead of Some(name) if pod is absent") {
     val nonexistentPod = "i-do-not-exist"
     val conf = new SparkConf().set(KUBERNETES_DRIVER_POD_NAME, nonexistentPod)
-    when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withName(nonexistentPod)).thenReturn(driverPodOperations)
+    when(podsWithNamespace.withName(nonexistentPod)).thenReturn(driverPodOperations)
     when(driverPodOperations.get()).thenReturn(null)
     val e = intercept[SparkException](new ExecutorPodsAllocator(
       conf, secMgr, executorBuilder, kubernetesClient, snapshotsStore, waitForExecutorPodsClock))
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
index e3ec53adef6..92d692c829a 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManagerSuite.scala
@@ -16,11 +16,14 @@
  */
 package org.apache.spark.scheduler.cluster.k8s
 
+import java.util.function.UnaryOperator
+
 import io.fabric8.kubernetes.api.model.Pod
 import io.fabric8.kubernetes.client.KubernetesClient
 import io.fabric8.kubernetes.client.dsl.PodResource
-import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.{Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.any
+import org.mockito.ArgumentMatchers.anyString
 import org.mockito.Mockito.{mock, never, times, verify, when}
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
@@ -37,7 +40,7 @@ import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._
 
 class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfter {
 
-  private var namedExecutorPods: mutable.Map[String, PodResource[Pod]] = _
+  private var namedExecutorPods: mutable.Map[String, PodResource] = _
 
   @Mock
   private var kubernetesClient: KubernetesClient = _
@@ -45,6 +48,9 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
   @Mock
   private var podOperations: PODS = _
 
+  @Mock
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
   @Mock
   private var schedulerBackend: KubernetesClusterSchedulerBackend = _
 
@@ -54,10 +60,11 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
   before {
     MockitoAnnotations.openMocks(this).close()
     snapshotsStore = new DeterministicExecutorPodsSnapshotsStore()
-    namedExecutorPods = mutable.Map.empty[String, PodResource[Pod]]
+    namedExecutorPods = mutable.Map.empty[String, PodResource]
     when(schedulerBackend.getExecutorsWithRegistrationTs()).thenReturn(Map.empty[String, Long])
     when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
+    when(podOperations.inNamespace(anyString())).thenReturn(podsWithNamespace)
+    when(podsWithNamespace.withName(any(classOf[String]))).thenAnswer(namedPodsAnswer())
     eventHandlerUnderTest = new ExecutorPodsLifecycleManager(
       new SparkConf(),
       kubernetesClient,
@@ -109,6 +116,12 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
   }
 
+  test("SPARK-40458: test executor inactivation function") {
+    val failedPod = failedExecutorWithoutDeletion(1)
+    val inactivated = ExecutorPodsLifecycleManager.executorInactivationFn(failedPod)
+    assert(inactivated.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true")
+  }
+
   test("Keep executor pods in k8s if configured.") {
     val failedPod = failedExecutorWithoutDeletion(1)
     eventHandlerUnderTest.conf.set(Config.KUBERNETES_DELETE_EXECUTORS, false)
@@ -118,12 +131,8 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
     val expectedLossReason = ExecutorExited(1, exitCausedByApp = true, msg)
     verify(schedulerBackend).doRemoveExecutor("1", expectedLossReason)
     verify(namedExecutorPods(failedPod.getMetadata.getName), never()).delete()
-
-    val podCaptor = ArgumentCaptor.forClass(classOf[Pod])
-    verify(namedExecutorPods(failedPod.getMetadata.getName)).patch(podCaptor.capture())
-
-    val pod = podCaptor.getValue()
-    assert(pod.getMetadata().getLabels().get(SPARK_EXECUTOR_INACTIVE_LABEL) === "true")
+    verify(namedExecutorPods(failedPod.getMetadata.getName))
+      .edit(any[UnaryOperator[Pod]]())
   }
 
   private def exitReasonMessage(execId: Int, failedPod: Pod, exitCode: Int): String = {
@@ -146,10 +155,10 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
       """.stripMargin
   }
 
-  private def namedPodsAnswer(): Answer[PodResource[Pod]] =
+  private def namedPodsAnswer(): Answer[PodResource] =
     (invocation: InvocationOnMock) => {
       val podName: String = invocation.getArgument(0)
       namedExecutorPods.getOrElseUpdate(
-        podName, mock(classOf[PodResource[Pod]]))
+        podName, mock(classOf[PodResource]))
     }
 }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
index 8209bee7a02..61080268cde 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSourceSuite.scala
@@ -41,6 +41,9 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
   @Mock
   private var podOperations: PODS = _
 
+  @Mock
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
   @Mock
   private var appIdLabeledPods: LABELED_PODS = _
 
@@ -58,7 +61,8 @@ class ExecutorPodsWatchSnapshotSourceSuite extends SparkFunSuite with BeforeAndA
     MockitoAnnotations.openMocks(this).close()
     watch = ArgumentCaptor.forClass(classOf[Watcher[Pod]])
     when(kubernetesClient.pods()).thenReturn(podOperations)
-    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+    when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
+    when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
       .thenReturn(appIdLabeledPods)
     when(appIdLabeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
       .thenReturn(executorRoleLabeledPods)
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
index c3af83118f1..bb5e93c92ac 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackendSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.scheduler.cluster.k8s
 import java.util.Arrays
 import java.util.concurrent.TimeUnit
 
-import io.fabric8.kubernetes.api.model.{ObjectMeta, Pod, PodList}
+import io.fabric8.kubernetes.api.model.{ConfigMap, Pod, PodList}
 import io.fabric8.kubernetes.client.KubernetesClient
-import io.fabric8.kubernetes.client.dsl.{NonNamespaceOperation, PodResource}
+import io.fabric8.kubernetes.client.dsl.PodResource
 import org.jmock.lib.concurrent.DeterministicScheduler
 import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
 import org.mockito.ArgumentMatchers.{any, eq => mockitoEq}
@@ -66,12 +66,21 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   @Mock
   private var podOperations: PODS = _
 
+  @Mock
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
   @Mock
   private var labeledPods: LABELED_PODS = _
 
   @Mock
   private var configMapsOperations: CONFIG_MAPS = _
 
+  @Mock
+  private var configMapsWithNamespace: CONFIG_MAPS_WITH_NAMESPACE = _
+
+  @Mock
+  private var configMapResource: CONFIG_MAPS_RESOURCE = _
+
   @Mock
   private var labeledConfigMaps: LABELED_CONFIG_MAPS = _
 
@@ -117,7 +126,10 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
         driverEndpoint.capture()))
       .thenReturn(driverEndpointRef)
     when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
     when(kubernetesClient.configMaps()).thenReturn(configMapsOperations)
+    when(configMapsOperations.inNamespace("default")).thenReturn(configMapsWithNamespace)
+    when(configMapsWithNamespace.resource(any[ConfigMap]())).thenReturn(configMapResource)
     when(podAllocator.driverPod).thenReturn(None)
     schedulerBackendUnderTest = new KubernetesClusterSchedulerBackend(
       taskScheduler,
@@ -142,13 +154,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     verify(lifecycleEventHandler).start(schedulerBackendUnderTest)
     verify(watchEvents).start(TEST_SPARK_APP_ID)
     verify(pollEvents).start(TEST_SPARK_APP_ID)
-    verify(configMapsOperations).create(any())
+    verify(configMapResource).create()
   }
 
   test("Stop all components") {
-    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+    when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
-    when(configMapsOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
+    when(configMapsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID))
       .thenReturn(labeledConfigMaps)
     when(labeledConfigMaps.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE))
       .thenReturn(labeledConfigMaps)
@@ -177,36 +189,14 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
   test("Kill executors") {
     schedulerBackendUnderTest.start()
 
-    val operation = mock(classOf[NonNamespaceOperation[
-      Pod, PodList, PodResource[Pod]]])
-
-    when(podOperations.inNamespace(any())).thenReturn(operation)
-    when(podOperations.withField(any(), any())).thenReturn(labeledPods)
-    when(podOperations.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
+    when(podsWithNamespace.withField(any(), any())).thenReturn(labeledPods)
+    when(podsWithNamespace.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_APP_ID_LABEL, TEST_SPARK_APP_ID)).thenReturn(labeledPods)
     when(labeledPods.withLabel(SPARK_ROLE_LABEL, SPARK_POD_EXECUTOR_ROLE)).thenReturn(labeledPods)
     when(labeledPods.withLabelIn(SPARK_EXECUTOR_ID_LABEL, "1", "2")).thenReturn(labeledPods)
-
-    val pod1 = mock(classOf[Pod])
-    val pod1Metadata = mock(classOf[ObjectMeta])
-    when(pod1Metadata.getNamespace).thenReturn("coffeeIsLife")
-    when(pod1Metadata.getName).thenReturn("pod1")
-    when(pod1.getMetadata).thenReturn(pod1Metadata)
-
-    val pod2 = mock(classOf[Pod])
-    val pod2Metadata = mock(classOf[ObjectMeta])
-    when(pod2Metadata.getNamespace).thenReturn("coffeeIsLife")
-    when(pod2Metadata.getName).thenReturn("pod2")
-    when(pod2.getMetadata).thenReturn(pod2Metadata)
-
-    val pod1op = mock(classOf[PodResource[Pod]])
-    val pod2op = mock(classOf[PodResource[Pod]])
-    when(operation.withName("pod1")).thenReturn(pod1op)
-    when(operation.withName("pod2")).thenReturn(pod2op)
-
-    val podList = mock(classOf[PodList])
-    when(labeledPods.list()).thenReturn(podList)
-    when(podList.getItems()).thenReturn(Arrays.asList[Pod]())
+    val pod1op = mock(classOf[PodResource])
+    val pod2op = mock(classOf[PodResource])
+    when(labeledPods.resources()).thenReturn(Arrays.asList[PodResource]().stream)
     schedulerExecutorService.tick(sparkConf.get(KUBERNETES_DYN_ALLOC_KILL_GRACE_PERIOD) * 2,
       TimeUnit.MILLISECONDS)
     verify(labeledPods, never()).delete()
@@ -227,7 +217,13 @@ class KubernetesClusterSchedulerBackendSuite extends SparkFunSuite with BeforeAn
     verify(pod2op, never()).edit(any(
       classOf[java.util.function.UnaryOperator[io.fabric8.kubernetes.api.model.Pod]]))
 
-    when(podList.getItems()).thenReturn(Arrays.asList(pod1))
+    when(labeledPods.resources()).thenReturn(Arrays.asList(pod1op).stream)
+    val podList = mock(classOf[PodList])
+    when(labeledPods.list()).thenReturn(podList)
+    val pod1 = mock(classOf[Pod])
+    val pod2 = mock(classOf[Pod])
+    when(podList.getItems).thenReturn(Arrays.asList(pod1, pod2))
+
     schedulerBackendUnderTest.doKillExecutors(Seq("1", "2"))
     verify(labeledPods, never()).delete()
     schedulerExecutorService.runUntilIdle()
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala
index 748f509e013..f74d2c9feee 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/StatefulSetAllocatorSuite.scala
@@ -67,18 +67,25 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   private var appOperations: AppsAPIGroupDSL = _
 
   @Mock
-  private var statefulSetOperations: MixedOperation[
-    apps.StatefulSet, apps.StatefulSetList, RollableScalableResource[apps.StatefulSet]] = _
+  private var statefulSetOperations: STATEFUL_SETS = _
 
   @Mock
-  private var editableSet: RollableScalableResource[apps.StatefulSet] = _
+  private var statefulSetNamespaced: STATEFUL_SETS_NAMESPACED = _
+
+  @Mock
+  private var editableSet: STATEFUL_SET_RES = _
 
   @Mock
   private var podOperations: PODS = _
 
+  @Mock
+  private var podsWithNamespace: PODS_WITH_NAMESPACE = _
+
+  @Mock
+  private var podResource: PodResource = _
 
   @Mock
-  private var driverPodOperations: PodResource[Pod] = _
+  private var driverPodOperations: PodResource = _
 
   private var podsAllocatorUnderTest: StatefulSetPodsAllocator = _
 
@@ -102,10 +109,14 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
   before {
     MockitoAnnotations.openMocks(this).close()
     when(kubernetesClient.pods()).thenReturn(podOperations)
+    when(podOperations.inNamespace("default")).thenReturn(podsWithNamespace)
     when(kubernetesClient.apps()).thenReturn(appOperations)
     when(appOperations.statefulSets()).thenReturn(statefulSetOperations)
-    when(statefulSetOperations.withName(any())).thenReturn(editableSet)
-    when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
+    when(statefulSetOperations.inNamespace("default")).thenReturn(statefulSetNamespaced)
+    when(statefulSetNamespaced.resource(any())).thenReturn(editableSet)
+    when(statefulSetNamespaced.withName(any())).thenReturn(editableSet)
+    when(podsWithNamespace.withName(driverPodName)).thenReturn(driverPodOperations)
+    when(podsWithNamespace.resource(any())).thenReturn(podResource)
     when(driverPodOperations.get).thenReturn(driverPod)
     when(driverPodOperations.waitUntilReady(any(), any())).thenReturn(driverPod)
     when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
@@ -128,7 +139,8 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
       Map(defaultProfile -> (10),
           immrprof -> (420)))
     val captor = ArgumentCaptor.forClass(classOf[StatefulSet])
-    verify(statefulSetOperations, times(2)).create(any())
+    verify(statefulSetNamespaced, times(2)).resource(any())
+    verify(editableSet, times(2)).create()
     podsAllocatorUnderTest.stop(appId)
     verify(editableSet, times(2)).delete()
   }
@@ -137,7 +149,8 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> (10)))
     val captor = ArgumentCaptor.forClass(classOf[StatefulSet])
-    verify(statefulSetOperations, times(1)).create(captor.capture())
+    verify(statefulSetNamespaced, times(1)).resource(captor.capture())
+    verify(editableSet, times(1)).create()
     val set = captor.getValue()
     val setName = set.getMetadata().getName()
     val namespace = set.getMetadata().getNamespace()
@@ -145,7 +158,7 @@ class StatefulSetAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     val spec = set.getSpec()
     assert(spec.getReplicas() === 10)
     assert(spec.getPodManagementPolicy() === "Parallel")
-    verify(podOperations, never()).create(any())
+    verify(podResource, never()).create()
     podsAllocatorUnderTest.setTotalExpectedExecutors(
       Map(defaultProfile -> (20)))
     verify(editableSet, times(1)).scale(any(), any())
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
index 1a9724afe30..93200ea1297 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/ClientModeTestsSuite.scala
@@ -122,7 +122,8 @@ private[spark] trait ClientModeTestsSuite { k8sSuite: KubernetesSuite =>
         .kubernetesClient
         .services()
         .inNamespace(kubernetesTestComponents.namespace)
-        .delete(driverService)
+        .resource(driverService)
+        .delete()
       // Delete all executors, since the test explicitly asks them not to be deleted by the app.
       kubernetesTestComponents
         .kubernetesClient
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
index 6a3dfd5bf79..efda4143181 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SparkReadinessWatcher.scala
@@ -22,7 +22,7 @@ import com.google.common.util.concurrent.SettableFuture
 import io.fabric8.kubernetes.api.model.HasMetadata
 import io.fabric8.kubernetes.client.{Watcher, WatcherException}
 import io.fabric8.kubernetes.client.Watcher.Action
-import io.fabric8.kubernetes.client.internal.readiness.Readiness
+import io.fabric8.kubernetes.client.readiness.Readiness
 
 private[spark] class SparkReadinessWatcher[T <: HasMetadata] extends Watcher[T] {
 

