You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ni...@apache.org on 2022/07/24 03:21:08 UTC

[incubator-heron] branch master updated: [3846] Refactoring of the Kubernetes shim (#3847)

This is an automated email from the ASF dual-hosted git repository.

nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/master by this push:
     new 41e098862f5 [3846] Refactoring of the Kubernetes shim (#3847)
41e098862f5 is described below

commit 41e098862f5d10a39738fe5e1139649d1c47c151
Author: Saad Ur Rahman <su...@users.noreply.github.com>
AuthorDate: Sat Jul 23 23:21:01 2022 -0400

    [3846] Refactoring of the Kubernetes shim (#3847)
---
 .../scheduler/kubernetes/KubernetesContext.java    |   2 +-
 .../scheduler/kubernetes/KubernetesScheduler.java  |   2 +-
 .../heron/scheduler/kubernetes/KubernetesShim.java | 609 ++++++++++++++++++++
 .../scheduler/kubernetes/KubernetesUtils.java      |   4 +-
 .../{V1Controller.java => StatefulSet.java}        | 628 ++++-----------------
 heron/schedulers/tests/java/BUILD                  |   3 +-
 .../scheduler/kubernetes/KubernetesShimTest.java   | 419 ++++++++++++++
 .../scheduler/kubernetes/KubernetesUtilsTest.java  |  22 +-
 ...{V1ControllerTest.java => StatefulSetTest.java} | 515 ++++-------------
 .../heron/scheduler/kubernetes/VolumesTests.java   |   2 +-
 10 files changed, 1272 insertions(+), 934 deletions(-)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 8d9a8795070..3d424ce5a89 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -219,7 +219,7 @@ public final class KubernetesContext extends Context {
   @VisibleForTesting
   protected static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
       getVolumeConfigs(final Config config, final String prefix, final boolean isExecutor) {
-    final Logger LOG = Logger.getLogger(V1Controller.class.getName());
+    final Logger LOG = Logger.getLogger(KubernetesShim.class.getName());
 
     final String prefixKey = String.format(prefix,
         isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME);
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java
index c35c87df7d5..abc35c77397 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java
@@ -50,7 +50,7 @@ public class KubernetesScheduler implements IScheduler, IScalable {
   private UpdateTopologyManager updateTopologyManager;
 
   protected KubernetesController getController() {
-    return new V1Controller(configuration, runtimeConfiguration);
+    return new KubernetesShim(configuration, runtimeConfiguration);
   }
 
   @Override
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
new file mode 100644
index 00000000000..f3541569986
--- /dev/null
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
@@ -0,0 +1,609 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.heron.common.basics.Pair;
+import org.apache.heron.scheduler.TopologyRuntimeManagementException;
+import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.packing.PackingPlan;
+import org.apache.heron.spi.packing.Resource;
+
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.Configuration;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PodTemplate;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1Service;
+import io.kubernetes.client.openapi.models.V1ServiceSpec;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.models.V1Status;
+import io.kubernetes.client.util.PatchUtils;
+import io.kubernetes.client.util.Yaml;
+import okhttp3.Response;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+
+public class KubernetesShim extends KubernetesController {
+
+  private static final Logger LOG =
+      Logger.getLogger(KubernetesShim.class.getName());
+
+  private final boolean isPodTemplateDisabled;
+
+  private final AppsV1Api appsClient;
+  private final CoreV1Api coreClient;
+
+  /**
+   * Configures the Kubernetes API Application and Core communications clients.
+   * @param configuration <code>topology</code> configurations.
+   * @param runtimeConfiguration Kubernetes runtime configurations.
+   */
+  KubernetesShim(Config configuration, Config runtimeConfiguration) {
+    super(configuration, runtimeConfiguration);
+
+    isPodTemplateDisabled = KubernetesContext.getPodTemplateDisabled(configuration);
+    LOG.log(Level.WARNING, String.format("Pod Template configuration is %s",
+        isPodTemplateDisabled ? "DISABLED" : "ENABLED"));
+    LOG.log(Level.WARNING, String.format("Volume configuration from CLI is %s",
+        KubernetesContext.getVolumesFromCLIDisabled(configuration) ? "DISABLED" : "ENABLED"));
+
+    try {
+      final ApiClient apiClient = io.kubernetes.client.util.Config.defaultClient();
+      Configuration.setDefaultApiClient(apiClient);
+      appsClient = new AppsV1Api(apiClient);
+      coreClient = new CoreV1Api(apiClient);
+    } catch (IOException e) {
+      LOG.log(Level.SEVERE, "Failed to setup Kubernetes client" + e);
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Configures all components required by a <code>topology</code> and submits it to the Kubernetes scheduler.
+   * @param packingPlan Used to configure the StatefulSets <code>Resource</code>s and replica count.
+   * @return Success indicator.
+   */
+  @Override
+  boolean submit(PackingPlan packingPlan) {
+    final String topologyName = getTopologyName();
+    if (!topologyName.equals(topologyName.toLowerCase())) {
+      throw new TopologySubmissionException("K8S scheduler does not allow upper case topology's.");
+    }
+
+    final Resource containerResource = getContainerResource(packingPlan);
+
+    final V1Service topologyService = createTopologyService();
+    try {
+      coreClient.createNamespacedService(getNamespace(), topologyService, null,
+              null, null);
+    } catch (ApiException e) {
+      KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology service", e);
+      throw new TopologySubmissionException(e.getMessage());
+    }
+
+    // Find the max number of instances in a container so that we can open
+    // enough ports if remote debugging is enabled.
+    int numberOfInstances = 0;
+    for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+      numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size());
+    }
+
+    final StatefulSet.Configs clusterConfigs = new StatefulSet.Configs(
+        getConfiguration(),
+        getRuntimeConfiguration(),
+        loadPodFromTemplate(false),
+        loadPodFromTemplate(true)
+        );
+
+    final V1StatefulSet executors = StatefulSet.get()
+        .create(StatefulSet.Type.Executor, clusterConfigs, containerResource, numberOfInstances);
+    final V1StatefulSet manager = StatefulSet.get()
+        .create(StatefulSet.Type.Manager, clusterConfigs, containerResource, numberOfInstances);
+
+    try {
+      appsClient.createNamespacedStatefulSet(getNamespace(), executors, null,
+              null, null);
+      appsClient.createNamespacedStatefulSet(getNamespace(), manager, null,
+              null, null);
+    } catch (ApiException e) {
+      final String message = String.format("Error creating topology: %s%n", e.getResponseBody());
+      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+      throw new TopologySubmissionException(message);
+    }
+
+    return true;
+  }
+
+  /**
+   * Shuts down a <code>topology</code> by deleting all associated resources.
+   * <ul>
+   *   <li><code>Persistent Volume Claims</code> added by the <code>topology</code>.</li>
+   *   <li><code>StatefulSet</code> for both the <code>Executors</code> and <code>Manager</code>.</li>
+   *   <li>Headless <code>Service</code> which facilitates communication between all Pods.</li>
+   * </ul>
+   * @return Success indicator.
+   */
+  @Override
+  boolean killTopology() {
+    removePersistentVolumeClaims();
+    deleteStatefulSets();
+    deleteService();
+    return true;
+  }
+
+  /**
+   * Restarts a topology by deleting the Pods associated with it using <code>Selector Labels</code>.
+   * @param shardId Not used but required because of interface.
+   * @return Indicator of successful submission of restart request to Kubernetes cluster.
+   */
+  @Override
+  boolean restart(int shardId) {
+    try {
+      coreClient.deleteCollectionNamespacedPod(getNamespace(), null, null, null, null, 0,
+          createTopologySelectorLabels(), null, null, null, null, null, null, null);
+      LOG.log(Level.WARNING, String.format("Restarting topology '%s'...", getTopologyName()));
+    } catch (ApiException e) {
+      LOG.log(Level.SEVERE, String.format("Failed to restart topology '%s'...", getTopologyName()));
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Adds a specified number of Pods to a <code>topology</code>'s <code>Executors</code>.
+   * @param containersToAdd Set of containers to be added.
+   * @return The passed in set of containers, unchanged.
+   */
+  @Override
+  public Set<PackingPlan.ContainerPlan>
+      addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+    final V1StatefulSet statefulSet;
+    try {
+      statefulSet = getStatefulSet();
+    } catch (ApiException ae) {
+      final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
+      throw new TopologyRuntimeManagementException(message, ae);
+    }
+    final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
+    final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
+    final int newContainerCount = currentContainerCount + containersToAdd.size();
+
+    try {
+      patchStatefulSetReplicas(newContainerCount);
+    } catch (ApiException ae) {
+      throw new TopologyRuntimeManagementException(
+          ae.getMessage() + "\ndetails\n" + ae.getResponseBody());
+    }
+
+    return containersToAdd;
+  }
+
+  /**
+   * Removes a specified number of Pods from a <code>topology</code>'s <code>Executors</code>.
+   * @param containersToRemove Set of containers to be removed.
+   */
+  @Override
+  public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
+    final V1StatefulSet statefulSet;
+    try {
+      statefulSet = getStatefulSet();
+    } catch (ApiException ae) {
+      final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
+      throw new TopologyRuntimeManagementException(message, ae);
+    }
+
+    final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
+    final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
+    final int newContainerCount = currentContainerCount - containersToRemove.size();
+
+    try {
+      patchStatefulSetReplicas(newContainerCount);
+    } catch (ApiException e) {
+      throw new TopologyRuntimeManagementException(
+          e.getMessage() + "\ndetails\n" + e.getResponseBody());
+    }
+  }
+
+  /**
+   * Performs an in-place update of the replica count for a <code>topology</code>. This allows the
+   * <code>topology</code> Pod count to be scaled up or down.
+   * @param replicas The new number of Pod replicas required.
+   * @throws ApiException in the event there is a failure patching the StatefulSet.
+   */
+  private void patchStatefulSetReplicas(int replicas) throws ApiException {
+    final String body =
+            String.format(KubernetesConstants.JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
+                    replicas);
+    final V1Patch patch = new V1Patch(body);
+
+    PatchUtils.patch(V1StatefulSet.class,
+        () ->
+          appsClient.patchNamespacedStatefulSetCall(
+            getStatefulSetName(true),
+            getNamespace(),
+            patch,
+            null,
+            null,
+            null,
+            null,
+            null),
+        V1Patch.PATCH_FORMAT_JSON_PATCH,
+        appsClient.getApiClient());
+  }
+
+  /**
+   * Retrieves the <code>Executors</code> StatefulSet configurations for the Kubernetes cluster.
+   * @return <code>Executors</code> StatefulSet configurations.
+   * @throws ApiException in the event there is a failure retrieving the StatefulSet.
+   */
+  V1StatefulSet getStatefulSet() throws ApiException {
+    return appsClient.readNamespacedStatefulSet(getStatefulSetName(true), getNamespace(),
+        null);
+  }
+
+  /**
+   * Deletes the headless <code>Service</code> for a <code>topology</code>'s <code>Executors</code>
+   * and <code>Manager</code> using the <code>topology</code>'s name.
+   */
+  void deleteService() {
+    try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
+          getNamespace(), null, null, 0, null,
+          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
+
+      if (!response.isSuccessful()) {
+        if (response.code() == HTTP_NOT_FOUND) {
+          LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless service for Topology: "
+                  + getTopologyName());
+          return;
+        }
+        LOG.log(Level.SEVERE,
+            String.format("Error when deleting the Service of the job [%s] in namespace [%s]",
+            getTopologyName(), getNamespace()));
+        LOG.log(Level.SEVERE, "Error killing topology message:" + response.message());
+        KubernetesUtils.logResponseBodyIfPresent(LOG, response);
+
+        throw new TopologyRuntimeManagementException(
+                KubernetesUtils.errorMessageFromResponse(response));
+      }
+    } catch (ApiException e) {
+      if (e.getCode() == HTTP_NOT_FOUND) {
+        LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes service for Topology: "
+                + getTopologyName());
+        return;
+      }
+      throw new TopologyRuntimeManagementException(
+        String.format("Error deleting topology [%s] Kubernetes service", getTopologyName()), e);
+    } catch (IOException e) {
+      throw new TopologyRuntimeManagementException(
+        String.format("Error deleting topology [%s] Kubernetes service", getTopologyName()), e);
+    }
+    LOG.log(Level.INFO,
+        String.format("Headless Service for the Job [%s] in namespace [%s] is deleted.",
+        getTopologyName(), getNamespace()));
+  }
+
+  /**
+   * Deletes the StatefulSets for a <code>topology</code>'s <code>Executors</code> and <code>Manager</code>
+   * using <code>Label</code>s.
+   */
+  void deleteStatefulSets() {
+    try (Response response = appsClient.deleteCollectionNamespacedStatefulSetCall(getNamespace(),
+        null, null, null, null, null, createTopologySelectorLabels(), null, null, null, null, null,
+            null, null, null)
+        .execute()) {
+
+      if (!response.isSuccessful()) {
+        if (response.code() == HTTP_NOT_FOUND) {
+          LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSets for Topology: "
+                  + getTopologyName());
+          return;
+        }
+        LOG.log(Level.SEVERE,
+            String.format("Error when deleting the StatefulSets of the job [%s] in namespace [%s]",
+            getTopologyName(), getNamespace()));
+        LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
+        KubernetesUtils.logResponseBodyIfPresent(LOG, response);
+
+        throw new TopologyRuntimeManagementException(
+                KubernetesUtils.errorMessageFromResponse(response));
+      }
+    } catch (ApiException e) {
+      if (e.getCode() == HTTP_NOT_FOUND) {
+        LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
+                + getTopologyName());
+        return;
+      }
+      throw new TopologyRuntimeManagementException(
+        String.format("Error deleting topology [%s] Kubernetes StatefulSets", getTopologyName()),
+        e);
+    } catch (IOException e) {
+      throw new TopologyRuntimeManagementException(
+        String.format("Error deleting topology [%s] Kubernetes StatefulSets", getTopologyName()),
+        e);
+    }
+    LOG.log(Level.INFO,
+        String.format("StatefulSet for the Job [%s] in namespace [%s] is deleted.",
+          getTopologyName(), getNamespace()));
+  }
+
+  /**
+   * Creates a headless <code>Service</code> to facilitate communication between Pods in a <code>topology</code>.
+   * @return A fully configured <code>Service</code> to be used by a <code>topology</code>.
+   */
+  private V1Service createTopologyService() {
+    final String topologyName = getTopologyName();
+
+    final V1Service service = new V1Service();
+
+    // Setup service metadata.
+    final V1ObjectMeta objectMeta = new V1ObjectMeta()
+        .name(topologyName)
+        .annotations(getServiceAnnotations())
+        .labels(getServiceLabels());
+    service.setMetadata(objectMeta);
+
+    // Create the headless service.
+    final V1ServiceSpec serviceSpec = new V1ServiceSpec()
+        .clusterIP("None")
+        .selector(getPodMatchLabels(topologyName));
+
+    service.setSpec(serviceSpec);
+
+    return service;
+  }
+
+  /**
+   * Extracts <code>Service Annotations</code> for configurations.
+   * @return Key-value pairs of <code>Annotation</code>s to be added to the <code>Service</code>.
+   */
+  private Map<String, String> getServiceAnnotations() {
+    return KubernetesContext.getServiceAnnotations(getConfiguration());
+  }
+
+  /**
+   * Generates the <code>heron</code> and <code>topology</code> name <code>Match Label</code>s.
+   * @param topologyName Name of the <code>topology</code>.
+   * @return Key-value pairs of <code>Match Label</code>s to be added to the Pod.
+   */
+  private Map<String, String> getPodMatchLabels(String topologyName) {
+    final Map<String, String> labels = new HashMap<>();
+    labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE);
+    labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
+    return labels;
+  }
+
+  /**
+   * Extracts <code>Label</code>s for <code>Service</code>s from configurations.
+   * @return Key-value pairs of <code>Service Labels</code> to be added to the <code>Service</code>.
+   */
+  private Map<String, String> getServiceLabels() {
+    return KubernetesContext.getServiceLabels(getConfiguration());
+  }
+
+  /**
+   * Initiates the process of locating and loading <code>Pod Template</code> from a <code>ConfigMap</code>.
+   * The loaded text is then parsed into a usable <code>Pod Template</code>.
+   * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
+   *                   or <code>Manager</code>.
+   * @return A <code>Pod Template</code> which is loaded and parsed from a <code>ConfigMap</code>.
+   */
+  @VisibleForTesting
+  protected V1PodTemplateSpec loadPodFromTemplate(boolean isExecutor) {
+    final Pair<String, String> podTemplateConfigMapName = getPodTemplateLocation(isExecutor);
+
+    // Default Pod Template.
+    if (podTemplateConfigMapName == null) {
+      LOG.log(Level.INFO, "Configuring cluster with the Default Pod Template");
+      return new V1PodTemplateSpec();
+    }
+
+    if (isPodTemplateDisabled) {
+      throw new TopologySubmissionException("Custom Pod Templates are disabled");
+    }
+
+    final String configMapName = podTemplateConfigMapName.first;
+    final String podTemplateName = podTemplateConfigMapName.second;
+
+    // Attempt to locate ConfigMap with provided Pod Template name.
+    try {
+      V1ConfigMap configMap = getConfigMap(configMapName);
+      if (configMap == null) {
+        throw new ApiException(
+            String.format("K8s client unable to locate ConfigMap '%s'", configMapName));
+      }
+
+      final Map<String, String> configMapData = configMap.getData();
+      if (configMapData != null && configMapData.containsKey(podTemplateName)) {
+        // NullPointerException when Pod Template is empty.
+        V1PodTemplateSpec podTemplate = ((V1PodTemplate)
+            Yaml.load(configMapData.get(podTemplateName))).getTemplate();
+        LOG.log(Level.INFO, String.format("Configuring cluster with the %s.%s Pod Template",
+            configMapName, podTemplateName));
+        return podTemplate;
+      }
+
+      // Failure to locate Pod Template with provided name.
+      throw new ApiException(String.format("Failed to locate Pod Template '%s' in ConfigMap '%s'",
+          podTemplateName, configMapName));
+    } catch (ApiException e) {
+      KubernetesUtils.logExceptionWithDetails(LOG, e.getMessage(), e);
+      throw new TopologySubmissionException(e.getMessage());
+    } catch (IOException | ClassCastException | NullPointerException e) {
+      final String message = String.format("Error parsing Pod Template '%s' in ConfigMap '%s'",
+          podTemplateName, configMapName);
+      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+      throw new TopologySubmissionException(message);
+    }
+  }
+
+  /**
+   * Extracts the <code>ConfigMap</code> and <code>Pod Template</code> names from the CLI parameter.
+   * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
+   *                   or <code>Manager</code>.
+   * @return A pair of the form <code>(ConfigMap, Pod Template)</code>.
+   */
+  @VisibleForTesting
+  protected Pair<String, String> getPodTemplateLocation(boolean isExecutor) {
+    final String podTemplateConfigMapName = KubernetesContext
+        .getPodTemplateConfigMapName(getConfiguration(), isExecutor);
+
+    if (podTemplateConfigMapName == null) {
+      return null;
+    }
+
+    try {
+      final int splitPoint = podTemplateConfigMapName.indexOf(".");
+      final String configMapName = podTemplateConfigMapName.substring(0, splitPoint);
+      final String podTemplateName = podTemplateConfigMapName.substring(splitPoint + 1);
+
+      if (configMapName.isEmpty() || podTemplateName.isEmpty()) {
+        throw new IllegalArgumentException("Empty ConfigMap or Pod Template name");
+      }
+
+      return new Pair<>(configMapName, podTemplateName);
+    } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
+      final String message = "Invalid ConfigMap and/or Pod Template name";
+      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+      throw new TopologySubmissionException(message);
+    }
+  }
+
+  /**
+   * Retrieves a <code>ConfigMap</code> from the K8s cluster in the API Server's namespace.
+   * @param configMapName Name of the <code>ConfigMap</code> to retrieve.
+   * @return The retrieved <code>ConfigMap</code>.
+   */
+  @VisibleForTesting
+  protected V1ConfigMap getConfigMap(String configMapName) {
+    try {
+      return coreClient.readNamespacedConfigMap(
+          configMapName,
+          getNamespace(),
+          null);
+    } catch (ApiException e) {
+      final String message = "Error retrieving ConfigMaps";
+      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+      throw new TopologySubmissionException(String.format("%s: %s", message, e.getMessage()));
+    }
+  }
+
+  /**
+   * Removes all Persistent Volume Claims associated with a specific topology, if they exist.
+   * It looks for the following:
+   * metadata:
+   *   labels:
+   *     topology: <code>topology-name</code>
+   *     onDemand: <code>true</code>
+   */
+  private void removePersistentVolumeClaims() {
+    final String name = getTopologyName();
+    final StringBuilder selectorLabel = new StringBuilder();
+
+    // Generate selector label.
+    for (Map.Entry<String, String> label : getPersistentVolumeClaimLabels(name).entrySet()) {
+      if (selectorLabel.length() != 0) {
+        selectorLabel.append(",");
+      }
+      selectorLabel.append(label.getKey()).append("=").append(label.getValue());
+    }
+
+    // Remove all dynamically backed Persistent Volume Claims.
+    try {
+      V1Status status = coreClient.deleteCollectionNamespacedPersistentVolumeClaim(
+          getNamespace(),
+          null,
+          null,
+          null,
+          null,
+          null,
+          selectorLabel.toString(),
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null);
+
+      LOG.log(Level.INFO,
+          String.format("Removing automatically generated Persistent Volume Claims for `%s`:%n%s",
+          name, status.getMessage()));
+    } catch (ApiException e) {
+      final String message = String.format("Failed to connect to K8s cluster to delete Persistent "
+              + "Volume Claims for topology `%s`. A manual clean-up is required.%n%s",
+          name, e.getMessage());
+      LOG.log(Level.WARNING, message);
+      throw new TopologyRuntimeManagementException(message);
+    }
+  }
+
+  /**
+   * Generates the <code>Label</code>s which are attached to a Topology's Persistent Volume Claims.
+   * @param topologyName Attached to the topology match label.
+   * @return A map consisting of the <code>label-value</code> pairs to be used in <code>Label</code>s.
+   */
+  @VisibleForTesting
+  protected static Map<String, String> getPersistentVolumeClaimLabels(String topologyName) {
+    return new HashMap<String, String>() {
+      {
+        put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
+        put(KubernetesConstants.LABEL_ON_DEMAND, "true");
+      }
+    };
+  }
+
+  /**
+   * Generates the <code>StatefulSet</code> name depending on if it is a <code>Executor</code> or
+   * <code>Manager</code>.
+   * @param isExecutor Flag used to generate name for <code>Executor</code> or <code>Manager</code>.
+   * @return The <code>topology</code> name joined by a hyphen to the Executor or Manager name.
+   */
+  private String getStatefulSetName(boolean isExecutor) {
+    return String.format("%s-%s", getTopologyName(),
+        isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME);
+  }
+
+  /**
+   * Generates the <code>Selector</code> match labels with which resources in this topology can be found.
+   * @return A label of the form <code>app=heron,topology=topology-name</code>.
+   */
+  private String createTopologySelectorLabels() {
+    return String.format("%s=%s,%s=%s",
+        KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE,
+        KubernetesConstants.LABEL_TOPOLOGY, getTopologyName());
+  }
+}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
index 709662a9f30..83891727110 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
@@ -95,8 +95,8 @@ final class KubernetesUtils {
     return Math.round(value * scale) / scale;
   }
 
-  static class V1ControllerUtils<T> {
-    private static final Logger LOG = Logger.getLogger(V1Controller.class.getName());
+  static class CommonUtils<T> {
+    private static final Logger LOG = Logger.getLogger(KubernetesShim.class.getName());
 
     /**
      * Merge two lists by keeping all values in the <code>primaryList</code> and de-duplicating values in
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
similarity index 60%
rename from heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
rename to heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
index 1cd85e5ad6f..b1f2fc990e2 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
@@ -19,7 +19,6 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -28,8 +27,6 @@ import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
 import java.util.stream.Collectors;
@@ -38,24 +35,15 @@ import java.util.stream.IntStream;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.heron.api.utils.TopologyUtils;
-import org.apache.heron.common.basics.Pair;
 import org.apache.heron.scheduler.TopologyRuntimeManagementException;
 import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.scheduler.utils.Runtime;
 import org.apache.heron.scheduler.utils.SchedulerUtils;
 import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
 import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.packing.PackingPlan;
 import org.apache.heron.spi.packing.Resource;
 
 import io.kubernetes.client.custom.Quantity;
-import io.kubernetes.client.custom.V1Patch;
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.Configuration;
-import io.kubernetes.client.openapi.apis.AppsV1Api;
-import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.models.V1ConfigMap;
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1ContainerPort;
 import io.kubernetes.client.openapi.models.V1EnvVar;
@@ -65,314 +53,135 @@ import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
 import io.kubernetes.client.openapi.models.V1PodSpec;
-import io.kubernetes.client.openapi.models.V1PodTemplate;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
 import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1SecretKeySelector;
 import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder;
-import io.kubernetes.client.openapi.models.V1Service;
-import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
-import io.kubernetes.client.openapi.models.V1Status;
 import io.kubernetes.client.openapi.models.V1Toleration;
 import io.kubernetes.client.openapi.models.V1Volume;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
-import io.kubernetes.client.util.PatchUtils;
-import io.kubernetes.client.util.Yaml;
-import okhttp3.Response;
 
-import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+final class StatefulSet {
+  private final Map<Type, IStatefulSetFactory> statefulsets = new HashMap<>();
 
-public class V1Controller extends KubernetesController {
-
-  private static final Logger LOG =
-      Logger.getLogger(V1Controller.class.getName());
+  public enum Type {
+    Executor,
+    Manager
+  }
 
-  private static final String ENV_SHARD_ID = "SHARD_ID";
+  private Configs clusterConfigs;
 
-  private final boolean isPodTemplateDisabled;
+  private static final Logger LOG = Logger.getLogger(StatefulSet.class.getName());
 
-  private final AppsV1Api appsClient;
-  private final CoreV1Api coreClient;
+  private static final String ENV_SHARD_ID = "SHARD_ID";
 
   /**
-   * Configures the Kubernetes API Application and Core communications clients.
-   * @param configuration <code>topology</code> configurations.
-   * @param runtimeConfiguration Kubernetes runtime configurations.
+   * Container class of all the Kubernetes cluster configurations. The methods contained within
+   * <code>KubernetesController</code> cannot be accessed externally since it is an abstract class.
    */
-  V1Controller(Config configuration, Config runtimeConfiguration) {
-    super(configuration, runtimeConfiguration);
-
-    isPodTemplateDisabled = KubernetesContext.getPodTemplateDisabled(configuration);
-    LOG.log(Level.WARNING, String.format("Pod Template configuration is %s",
-        isPodTemplateDisabled ? "DISABLED" : "ENABLED"));
-    LOG.log(Level.WARNING, String.format("Volume configuration from CLI is %s",
-        KubernetesContext.getVolumesFromCLIDisabled(configuration) ? "DISABLED" : "ENABLED"));
-
-    try {
-      final ApiClient apiClient = io.kubernetes.client.util.Config.defaultClient();
-      Configuration.setDefaultApiClient(apiClient);
-      appsClient = new AppsV1Api(apiClient);
-      coreClient = new CoreV1Api(apiClient);
-    } catch (IOException e) {
-      LOG.log(Level.SEVERE, "Failed to setup Kubernetes client" + e);
-      throw new RuntimeException(e);
-    }
-  }
+  static final class Configs {
+    private final String topologyName;
+    private final Config configuration;
+    private final Config runtimeConfiguration;
+    private final V1PodTemplateSpec managerPodTemplateSpec;
+    private final V1PodTemplateSpec executorPodTemplateSpec;
 
-  /**
-   * Configures all components required by a <code>topology</code> and submits it to the Kubernetes scheduler.
-   * @param packingPlan Used to configure the StatefulSets <code>Resource</code>s and replica count.
-   * @return Success indicator.
-   */
-  @Override
-  boolean submit(PackingPlan packingPlan) {
-    final String topologyName = getTopologyName();
-    if (!topologyName.equals(topologyName.toLowerCase())) {
-      throw new TopologySubmissionException("K8S scheduler does not allow upper case topology's.");
+    /**
+     * <code>Configs</code> contains the Kubernetes cluster configurations as well as the
+     * <code>Pod Templates</code> for the <code>Executor</code>s and <code>Manager</code>.
+     * @param configuration The cluster configurations contain items relating to/stored in the Kubernetes cluster.
+     * @param runtimeConfiguration The runtime configurations contain items such as the topology name.
+     * @param managerPodTemplateSpec The <code>Pod Template Spec</code> configurations for a <code>Manager</code>.
+     * @param executorPodTemplateSpec The <code>Pod Template Spec</code> configurations for the <code>Executor</code>s.
+     */
+    Configs(Config configuration, Config runtimeConfiguration,
+            V1PodTemplateSpec managerPodTemplateSpec, V1PodTemplateSpec executorPodTemplateSpec) {
+      this.topologyName = Runtime.topologyName(runtimeConfiguration);
+      this.configuration = configuration;
+      this.runtimeConfiguration = runtimeConfiguration;
+      this.managerPodTemplateSpec = managerPodTemplateSpec;
+      this.executorPodTemplateSpec = executorPodTemplateSpec;
     }
 
-    final Resource containerResource = getContainerResource(packingPlan);
-
-    final V1Service topologyService = createTopologyService();
-    try {
-      coreClient.createNamespacedService(getNamespace(), topologyService, null,
-              null, null);
-    } catch (ApiException e) {
-      KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology service", e);
-      throw new TopologySubmissionException(e.getMessage());
+    Config getConfiguration() {
+      return configuration;
     }
 
-    // Find the max number of instances in a container so that we can open
-    // enough ports if remote debugging is enabled.
-    int numberOfInstances = 0;
-    for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
-      numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size());
+    Config getRuntimeConfiguration() {
+      return runtimeConfiguration;
     }
-    final V1StatefulSet executors = createStatefulSet(containerResource, numberOfInstances, true);
-    final V1StatefulSet manager = createStatefulSet(containerResource, numberOfInstances, false);
-
-    try {
-      appsClient.createNamespacedStatefulSet(getNamespace(), executors, null,
-              null, null);
-      appsClient.createNamespacedStatefulSet(getNamespace(), manager, null,
-              null, null);
-    } catch (ApiException e) {
-      final String message = String.format("Error creating topology: %s%n", e.getResponseBody());
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(message);
-    }
-
-    return true;
-  }
-
-  /**
-   * Shuts down a <code>topology</code> by deleting all associated resources.
-   * <ul>
-   *   <li><code>Persistent Volume Claims</code> added by the <code>topology</code>.</li>
-   *   <li><code>StatefulSet</code> for both the <code>Executors</code> and <code>Manager</code>.</li>
-   *   <li>Headless <code>Service</code> which facilitates communication between all Pods.</li>
-   * </ul>
-   * @return Success indicator.
-   */
-  @Override
-  boolean killTopology() {
-    removePersistentVolumeClaims();
-    deleteStatefulSets();
-    deleteService();
-    return true;
-  }
 
-  /**
-   * Restarts a topology by deleting the Pods associated with it using <code>Selector Labels</code>.
-   * @param shardId Not used but required because of interface.
-   * @return Indicator of successful submission of restart request to Kubernetes cluster.
-   */
-  @Override
-  boolean restart(int shardId) {
-    try {
-      coreClient.deleteCollectionNamespacedPod(getNamespace(), null, null, null, null, 0,
-          createTopologySelectorLabels(), null, null, null, null, null, null, null);
-      LOG.log(Level.WARNING, String.format("Restarting topology '%s'...", getTopologyName()));
-    } catch (ApiException e) {
-      LOG.log(Level.SEVERE, String.format("Failed to restart topology '%s'...", getTopologyName()));
-      return false;
+    String getTopologyName() {
+      return topologyName;
     }
-    return true;
-  }
 
-  /**
-   * Adds a specified number of Pods to a <code>topology</code>'s <code>Executors</code>.
-   * @param containersToAdd Set of containers to be added.
-   * @return The passed in <code>Packing Plan</code>.
-   */
-  @Override
-  public Set<PackingPlan.ContainerPlan>
-      addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
-    final V1StatefulSet statefulSet;
-    try {
-      statefulSet = getStatefulSet();
-    } catch (ApiException ae) {
-      final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
-      throw new TopologyRuntimeManagementException(message, ae);
-    }
-    final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
-    final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
-    final int newContainerCount = currentContainerCount + containersToAdd.size();
-
-    try {
-      patchStatefulSetReplicas(newContainerCount);
-    } catch (ApiException ae) {
-      throw new TopologyRuntimeManagementException(
-          ae.getMessage() + "\ndetails\n" + ae.getResponseBody());
+    V1PodTemplateSpec getManagerPodTemplateSpec() {
+      return managerPodTemplateSpec;
     }
 
-    return containersToAdd;
+    V1PodTemplateSpec getExecutorPodTemplateSpec() {
+      return executorPodTemplateSpec;
+    }
   }
 
-  /**
-   * Removes a specified number of Pods from a <code>topology</code>'s <code>Executors</code>.
-   * @param containersToRemove Set of containers to be removed.
-   */
-  @Override
-  public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
-    final V1StatefulSet statefulSet;
-    try {
-      statefulSet = getStatefulSet();
-    } catch (ApiException ae) {
-      final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
-      throw new TopologyRuntimeManagementException(message, ae);
-    }
+  @VisibleForTesting
+  protected void setClusterConfigs(Configs configs) {
+    this.clusterConfigs = configs;
+  }
 
-    final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
-    final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
-    final int newContainerCount = currentContainerCount - containersToRemove.size();
+  @VisibleForTesting
+  protected StatefulSet() {
+    statefulsets.put(Type.Executor, new ExecutorFactory());
+    statefulsets.put(Type.Manager, new ManagerFactory());
+  }
 
-    try {
-      patchStatefulSetReplicas(newContainerCount);
-    } catch (ApiException e) {
-      throw new TopologyRuntimeManagementException(
-          e.getMessage() + "\ndetails\n" + e.getResponseBody());
-    }
+  static StatefulSet get() {
+    return new StatefulSet();
   }
 
-  /**
-   * Performs an in-place update of the replica count for a <code>topology</code>. This allows the
-   * <code>topology</code> Pod count to be scaled up or down.
-   * @param replicas The new number of Pod replicas required.
-   * @throws ApiException in the event there is a failure patching the StatefulSet.
-   */
-  private void patchStatefulSetReplicas(int replicas) throws ApiException {
-    final String body =
-            String.format(KubernetesConstants.JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
-                    replicas);
-    final V1Patch patch = new V1Patch(body);
-
-    PatchUtils.patch(V1StatefulSet.class,
-        () ->
-          appsClient.patchNamespacedStatefulSetCall(
-            getStatefulSetName(true),
-            getNamespace(),
-            patch,
-            null,
-            null,
-            null,
-            null,
-            null),
-        V1Patch.PATCH_FORMAT_JSON_PATCH,
-        appsClient.getApiClient());
+  interface IStatefulSetFactory {
+    V1StatefulSet create(Configs configs, Resource containerResources, int numberOfInstances);
   }
 
   /**
-   * Retrieves the <code>Executors</code> StatefulSet configurations for the Kubernetes cluster.
-   * @return <code>Executors</code> StatefulSet configurations.
-   * @throws ApiException in the event there is a failure retrieving the StatefulSet.
+   * Creates a configured <code>Executor</code> or <code>Manager</code> <code>Stateful Set</code>.
+   * @param type One of <code>Executor</code> or <code>Manager</code>
+   * @param configs Cluster configuration information container.
+   * @param containerResources The container system resource configurations.
+   * @param numberOfInstances The container count.
+   * @return Fully configured <code>Stateful Set</code> or <code>null</code> on invalid <code>type</code>.
    */
-  V1StatefulSet getStatefulSet() throws ApiException {
-    return appsClient.readNamespacedStatefulSet(getStatefulSetName(true), getNamespace(),
-        null);
+  V1StatefulSet create(Type type, Configs configs, Resource containerResources,
+                       int numberOfInstances) {
+    if (statefulsets.containsKey(type)) {
+      return statefulsets.get(type).create(configs, containerResources, numberOfInstances);
+    }
+    return null;
   }
 
-  /**
-   * Deletes the headless <code>Service</code> for a <code>topology</code>'s <code>Executors</code>
-   * and <code>Manager</code> using the <code>topology</code>'s name.
-   */
-  void deleteService() {
-    try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
-          getNamespace(), null, null, 0, null,
-          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
-
-      if (!response.isSuccessful()) {
-        if (response.code() == HTTP_NOT_FOUND) {
-          LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless service for Topology: "
-                  + getTopologyName());
-          return;
-        }
-        LOG.log(Level.SEVERE, "Error when deleting the Service of the job ["
-                + getTopologyName() + "] in namespace [" + getNamespace() + "]");
-        LOG.log(Level.SEVERE, "Error killing topology message:" + response.message());
-        KubernetesUtils.logResponseBodyIfPresent(LOG, response);
+  class ExecutorFactory implements IStatefulSetFactory {
 
-        throw new TopologyRuntimeManagementException(
-                KubernetesUtils.errorMessageFromResponse(response));
-      }
-    } catch (ApiException e) {
-      if (e.getCode() == HTTP_NOT_FOUND) {
-        LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes service for Topology: "
-                + getTopologyName());
-        return;
-      }
-      throw new TopologyRuntimeManagementException("Error deleting topology ["
-              + getTopologyName() + "] Kubernetes service", e);
-    } catch (IOException e) {
-      throw new TopologyRuntimeManagementException("Error deleting topology ["
-              + getTopologyName() + "] Kubernetes service", e);
+    @Override
+    public V1StatefulSet create(Configs configs, Resource containerResources,
+                                int numberOfInstances) {
+      setClusterConfigs(configs);
+      return createStatefulSet(containerResources, numberOfInstances, true);
     }
-    LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
-            + "] in namespace [" + getNamespace() + "] is deleted.");
   }
 
-  /**
-   * Deletes the StatefulSets for a <code>topology</code>'s <code>Executors</code> and <code>Manager</code>
-   * using <code>Label</code>s.
-   */
-  void deleteStatefulSets() {
-    try (Response response = appsClient.deleteCollectionNamespacedStatefulSetCall(getNamespace(),
-        null, null, null, null, null, createTopologySelectorLabels(), null, null, null, null, null,
-            null, null, null)
-        .execute()) {
-
-      if (!response.isSuccessful()) {
-        if (response.code() == HTTP_NOT_FOUND) {
-          LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSets for Topology: "
-                  + getTopologyName());
-          return;
-        }
-        LOG.log(Level.SEVERE, "Error when deleting the StatefulSets of the job ["
-                + getTopologyName() + "] in namespace [" + getNamespace() + "]");
-        LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
-        KubernetesUtils.logResponseBodyIfPresent(LOG, response);
+  class ManagerFactory implements IStatefulSetFactory {
 
-        throw new TopologyRuntimeManagementException(
-                KubernetesUtils.errorMessageFromResponse(response));
-      }
-    } catch (ApiException e) {
-      if (e.getCode() == HTTP_NOT_FOUND) {
-        LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
-                + getTopologyName());
-        return;
-      }
-      throw new TopologyRuntimeManagementException("Error deleting topology ["
-              + getTopologyName() + "] Kubernetes StatefulSets", e);
-    } catch (IOException e) {
-      throw new TopologyRuntimeManagementException("Error deleting topology ["
-              + getTopologyName() + "] Kubernetes StatefulSets", e);
+    @Override
+    public V1StatefulSet create(Configs configs, Resource containerResources,
+                                int numberOfInstances) {
+      setClusterConfigs(configs);
+      return createStatefulSet(containerResources, numberOfInstances, false);
     }
-    LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
-            + "] in namespace [" + getNamespace() + "] is deleted.");
   }
 
+
   /**
    * Generates the command to start Heron within the <code>container</code>.
    * @param containerId Passed down to <>SchedulerUtils</> to generate executor command.
@@ -382,8 +191,8 @@ public class V1Controller extends KubernetesController {
    */
   protected List<String> getExecutorCommand(String containerId, int numOfInstances,
                                             boolean isExecutor) {
-    final Config configuration = getConfiguration();
-    final Config runtimeConfiguration = getRuntimeConfiguration();
+    final Config configuration = clusterConfigs.getConfiguration();
+    final Config runtimeConfiguration = clusterConfigs.getRuntimeConfiguration();
     final Map<ExecutorPort, String> ports =
         KubernetesConstants.EXECUTOR_PORTS.entrySet()
             .stream()
@@ -427,32 +236,6 @@ public class V1Controller extends KubernetesController {
     return String.format(pattern, ENV_SHARD_ID, ENV_SHARD_ID);
   }
 
-  /**
-   * Creates a headless <code>Service</code> to facilitate communication between Pods in a <code>topology</code>.
-   * @return A fully configured <code>Service</code> to be used by a <code>topology</code>.
-   */
-  private V1Service createTopologyService() {
-    final String topologyName = getTopologyName();
-
-    final V1Service service = new V1Service();
-
-    // Setup service metadata.
-    final V1ObjectMeta objectMeta = new V1ObjectMeta()
-        .name(topologyName)
-        .annotations(getServiceAnnotations())
-        .labels(getServiceLabels());
-    service.setMetadata(objectMeta);
-
-    // Create the headless service.
-    final V1ServiceSpec serviceSpec = new V1ServiceSpec()
-        .clusterIP("None")
-        .selector(getPodMatchLabels(topologyName));
-
-    service.setSpec(serviceSpec);
-
-    return service;
-  }
-
   /**
    * Creates and configures the <code>StatefulSet</code> which the topology's <code>executor</code>s will run in.
    * @param containerResource Passed down to configure the <code>executor</code> resource limits.
@@ -462,24 +245,27 @@ public class V1Controller extends KubernetesController {
    */
   private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances,
                                           boolean isExecutor) {
-    final String topologyName = getTopologyName();
-    final Config runtimeConfiguration = getRuntimeConfiguration();
+    final String topologyName = clusterConfigs.getTopologyName();
+    final Config runtimeConfiguration = clusterConfigs.getRuntimeConfiguration();
 
     final List<V1Volume> volumes = new LinkedList<>();
     final List<V1VolumeMount> volumeMounts = new LinkedList<>();
 
     // Collect Persistent Volume Claim configurations from the CLI.
     final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configsPVC =
-        KubernetesContext.getVolumeClaimTemplates(getConfiguration(), isExecutor);
+        KubernetesContext.getVolumeClaimTemplates(clusterConfigs.getConfiguration(), isExecutor);
 
     // Collect all Volume configurations from the CLI and generate Volumes and Volume Mounts.
     createVolumeAndMountsPersistentVolumeClaimCLI(configsPVC, volumes, volumeMounts);
     createVolumeAndMountsHostPathCLI(
-        KubernetesContext.getVolumeHostPath(getConfiguration(), isExecutor), volumes, volumeMounts);
+        KubernetesContext.getVolumeHostPath(clusterConfigs.getConfiguration(), isExecutor),
+        volumes, volumeMounts);
     createVolumeAndMountsEmptyDirCLI(
-        KubernetesContext.getVolumeEmptyDir(getConfiguration(), isExecutor), volumes, volumeMounts);
+        KubernetesContext.getVolumeEmptyDir(clusterConfigs.getConfiguration(), isExecutor),
+        volumes, volumeMounts);
     createVolumeAndMountsNFSCLI(
-        KubernetesContext.getVolumeNFS(getConfiguration(), isExecutor), volumes, volumeMounts);
+        KubernetesContext.getVolumeNFS(clusterConfigs.getConfiguration(), isExecutor),
+        volumes, volumeMounts);
 
     final V1StatefulSet statefulSet = new V1StatefulSet();
 
@@ -509,7 +295,9 @@ public class V1Controller extends KubernetesController {
     statefulSetSpec.setSelector(selector);
 
     // Create a Pod Template.
-    final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate(isExecutor);
+    final V1PodTemplateSpec podTemplateSpec =
+        isExecutor ? clusterConfigs.getExecutorPodTemplateSpec()
+        : clusterConfigs.getManagerPodTemplateSpec();
 
     // Set up Pod Metadata.
     final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getPodLabels(topologyName));
@@ -536,15 +324,7 @@ public class V1Controller extends KubernetesController {
    * @return Key-value pairs of general <code>Annotation</code>s to be added to the Pod.
    */
   private Map<String, String> getPodAnnotations() {
-    return KubernetesContext.getPodAnnotations(getConfiguration());
-  }
-
-  /**
-   * Extracts <code>Service Annotations</code> for configurations.
-   * @return Key-value pairs of service <code>Annotation</code>s to be added to the Pod.
-   */
-  private Map<String, String> getServiceAnnotations() {
-    return KubernetesContext.getServiceAnnotations(getConfiguration());
+    return KubernetesContext.getPodAnnotations(clusterConfigs.getConfiguration());
   }
 
   /**
@@ -582,18 +362,10 @@ public class V1Controller extends KubernetesController {
     final Map<String, String> labels = new HashMap<>();
     labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE);
     labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
-    labels.putAll(KubernetesContext.getPodLabels(getConfiguration()));
+    labels.putAll(KubernetesContext.getPodLabels(clusterConfigs.getConfiguration()));
     return labels;
   }
 
-  /**
-   * Extracts <code>Selector Labels</code> for<code>Service</code>s from configurations.
-   * @return Key-value pairs of <code>Service Labels</code> to be added to the Pod.
-   */
-  private Map<String, String> getServiceLabels() {
-    return KubernetesContext.getServiceLabels(getConfiguration());
-  }
-
   /**
    * Configures the <code>Pod Spec</code> section of the <code>StatefulSet</code>. The <code>Heron</code> container
    * will be configured to allow it to function but other supplied containers are loaded verbatim.
@@ -662,8 +434,7 @@ public class V1Controller extends KubernetesController {
    */
   @VisibleForTesting
   protected void configureTolerations(final V1PodSpec spec) {
-    KubernetesUtils.V1ControllerUtils<V1Toleration> utils =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1Toleration> utils = new KubernetesUtils.CommonUtils<>();
     spec.setTolerations(
         utils.mergeListsDedupe(getTolerations(), spec.getTolerations(),
             Comparator.comparing(V1Toleration::getKey), "Pod Specification Tolerations")
@@ -695,7 +466,7 @@ public class V1Controller extends KubernetesController {
    * @param podSpec <code>Pod Spec</code> to add secrets to.
    */
   private void mountSecretsAsVolumes(V1PodSpec podSpec) {
-    final Config config = getConfiguration();
+    final Config config = clusterConfigs.getConfiguration();
     final Map<String, String> secrets = KubernetesContext.getPodSecretsToMount(config);
     for (Map.Entry<String, String> secret : secrets.entrySet()) {
       final V1VolumeMount mount = new V1VolumeMount()
@@ -722,7 +493,7 @@ public class V1Controller extends KubernetesController {
    */
   private void configureHeronContainer(Resource resource, int numberOfInstances,
                                        final V1Container container, boolean isExecutor) {
-    final Config configuration = getConfiguration();
+    final Config configuration = clusterConfigs.getConfiguration();
 
     // Set up the container images.
     container.setImage(KubernetesContext.getExecutorDockerImage(configuration));
@@ -748,7 +519,7 @@ public class V1Controller extends KubernetesController {
     // Set container ports.
     final boolean debuggingEnabled =
         TopologyUtils.getTopologyRemoteDebuggingEnabled(
-            Runtime.topology(getRuntimeConfiguration()));
+            Runtime.topology(clusterConfigs.getRuntimeConfiguration()));
     configureContainerPorts(debuggingEnabled, numberOfInstances, container);
 
     // setup volume mounts
@@ -852,7 +623,7 @@ public class V1Controller extends KubernetesController {
   @VisibleForTesting
   protected void configureContainerEnvVars(final V1Container container) {
     // Deduplicate on var name with Heron defaults take precedence.
-    KubernetesUtils.V1ControllerUtils<V1EnvVar> utils = new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1EnvVar> utils = new KubernetesUtils.CommonUtils<>();
     container.setEnv(
         utils.mergeListsDedupe(getExecutorEnvVars(), container.getEnv(),
           Comparator.comparing(V1EnvVar::getName), "Pod Template Environment Variables")
@@ -896,8 +667,7 @@ public class V1Controller extends KubernetesController {
     }
 
     // Set container ports. Deduplicate using port number with Heron defaults taking precedence.
-    KubernetesUtils.V1ControllerUtils<V1ContainerPort> utils =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1ContainerPort> utils = new KubernetesUtils.CommonUtils<>();
     container.setPorts(
         utils.mergeListsDedupe(getExecutorPorts(), container.getPorts(),
             Comparator.comparing(V1ContainerPort::getContainerPort), "Pod Template Ports")
@@ -945,7 +715,7 @@ public class V1Controller extends KubernetesController {
    */
   @VisibleForTesting
   protected void mountVolumeIfPresent(final V1Container container) {
-    final Config config = getConfiguration();
+    final Config config = clusterConfigs.getConfiguration();
     if (KubernetesContext.hasContainerVolume(config)) {
       final V1VolumeMount mount =
           new V1VolumeMount()
@@ -953,8 +723,7 @@ public class V1Controller extends KubernetesController {
               .mountPath(KubernetesContext.getContainerVolumeMountPath(config));
 
       // Merge volume mounts. Deduplicate using mount's name with Heron defaults taking precedence.
-      KubernetesUtils.V1ControllerUtils<V1VolumeMount> utils =
-          new KubernetesUtils.V1ControllerUtils<>();
+      KubernetesUtils.CommonUtils<V1VolumeMount> utils = new KubernetesUtils.CommonUtils<>();
       container.setVolumeMounts(
           utils.mergeListsDedupe(Collections.singletonList(mount), container.getVolumeMounts(),
               Comparator.comparing(V1VolumeMount::getName), "Pod Template Volume Mounts")
@@ -967,15 +736,15 @@ public class V1Controller extends KubernetesController {
    * @param container <code>container</code> to be configured.
    */
   private void setSecretKeyRefs(V1Container container) {
-    final Config config = getConfiguration();
+    final Config config = clusterConfigs.getConfiguration();
     final Map<String, String> podSecretKeyRefs = KubernetesContext.getPodSecretKeyRefs(config);
     for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) {
       final String[] keyRefParts = secret.getValue().split(":");
       if (keyRefParts.length != 2) {
-        LOG.log(Level.SEVERE,
-                "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
-        throw new TopologyRuntimeManagementException(
-                "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
+        final String msg =
+            String.format("SecretKeyRef must be in the form name:key. <%s>", secret.getValue());
+        LOG.log(Level.SEVERE, msg);
+        throw new TopologyRuntimeManagementException(msg);
       }
       String name = keyRefParts[0];
       String key = keyRefParts[1];
@@ -989,113 +758,6 @@ public class V1Controller extends KubernetesController {
     }
   }
 
-  /**
-   * Initiates the process of locating and loading <code>Pod Template</code> from a <code>ConfigMap</code>.
-   * The loaded text is then parsed into a usable <code>Pod Template</code>.
-   * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
-   *                   or <code>Manager</code>.
-   * @return A <code>Pod Template</code> which is loaded and parsed from a <code>ConfigMap</code>.
-   */
-  @VisibleForTesting
-  protected V1PodTemplateSpec loadPodFromTemplate(boolean isExecutor) {
-    final Pair<String, String> podTemplateConfigMapName = getPodTemplateLocation(isExecutor);
-
-    // Default Pod Template.
-    if (podTemplateConfigMapName == null) {
-      LOG.log(Level.INFO, "Configuring cluster with the Default Pod Template");
-      return new V1PodTemplateSpec();
-    }
-
-    if (isPodTemplateDisabled) {
-      throw new TopologySubmissionException("Custom Pod Templates are disabled");
-    }
-
-    final String configMapName = podTemplateConfigMapName.first;
-    final String podTemplateName = podTemplateConfigMapName.second;
-
-    // Attempt to locate ConfigMap with provided Pod Template name.
-    try {
-      V1ConfigMap configMap = getConfigMap(configMapName);
-      if (configMap == null) {
-        throw new ApiException(
-            String.format("K8s client unable to locate ConfigMap '%s'", configMapName));
-      }
-
-      final Map<String, String> configMapData = configMap.getData();
-      if (configMapData != null && configMapData.containsKey(podTemplateName)) {
-        // NullPointerException when Pod Template is empty.
-        V1PodTemplateSpec podTemplate = ((V1PodTemplate)
-            Yaml.load(configMapData.get(podTemplateName))).getTemplate();
-        LOG.log(Level.INFO, String.format("Configuring cluster with the %s.%s Pod Template",
-            configMapName, podTemplateName));
-        return podTemplate;
-      }
-
-      // Failure to locate Pod Template with provided name.
-      throw new ApiException(String.format("Failed to locate Pod Template '%s' in ConfigMap '%s'",
-          podTemplateName, configMapName));
-    } catch (ApiException e) {
-      KubernetesUtils.logExceptionWithDetails(LOG, e.getMessage(), e);
-      throw new TopologySubmissionException(e.getMessage());
-    } catch (IOException | ClassCastException | NullPointerException e) {
-      final String message = String.format("Error parsing Pod Template '%s' in ConfigMap '%s'",
-          podTemplateName, configMapName);
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(message);
-    }
-  }
-
-  /**
-   * Extracts the <code>ConfigMap</code> and <code>Pod Template</code> names from the CLI parameter.
-   * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
-   *                   or <code>Manager</code>.
-   * @return A pair of the form <code>(ConfigMap, Pod Template)</code>.
-   */
-  @VisibleForTesting
-  protected Pair<String, String> getPodTemplateLocation(boolean isExecutor) {
-    final String podTemplateConfigMapName = KubernetesContext
-        .getPodTemplateConfigMapName(getConfiguration(), isExecutor);
-
-    if (podTemplateConfigMapName == null) {
-      return null;
-    }
-
-    try {
-      final int splitPoint = podTemplateConfigMapName.indexOf(".");
-      final String configMapName = podTemplateConfigMapName.substring(0, splitPoint);
-      final String podTemplateName = podTemplateConfigMapName.substring(splitPoint + 1);
-
-      if (configMapName.isEmpty() || podTemplateName.isEmpty()) {
-        throw new IllegalArgumentException("Empty ConfigMap or Pod Template name");
-      }
-
-      return new Pair<>(configMapName, podTemplateName);
-    } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
-      final String message = "Invalid ConfigMap and/or Pod Template name";
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(message);
-    }
-  }
-
-  /**
-   * Retrieves a <code>ConfigMap</code> from the K8s cluster in the API Server's namespace.
-   * @param configMapName Name of the <code>ConfigMap</code> to retrieve.
-   * @return The retrieved <code>ConfigMap</code>.
-   */
-  @VisibleForTesting
-  protected V1ConfigMap getConfigMap(String configMapName) {
-    try {
-      return coreClient.readNamespacedConfigMap(
-          configMapName,
-          getNamespace(),
-          null);
-    } catch (ApiException e) {
-      final String message = "Error retrieving ConfigMaps";
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(String.format("%s: %s", message, e.getMessage()));
-    }
-  }
-
   /**
    * Generates <code>Persistent Volume Claims Templates</code> from a mapping of <code>Volumes</code>
    * to <code>key-value</code> pairs of configuration options and values.
@@ -1120,7 +782,7 @@ public class V1Controller extends KubernetesController {
 
       listOfPVCs.add(Volumes.get()
           .createPersistentVolumeClaim(pvc.getKey(),
-              getPersistentVolumeClaimLabels(getTopologyName()), pvc.getValue()));
+              getPersistentVolumeClaimLabels(clusterConfigs.getTopologyName()), pvc.getValue()));
     }
     return listOfPVCs;
   }
@@ -1226,71 +888,19 @@ public class V1Controller extends KubernetesController {
 
     // Deduplicate on Names with Persistent Volume Claims taking precedence.
 
-    KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1Volume> utilsVolumes = new KubernetesUtils.CommonUtils<>();
     podSpec.setVolumes(
         utilsVolumes.mergeListsDedupe(volumes, podSpec.getVolumes(),
             Comparator.comparing(V1Volume::getName),
             "Pod with Volumes"));
 
-    KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1VolumeMount> utilsMounts = new KubernetesUtils.CommonUtils<>();
     executor.setVolumeMounts(
         utilsMounts.mergeListsDedupe(volumeMounts, executor.getVolumeMounts(),
             Comparator.comparing(V1VolumeMount::getName),
             "Heron container with Volume Mounts"));
   }
 
-  /**
-   * Removes all Persistent Volume Claims associated with a specific topology, if they exist.
-   * It looks for the following:
-   * metadata:
-   *   labels:
-   *     topology: <code>topology-name</code>
-   *     onDemand: <code>true</code>
-   */
-  private void removePersistentVolumeClaims() {
-    final String name = getTopologyName();
-    final StringBuilder selectorLabel = new StringBuilder();
-
-    // Generate selector label.
-    for (Map.Entry<String, String> label : getPersistentVolumeClaimLabels(name).entrySet()) {
-      if (selectorLabel.length() != 0) {
-        selectorLabel.append(",");
-      }
-      selectorLabel.append(label.getKey()).append("=").append(label.getValue());
-    }
-
-    // Remove all dynamically backed Persistent Volume Claims.
-    try {
-      V1Status status = coreClient.deleteCollectionNamespacedPersistentVolumeClaim(
-          getNamespace(),
-          null,
-          null,
-          null,
-          null,
-          null,
-          selectorLabel.toString(),
-          null,
-          null,
-          null,
-          null,
-          null,
-          null,
-          null);
-
-      LOG.log(Level.INFO,
-          String.format("Removing automatically generated Persistent Volume Claims for `%s`:%n%s",
-          name, status.getMessage()));
-    } catch (ApiException e) {
-      final String message = String.format("Failed to connect to K8s cluster to delete Persistent "
-              + "Volume Claims for topology `%s`. A manual clean-up is required.%n%s",
-          name, e.getMessage());
-      LOG.log(Level.WARNING, message);
-      throw new TopologyRuntimeManagementException(message);
-    }
-  }
-
   /**
    * Generates the <code>Label</code> which are attached to a Topology's Persistent Volume Claims.
    * @param topologyName Attached to the topology match label.
@@ -1313,17 +923,7 @@ public class V1Controller extends KubernetesController {
    * @return String <code>"topology-name"-executors</code>.
    */
   private String getStatefulSetName(boolean isExecutor) {
-    return String.format("%s-%s", getTopologyName(),
+    return String.format("%s-%s", clusterConfigs.getTopologyName(),
         isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME);
   }
-
-  /**
-   * Generates the <code>Selector</code> match labels with which resources in this topology can be found.
-   * @return A label of the form <code>app=heron,topology=topology-name</code>.
-   */
-  private String createTopologySelectorLabels() {
-    return String.format("%s=%s,%s=%s",
-        KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE,
-        KubernetesConstants.LABEL_TOPOLOGY, getTopologyName());
-  }
 }
diff --git a/heron/schedulers/tests/java/BUILD b/heron/schedulers/tests/java/BUILD
index 3ad9ffedf36..c8eac176c74 100644
--- a/heron/schedulers/tests/java/BUILD
+++ b/heron/schedulers/tests/java/BUILD
@@ -205,7 +205,8 @@ java_tests(
         "org.apache.heron.scheduler.kubernetes.KubernetesLauncherTest",
         "org.apache.heron.scheduler.kubernetes.VolumesTests",
         "org.apache.heron.scheduler.kubernetes.KubernetesContextTest",
-        "org.apache.heron.scheduler.kubernetes.V1ControllerTest",
+        "org.apache.heron.scheduler.kubernetes.KubernetesShimTest",
+        "org.apache.heron.scheduler.kubernetes.StatefulSetTest",
         "org.apache.heron.scheduler.kubernetes.KubernetesUtilsTest",
     ],
     runtime_deps = [":kubernetes-tests"],
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
new file mode 100644
index 00000000000..ec1be4e87a9
--- /dev/null
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.heron.common.basics.Pair;
+import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.common.Key;
+
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KubernetesShimTest {
+
+  private static final String TOPOLOGY_NAME = "topology-name";
+  private static final String CONFIGMAP_NAME = "CONFIG-MAP-NAME";
+  private static final String POD_TEMPLATE_NAME = "POD-TEMPLATE-NAME";
+  private static final String CONFIGMAP_POD_TEMPLATE_NAME =
+      String.format("%s.%s", CONFIGMAP_NAME, POD_TEMPLATE_NAME);
+  private static final String POD_TEMPLATE_VALID =
+      "apiVersion: apps/v1\n"
+          + "kind: PodTemplate\n"
+          + "metadata:\n"
+          + "  name: heron-tracker\n"
+          + "  namespace: default\n"
+          + "template:\n"
+          + "  metadata:\n"
+          + "    labels:\n"
+          + "      app: heron-tracker\n"
+          + "  spec:\n"
+          + "    containers:\n"
+          + "      - name: heron-tracker\n"
+          + "        image: apache/heron:latest\n"
+          + "        ports:\n"
+          + "          - containerPort: 8888\n"
+          + "            name: api-port\n"
+          + "        resources:\n"
+          + "          requests:\n"
+          + "            cpu: \"100m\"\n"
+          + "            memory: \"200M\"\n"
+          + "          limits:\n"
+          + "            cpu: \"400m\"\n"
+          + "            memory: \"512M\"";
+  private static final String POD_TEMPLATE_LOCATION_EXECUTOR =
+      String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
+          KubernetesConstants.EXECUTOR_NAME);
+  private static final String POD_TEMPLATE_LOCATION_MANAGER =
+      String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
+          KubernetesConstants.MANAGER_NAME);
+
+  private static final Config CONFIG = Config.newBuilder().build();
+  private static final Config CONFIG_WITH_POD_TEMPLATE = Config.newBuilder()
+      .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
+      .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
+      .build();
+  private static final Config RUNTIME = Config.newBuilder()
+      .put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME)
+      .build();
+  private final Config configDisabledPodTemplate = Config.newBuilder()
+      .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
+      .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
+      .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_DISABLED, "true")
+      .build();
+
+  @Spy
+  private final KubernetesShim v1ControllerWithPodTemplate =
+      new KubernetesShim(CONFIG_WITH_POD_TEMPLATE, RUNTIME);
+
+  @Spy
+  private final KubernetesShim v1ControllerPodTemplate =
+      new KubernetesShim(configDisabledPodTemplate, RUNTIME);
+
+  @Rule
+  public final ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testLoadPodFromTemplateDefault() {
+    final KubernetesShim v1ControllerNoPodTemplate = new KubernetesShim(CONFIG, RUNTIME);
+    final V1PodTemplateSpec defaultPodSpec = new V1PodTemplateSpec();
+
+    final V1PodTemplateSpec podSpecExecutor = v1ControllerNoPodTemplate.loadPodFromTemplate(true);
+    Assert.assertEquals("Default Pod Spec for Executor", defaultPodSpec, podSpecExecutor);
+
+    final V1PodTemplateSpec podSpecManager = v1ControllerNoPodTemplate.loadPodFromTemplate(false);
+    Assert.assertEquals("Default Pod Spec for Manager", defaultPodSpec, podSpecManager);
+  }
+
+  @Test
+  public void testLoadPodFromTemplateNullConfigMap() {
+    final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
+    testCases.add(new TestTuple<>("Executor not found", true, "unable to locate"));
+    testCases.add(new TestTuple<>("Manager not found", false, "unable to locate"));
+
+    for (TestTuple<Boolean, String> testCase : testCases) {
+      doReturn(null)
+          .when(v1ControllerWithPodTemplate)
+          .getConfigMap(anyString());
+
+      String message = "";
+      try {
+        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+    }
+  }
+
+  @Test
+  public void testLoadPodFromTemplateNoConfigMap() {
+    final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
+    testCases.add(new TestTuple<>("Executor no ConfigMap", true, "Failed to locate Pod Template"));
+    testCases.add(new TestTuple<>("Manager no ConfigMap", false, "Failed to locate Pod Template"));
+
+    for (TestTuple<Boolean, String> testCase : testCases) {
+      doReturn(new V1ConfigMap())
+          .when(v1ControllerWithPodTemplate)
+          .getConfigMap(anyString());
+
+      String message = "";
+      try {
+        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+    }
+  }
+
+  @Test
+  public void testLoadPodFromTemplateNoTargetConfigMap() {
+    final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
+    testCases.add(new TestTuple<>("Executor no target ConfigMap",
+        true, "Failed to locate Pod Template"));
+    testCases.add(new TestTuple<>("Manager no target ConfigMap",
+        false, "Failed to locate Pod Template"));
+
+    final V1ConfigMap configMapNoTargetData = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData("Dummy Key", "Dummy Value")
+        .build();
+
+    for (TestTuple<Boolean, String> testCase : testCases) {
+      doReturn(configMapNoTargetData)
+          .when(v1ControllerWithPodTemplate)
+          .getConfigMap(anyString());
+
+      String message = "";
+      try {
+        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+    }
+  }
+
+  @Test
+  public void testLoadPodFromTemplateBadTargetConfigMap() {
+    // ConfigMap which has the target key but whose value is not a valid Pod Template.
+    final V1ConfigMap configMapInvalidPod = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, "Dummy Value")
+        .build();
+
+    // ConfigMap which has the target key but whose Pod Template value is empty.
+    final V1ConfigMap configMapEmptyPod = new V1ConfigMapBuilder()
+        .withNewMetadata()
+        .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, "")
+        .build();
+
+    // Test case container.
+    // Input: ConfigMap to setup mock KubernetesShim, Boolean flag for executor/manager switch.
+    // Output: The expected error message.
+    final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
+    testCases.add(new TestTuple<>("Executor invalid Pod Template",
+        new Pair<>(configMapInvalidPod, true), "Error parsing"));
+    testCases.add(new TestTuple<>("Manager invalid Pod Template",
+        new Pair<>(configMapInvalidPod, false), "Error parsing"));
+    testCases.add(new TestTuple<>("Executor empty Pod Template",
+        new Pair<>(configMapEmptyPod, true), "Error parsing"));
+    testCases.add(new TestTuple<>("Manager empty Pod Template",
+        new Pair<>(configMapEmptyPod, false), "Error parsing"));
+
+    // Test loop.
+    for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
+      doReturn(testCase.input.first)
+          .when(v1ControllerWithPodTemplate)
+          .getConfigMap(anyString());
+
+      String message = "";
+      try {
+        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+    }
+  }
+
+  @Test
+  public void testLoadPodFromTemplateValidConfigMap() {
+    final String expected =
+        "        containers: [class V1Container {\n"
+        + "            args: null\n"
+        + "            command: null\n"
+        + "            env: null\n"
+        + "            envFrom: null\n"
+        + "            image: apache/heron:latest\n"
+        + "            imagePullPolicy: null\n"
+        + "            lifecycle: null\n"
+        + "            livenessProbe: null\n"
+        + "            name: heron-tracker\n"
+        + "            ports: [class V1ContainerPort {\n"
+        + "                containerPort: 8888\n"
+        + "                hostIP: null\n"
+        + "                hostPort: null\n"
+        + "                name: api-port\n"
+        + "                protocol: null\n"
+        + "            }]\n"
+        + "            readinessProbe: null\n"
+        + "            resources: class V1ResourceRequirements {\n"
+        + "                limits: {cpu=Quantity{number=0.400, format=DECIMAL_SI}, "
+        + "memory=Quantity{number=512000000, format=DECIMAL_SI}}\n"
+        + "                requests: {cpu=Quantity{number=0.100, format=DECIMAL_SI}, "
+        + "memory=Quantity{number=200000000, format=DECIMAL_SI}}\n"
+        + "            }\n"
+        + "            securityContext: null\n"
+        + "            startupProbe: null\n"
+        + "            stdin: null\n"
+        + "            stdinOnce: null\n"
+        + "            terminationMessagePath: null\n"
+        + "            terminationMessagePolicy: null\n"
+        + "            tty: null\n"
+        + "            volumeDevices: null\n"
+        + "            volumeMounts: null\n"
+        + "            workingDir: null\n"
+        + "        }]";
+
+    // ConfigMap with valid Pod Template.
+    final V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
+        .build();
+
+    // Test case container.
+    // Input: ConfigMap to setup mock KubernetesShim, Boolean flag for executor/manager switch.
+    // Output: The expected Pod template as a string.
+    final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
+    testCases.add(new TestTuple<>("Executor valid Pod Template",
+        new Pair<>(configMapValidPod, true), expected));
+    testCases.add(new TestTuple<>("Manager valid Pod Template",
+        new Pair<>(configMapValidPod, false), expected));
+
+    // Test loop: each case supplies its own executor/manager flag.
+    for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
+      doReturn(testCase.input.first)
+          .when(v1ControllerWithPodTemplate)
+          .getConfigMap(anyString());
+
+      final V1PodTemplateSpec podTemplateSpec =
+          v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
+      Assert.assertTrue(testCase.description,
+          podTemplateSpec.toString().contains(testCase.expected));
+    }
+  }
+
+  @Test
+  public void testLoadPodFromTemplateInvalidConfigMap() {
+    // YAML whose <kind> is not <PodTemplate>; loading is expected to fail with a parsing error.
+    final String invalidPodTemplate =
+        "apiVersion: apps/v1\n"
+            + "kind: InvalidTemplate\n"
+            + "metadata:\n"
+            + "  name: heron-tracker\n"
+            + "  namespace: default\n"
+            + "template:\n"
+            + "  metadata:\n"
+            + "    labels:\n"
+            + "      app: heron-tracker\n"
+            + "  spec:\n";
+    // ConfigMap holding the invalid template under the expected Pod Template key.
+    final V1ConfigMap configMap = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, invalidPodTemplate)
+        .build();
+
+    // Test case container.
+    // Input: ConfigMap to setup mock KubernetesShim, Boolean flag for executor/manager switch.
+    // Output: The expected error message.
+    final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
+    testCases.add(new TestTuple<>("Executor invalid Pod Template",
+        new Pair<>(configMap, true), "Error parsing"));
+    testCases.add(new TestTuple<>("Manager invalid Pod Template",
+        new Pair<>(configMap, false), "Error parsing"));
+
+    // Test loop.
+    for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
+      doReturn(testCase.input.first)
+          .when(v1ControllerWithPodTemplate)
+          .getConfigMap(anyString());
+
+      String message = "";
+      try {
+        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
+      } catch (TopologySubmissionException e) {
+        message = e.getMessage();
+      }
+      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+    }
+  }
+
+  @Test
+  public void testDisablePodTemplates() {
+    // ConfigMap with valid Pod Template.
+    V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
+        .withNewMetadata()
+          .withName(CONFIGMAP_NAME)
+        .endMetadata()
+        .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
+        .build();
+    final String expected = "Pod Templates are disabled";
+    String message = "";
+    doReturn(configMapValidPod)
+        .when(v1ControllerPodTemplate)
+        .getConfigMap(anyString());
+
+    try {
+      v1ControllerPodTemplate.loadPodFromTemplate(true);
+    } catch (TopologySubmissionException e) {
+      message = e.getMessage();
+    }
+    Assert.assertTrue(message.contains(expected));
+  }
+
+  @Test
+  public void testGetPodTemplateLocationPassing() {
+    final Config testConfig = Config.newBuilder()
+        .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
+        .build();
+    final KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
+    final Pair<String, String> expected = new Pair<>(CONFIGMAP_NAME, POD_TEMPLATE_NAME);
+
+    // Correct parsing
+    final Pair<String, String> actual = kubernetesShim.getPodTemplateLocation(true);
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test
+  public void testGetPodTemplateLocationNoConfigMap() {
+    expectedException.expect(TopologySubmissionException.class);
+    final Config testConfig = Config.newBuilder()
+        .put(POD_TEMPLATE_LOCATION_EXECUTOR, ".POD-TEMPLATE-NAME").build();
+    KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
+    kubernetesShim.getPodTemplateLocation(true);
+  }
+
+  @Test
+  public void testGetPodTemplateLocationNoPodTemplate() {
+    expectedException.expect(TopologySubmissionException.class);
+    final Config testConfig = Config.newBuilder()
+        .put(POD_TEMPLATE_LOCATION_EXECUTOR, "CONFIGMAP-NAME.").build();
+    KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
+    kubernetesShim.getPodTemplateLocation(true);
+  }
+
+  @Test
+  public void testGetPodTemplateLocationNoDelimiter() {
+    expectedException.expect(TopologySubmissionException.class);
+    final Config testConfig = Config.newBuilder()
+        .put(POD_TEMPLATE_LOCATION_EXECUTOR, "CONFIGMAP-NAMEPOD-TEMPLATE-NAME").build();
+    KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
+    kubernetesShim.getPodTemplateLocation(true);
+  }
+}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
index ddc1f692dc4..4a80571c6d0 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
@@ -43,14 +43,14 @@ public class KubernetesUtilsTest {
   public void testMergeListsDedupe() {
     final String description = "Pod Template Environment Variables";
     final List<V1EnvVar> heronEnvVars =
-        Collections.unmodifiableList(V1Controller.getExecutorEnvVars());
+        Collections.unmodifiableList(StatefulSet.getExecutorEnvVars());
     final V1EnvVar additionEnvVar = new V1EnvVar()
         .name("env-variable-to-be-kept")
         .valueFrom(new V1EnvVarSource()
             .fieldRef(new V1ObjectFieldSelector()
                 .fieldPath("env-variable-was-kept")));
     final List<V1EnvVar> expectedEnvVars = Collections.unmodifiableList(
-        new LinkedList<V1EnvVar>(V1Controller.getExecutorEnvVars()) {{
+        new LinkedList<V1EnvVar>(StatefulSet.getExecutorEnvVars()) {{
           add(additionEnvVar);
         }});
     final List<V1EnvVar> inputEnvVars = Arrays.asList(
@@ -67,48 +67,48 @@ public class KubernetesUtilsTest {
         additionEnvVar
     );
 
-    KubernetesUtils.V1ControllerUtils<V1EnvVar> v1ControllerUtils =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1EnvVar> commonUtils =
+        new KubernetesUtils.CommonUtils<>();
 
     // Both input lists are null.
     Assert.assertNull("Both input lists are <null>",
-         v1ControllerUtils.mergeListsDedupe(null, null,
+         commonUtils.mergeListsDedupe(null, null,
              Comparator.comparing(V1EnvVar::getName), description));
 
     // <primaryList> is <null>.
     Assert.assertEquals("<primaryList> is null and <secondaryList> should be returned",
         inputEnvVars,
-        v1ControllerUtils.mergeListsDedupe(null, inputEnvVars,
+        commonUtils.mergeListsDedupe(null, inputEnvVars,
             Comparator.comparing(V1EnvVar::getName), description));
 
     // <primaryList> is empty.
     Assert.assertEquals("<primaryList> is empty and <secondaryList> should be returned",
         inputEnvVars,
-        v1ControllerUtils.mergeListsDedupe(new LinkedList<>(), inputEnvVars,
+        commonUtils.mergeListsDedupe(new LinkedList<>(), inputEnvVars,
             Comparator.comparing(V1EnvVar::getName), description));
 
     // <secondaryList> is <null>.
     Assert.assertEquals("<secondaryList> is null and <primaryList> should be returned",
         heronEnvVars,
-        v1ControllerUtils.mergeListsDedupe(heronEnvVars, null,
+        commonUtils.mergeListsDedupe(heronEnvVars, null,
             Comparator.comparing(V1EnvVar::getName), description));
 
     // <secondaryList> is empty.
     Assert.assertEquals("<secondaryList> is empty and <primaryList> should be returned",
         heronEnvVars,
-        v1ControllerUtils.mergeListsDedupe(heronEnvVars, new LinkedList<>(),
+        commonUtils.mergeListsDedupe(heronEnvVars, new LinkedList<>(),
             Comparator.comparing(V1EnvVar::getName), description));
 
     // Merge both lists.
     Assert.assertTrue("<primaryList> and <secondaryList> merged and deduplicated",
         expectedEnvVars.containsAll(
-            v1ControllerUtils.mergeListsDedupe(heronEnvVars, inputEnvVars,
+            commonUtils.mergeListsDedupe(heronEnvVars, inputEnvVars,
                 Comparator.comparing(V1EnvVar::getName), description)));
 
     // Expect thrown error.
     String errorMessage = "";
     try {
-      v1ControllerUtils.mergeListsDedupe(heronEnvVars, Collections.singletonList(new V1EnvVar()),
+      commonUtils.mergeListsDedupe(heronEnvVars, Collections.singletonList(new V1EnvVar()),
           Comparator.comparing(V1EnvVar::getName), description);
     } catch (TopologySubmissionException e) {
       errorMessage = e.getMessage();
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
similarity index 68%
rename from heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
rename to heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
index a039ca980e5..d58c42102c1 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
@@ -29,28 +29,22 @@ import java.util.Map;
 import com.google.common.collect.ImmutableMap;
 
 import org.junit.Assert;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.rules.ExpectedException;
 import org.junit.runner.RunWith;
-import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
 
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.Pair;
-import org.apache.heron.scheduler.TopologySubmissionException;
-import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Key;
 import org.apache.heron.spi.packing.Resource;
 
 import io.kubernetes.client.custom.Quantity;
-import io.kubernetes.client.openapi.models.V1ConfigMap;
-import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1ContainerBuilder;
 import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1ContainerPortBuilder;
 import io.kubernetes.client.openapi.models.V1EnvVar;
 import io.kubernetes.client.openapi.models.V1EnvVarSource;
 import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
@@ -59,6 +53,7 @@ import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
 import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpecBuilder;
 import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1Toleration;
 import io.kubernetes.client.openapi.models.V1Volume;
@@ -67,49 +62,22 @@ import io.kubernetes.client.openapi.models.V1VolumeMount;
 import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 
 import static org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
+import static org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
+import static org.apache.heron.scheduler.kubernetes.StatefulSet.getTolerations;
 
 @RunWith(MockitoJUnitRunner.class)
-public class V1ControllerTest {
-
+public class StatefulSetTest {
   private static final String TOPOLOGY_NAME = "topology-name";
   private static final String CONFIGMAP_NAME = "CONFIG-MAP-NAME";
   private static final String POD_TEMPLATE_NAME = "POD-TEMPLATE-NAME";
   private static final String CONFIGMAP_POD_TEMPLATE_NAME =
       String.format("%s.%s", CONFIGMAP_NAME, POD_TEMPLATE_NAME);
-  private static final String POD_TEMPLATE_VALID =
-      "apiVersion: apps/v1\n"
-          + "kind: PodTemplate\n"
-          + "metadata:\n"
-          + "  name: heron-tracker\n"
-          + "  namespace: default\n"
-          + "template:\n"
-          + "  metadata:\n"
-          + "    labels:\n"
-          + "      app: heron-tracker\n"
-          + "  spec:\n"
-          + "    containers:\n"
-          + "      - name: heron-tracker\n"
-          + "        image: apache/heron:latest\n"
-          + "        ports:\n"
-          + "          - containerPort: 8888\n"
-          + "            name: api-port\n"
-          + "        resources:\n"
-          + "          requests:\n"
-          + "            cpu: \"100m\"\n"
-          + "            memory: \"200M\"\n"
-          + "          limits:\n"
-          + "            cpu: \"400m\"\n"
-          + "            memory: \"512M\"";
   private static final String POD_TEMPLATE_LOCATION_EXECUTOR =
       String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
           KubernetesConstants.EXECUTOR_NAME);
   private static final String POD_TEMPLATE_LOCATION_MANAGER =
       String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
           KubernetesConstants.MANAGER_NAME);
-
-  private static final Config CONFIG = Config.newBuilder().build();
   private static final Config CONFIG_WITH_POD_TEMPLATE = Config.newBuilder()
       .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
       .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
@@ -117,332 +85,30 @@ public class V1ControllerTest {
   private static final Config RUNTIME = Config.newBuilder()
       .put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME)
       .build();
-  private final Config configDisabledPodTemplate = Config.newBuilder()
-      .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
-      .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
-      .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_DISABLED, "true")
+  private static final StatefulSet STATEFUL_SET = new StatefulSet();
+  // Pod Template fixture: requests/limits are keyed by resource name ("cpu", "memory").
+  private static final V1PodTemplateSpec POD_TEMPLATE_SPEC = new V1PodTemplateSpecBuilder()
+      .withNewMetadata()
+        .withLabels(Collections.singletonMap("app", "heron-tracker"))
+      .endMetadata()
+      .withNewSpec()
+      .withContainers(new V1ContainerBuilder()
+          .withName("heron-tracker").withImage("apache/heron:latest")
+          .withPorts(new V1ContainerPortBuilder()
+              .withName("api-port").withContainerPort(8888).build())
+          .withNewResources()
+            .addToRequests("cpu", new Quantity("100m"))
+            .addToRequests("memory", new Quantity("200M"))
+            .addToLimits("cpu", new Quantity("400m"))
+            .addToLimits("memory", new Quantity("512M"))
+          .endResources()
+          .build())
+      .endSpec()
+      .build();
+  private static final StatefulSet.Configs CLUSTER_CONFIGS =
+      new StatefulSet.Configs(CONFIG_WITH_POD_TEMPLATE, RUNTIME,
+        POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
 
-  @Spy
-  private final V1Controller v1ControllerWithPodTemplate =
-      new V1Controller(CONFIG_WITH_POD_TEMPLATE, RUNTIME);
-
-  @Spy
-  private final V1Controller v1ControllerPodTemplate =
-      new V1Controller(configDisabledPodTemplate, RUNTIME);
-
-  @Rule
-  public final ExpectedException expectedException = ExpectedException.none();
-
-  @Test
-  public void testLoadPodFromTemplateDefault() {
-    final V1Controller v1ControllerNoPodTemplate = new V1Controller(CONFIG, RUNTIME);
-    final V1PodTemplateSpec defaultPodSpec = new V1PodTemplateSpec();
-
-    final V1PodTemplateSpec podSpecExecutor = v1ControllerNoPodTemplate.loadPodFromTemplate(true);
-    Assert.assertEquals("Default Pod Spec for Executor", defaultPodSpec, podSpecExecutor);
-
-    final V1PodTemplateSpec podSpecManager = v1ControllerNoPodTemplate.loadPodFromTemplate(false);
-    Assert.assertEquals("Default Pod Spec for Manager", defaultPodSpec, podSpecManager);
-  }
-
-  @Test
-  public void testLoadPodFromTemplateNullConfigMap() {
-    final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
-    testCases.add(new TestTuple<>("Executor not found", true, "unable to locate"));
-    testCases.add(new TestTuple<>("Manager not found", false, "unable to locate"));
-
-    for (TestTuple<Boolean, String> testCase : testCases) {
-      doReturn(null)
-          .when(v1ControllerWithPodTemplate)
-          .getConfigMap(anyString());
-
-      String message = "";
-      try {
-        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
-      } catch (TopologySubmissionException e) {
-        message = e.getMessage();
-      }
-      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
-    }
-  }
-
-  @Test
-  public void testLoadPodFromTemplateNoConfigMap() {
-    final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
-    testCases.add(new TestTuple<>("Executor no ConfigMap", true, "Failed to locate Pod Template"));
-    testCases.add(new TestTuple<>("Manager no ConfigMap", false, "Failed to locate Pod Template"));
-
-    for (TestTuple<Boolean, String> testCase : testCases) {
-      doReturn(new V1ConfigMap())
-          .when(v1ControllerWithPodTemplate)
-          .getConfigMap(anyString());
-
-      String message = "";
-      try {
-        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
-      } catch (TopologySubmissionException e) {
-        message = e.getMessage();
-      }
-      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
-    }
-  }
-
-  @Test
-  public void testLoadPodFromTemplateNoTargetConfigMap() {
-    final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
-    testCases.add(new TestTuple<>("Executor no target ConfigMap",
-        true, "Failed to locate Pod Template"));
-    testCases.add(new TestTuple<>("Manager no target ConfigMap",
-        false, "Failed to locate Pod Template"));
-
-    final V1ConfigMap configMapNoTargetData = new V1ConfigMapBuilder()
-        .withNewMetadata()
-          .withName(CONFIGMAP_NAME)
-        .endMetadata()
-        .addToData("Dummy Key", "Dummy Value")
-        .build();
-
-    for (TestTuple<Boolean, String> testCase : testCases) {
-      doReturn(configMapNoTargetData)
-          .when(v1ControllerWithPodTemplate)
-          .getConfigMap(anyString());
-
-      String message = "";
-      try {
-        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
-      } catch (TopologySubmissionException e) {
-        message = e.getMessage();
-      }
-      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
-    }
-  }
-
-  @Test
-  public void testLoadPodFromTemplateBadTargetConfigMap() {
-    // ConfigMap with target ConfigMap and an invalid Pod Template.
-    final V1ConfigMap configMapInvalidPod = new V1ConfigMapBuilder()
-        .withNewMetadata()
-          .withName(CONFIGMAP_NAME)
-        .endMetadata()
-        .addToData(POD_TEMPLATE_NAME, "Dummy Value")
-        .build();
-
-    // ConfigMap with target ConfigMaps and an empty Pod Template.
-    final V1ConfigMap configMapEmptyPod = new V1ConfigMapBuilder()
-        .withNewMetadata()
-        .withName(CONFIGMAP_NAME)
-        .endMetadata()
-        .addToData(POD_TEMPLATE_NAME, "")
-        .build();
-
-    // Test case container.
-    // Input: ConfigMap to setup mock V1Controller, Boolean flag for executor/manager switch.
-    // Output: The expected error message.
-    final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
-    testCases.add(new TestTuple<>("Executor invalid Pod Template",
-        new Pair<>(configMapInvalidPod, true), "Error parsing"));
-    testCases.add(new TestTuple<>("Manager invalid Pod Template",
-        new Pair<>(configMapInvalidPod, false), "Error parsing"));
-    testCases.add(new TestTuple<>("Executor empty Pod Template",
-        new Pair<>(configMapEmptyPod, true), "Error parsing"));
-    testCases.add(new TestTuple<>("Manager empty Pod Template",
-        new Pair<>(configMapEmptyPod, false), "Error parsing"));
-
-    // Test loop.
-    for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
-      doReturn(testCase.input.first)
-          .when(v1ControllerWithPodTemplate)
-          .getConfigMap(anyString());
-
-      String message = "";
-      try {
-        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
-      } catch (TopologySubmissionException e) {
-        message = e.getMessage();
-      }
-      Assert.assertTrue(testCase.description, message.contains(testCase.expected));
-    }
-  }
-
-  @Test
-  public void testLoadPodFromTemplateValidConfigMap() {
-    final String expected =
-        "        containers: [class V1Container {\n"
-        + "            args: null\n"
-        + "            command: null\n"
-        + "            env: null\n"
-        + "            envFrom: null\n"
-        + "            image: apache/heron:latest\n"
-        + "            imagePullPolicy: null\n"
-        + "            lifecycle: null\n"
-        + "            livenessProbe: null\n"
-        + "            name: heron-tracker\n"
-        + "            ports: [class V1ContainerPort {\n"
-        + "                containerPort: 8888\n"
-        + "                hostIP: null\n"
-        + "                hostPort: null\n"
-        + "                name: api-port\n"
-        + "                protocol: null\n"
-        + "            }]\n"
-        + "            readinessProbe: null\n"
-        + "            resources: class V1ResourceRequirements {\n"
-        + "                limits: {cpu=Quantity{number=0.400, format=DECIMAL_SI}, "
-        + "memory=Quantity{number=512000000, format=DECIMAL_SI}}\n"
-        + "                requests: {cpu=Quantity{number=0.100, format=DECIMAL_SI}, "
-        + "memory=Quantity{number=200000000, format=DECIMAL_SI}}\n"
-        + "            }\n"
-        + "            securityContext: null\n"
-        + "            startupProbe: null\n"
-        + "            stdin: null\n"
-        + "            stdinOnce: null\n"
-        + "            terminationMessagePath: null\n"
-        + "            terminationMessagePolicy: null\n"
-        + "            tty: null\n"
-        + "            volumeDevices: null\n"
-        + "            volumeMounts: null\n"
-        + "            workingDir: null\n"
-        + "        }]";
-
-
-    // ConfigMap with valid Pod Template.
-    final V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
-        .withNewMetadata()
-          .withName(CONFIGMAP_NAME)
-        .endMetadata()
-        .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
-        .build();
-
-    // Test case container.
-    // Input: ConfigMap to setup mock V1Controller, Boolean flag for executor/manager switch.
-    // Output: The expected Pod template as a string.
-    final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
-    testCases.add(new TestTuple<>("Executor valid Pod Template",
-        new Pair<>(configMapValidPod, true), expected));
-    testCases.add(new TestTuple<>("Manager valid Pod Template",
-        new Pair<>(configMapValidPod, false), expected));
-
-    // Test loop.
-    for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
-      doReturn(testCase.input.first)
-          .when(v1ControllerWithPodTemplate)
-          .getConfigMap(anyString());
-
-      V1PodTemplateSpec podTemplateSpec = v1ControllerWithPodTemplate.loadPodFromTemplate(true);
-
-      Assert.assertTrue(podTemplateSpec.toString().contains(testCase.expected));
-    }
-  }
-
-  @Test
-  public void testLoadPodFromTemplateInvalidConfigMap() {
-    // ConfigMap with an invalid Pod Template.
-    final String invalidPodTemplate =
-        "apiVersion: apps/v1\n"
-            + "kind: InvalidTemplate\n"
-            + "metadata:\n"
-            + "  name: heron-tracker\n"
-            + "  namespace: default\n"
-            + "template:\n"
-            + "  metadata:\n"
-            + "    labels:\n"
-            + "      app: heron-tracker\n"
-            + "  spec:\n";
-    final V1ConfigMap configMap = new V1ConfigMapBuilder()
-        .withNewMetadata()
-          .withName(CONFIGMAP_NAME)
-        .endMetadata()
-        .addToData(POD_TEMPLATE_NAME, invalidPodTemplate)
-        .build();
-
-
-    // Test case container.
-    // Input: ConfigMap to setup mock V1Controller, Boolean flag for executor/manager switch.
-    // Output: The expected Pod template as a string.
-    final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
-    testCases.add(new TestTuple<>("Executor invalid Pod Template",
-        new Pair<>(configMap, true), "Error parsing"));
-    testCases.add(new TestTuple<>("Manager invalid Pod Template",
-        new Pair<>(configMap, false), "Error parsing"));
-
-    // Test loop.
-    for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
-      doReturn(testCase.input.first)
-          .when(v1ControllerWithPodTemplate)
-          .getConfigMap(anyString());
-
-      String message = "";
-      try {
-        v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
-      } catch (TopologySubmissionException e) {
-        message = e.getMessage();
-      }
-      Assert.assertTrue(message.contains(testCase.expected));
-    }
-  }
-
-  @Test
-  public void testDisablePodTemplates() {
-    // ConfigMap with valid Pod Template.
-    V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
-        .withNewMetadata()
-          .withName(CONFIGMAP_NAME)
-        .endMetadata()
-        .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
-        .build();
-    final String expected = "Pod Templates are disabled";
-    String message = "";
-    doReturn(configMapValidPod)
-        .when(v1ControllerPodTemplate)
-        .getConfigMap(anyString());
-
-    try {
-      v1ControllerPodTemplate.loadPodFromTemplate(true);
-    } catch (TopologySubmissionException e) {
-      message = e.getMessage();
-    }
-    Assert.assertTrue(message.contains(expected));
-  }
-
-  @Test
-  public void testGetPodTemplateLocationPassing() {
-    final Config testConfig = Config.newBuilder()
-        .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
-        .build();
-    final V1Controller v1Controller = new V1Controller(testConfig, RUNTIME);
-    final Pair<String, String> expected = new Pair<>(CONFIGMAP_NAME, POD_TEMPLATE_NAME);
-
-    // Correct parsing
-    final Pair<String, String> actual = v1Controller.getPodTemplateLocation(true);
-    Assert.assertEquals(expected, actual);
-  }
-
-  @Test
-  public void testGetPodTemplateLocationNoConfigMap() {
-    expectedException.expect(TopologySubmissionException.class);
-    final Config testConfig = Config.newBuilder()
-        .put(POD_TEMPLATE_LOCATION_EXECUTOR, ".POD-TEMPLATE-NAME").build();
-    V1Controller v1Controller = new V1Controller(testConfig, RUNTIME);
-    v1Controller.getPodTemplateLocation(true);
-  }
-
-  @Test
-  public void testGetPodTemplateLocationNoPodTemplate() {
-    expectedException.expect(TopologySubmissionException.class);
-    final Config testConfig = Config.newBuilder()
-        .put(POD_TEMPLATE_LOCATION_EXECUTOR, "CONFIGMAP-NAME.").build();
-    V1Controller v1Controller = new V1Controller(testConfig, RUNTIME);
-    v1Controller.getPodTemplateLocation(true);
-  }
-
-  @Test
-  public void testGetPodTemplateLocationNoDelimiter() {
-    expectedException.expect(TopologySubmissionException.class);
-    final Config testConfig = Config.newBuilder()
-        .put(POD_TEMPLATE_LOCATION_EXECUTOR, "CONFIGMAP-NAMEPOD-TEMPLATE-NAME").build();
-    V1Controller v1Controller = new V1Controller(testConfig, RUNTIME);
-    v1Controller.getPodTemplateLocation(true);
-  }
 
   @Test
   public void testConfigureContainerPorts() {
@@ -450,9 +116,9 @@ public class V1ControllerTest {
     final int portNumberkept = 1111;
     final int numInstances = 3;
     final List<V1ContainerPort> expectedPortsBase =
-        Collections.unmodifiableList(V1Controller.getExecutorPorts());
+        Collections.unmodifiableList(StatefulSet.getExecutorPorts());
     final List<V1ContainerPort> debugPorts =
-        Collections.unmodifiableList(V1Controller.getDebuggingPorts(numInstances));
+        Collections.unmodifiableList(StatefulSet.getDebuggingPorts(numInstances));
     final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList(
         Arrays.asList(
             new V1ContainerPort()
@@ -463,9 +129,12 @@ public class V1ControllerTest {
         )
     );
 
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     // Null ports. This is the default case.
     final V1Container inputContainerWithNullPorts = new V1ContainerBuilder().build();
-    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithNullPorts);
+    STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithNullPorts);
     Assert.assertTrue("Server and/or shell PORTS for container with null ports list",
         CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(), expectedPortsBase));
 
@@ -473,7 +142,7 @@ public class V1ControllerTest {
     final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder()
         .withPorts(new LinkedList<>())
         .build();
-    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithEmptyPorts);
+    STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithEmptyPorts);
     Assert.assertTrue("Server and/or shell PORTS for container with empty ports list",
         CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(), expectedPortsBase));
 
@@ -486,7 +155,7 @@ public class V1ControllerTest {
     expectedPortsOverriding
         .add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
 
-    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithPorts);
+    STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithPorts);
     Assert.assertTrue("Server and/or shell PORTS for container should be overwritten.",
         CollectionUtils.containsAll(inputContainerWithPorts.getPorts(), expectedPortsOverriding));
 
@@ -500,8 +169,7 @@ public class V1ControllerTest {
     expectedPortsDebug.add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
     expectedPortsDebug.addAll(debugPorts);
 
-    v1ControllerWithPodTemplate.configureContainerPorts(
-        true, numInstances, inputContainerWithDebug);
+    STATEFUL_SET.configureContainerPorts(true, numInstances, inputContainerWithDebug);
     Assert.assertTrue("Server and/or shell with debug PORTS for container should be overwritten.",
         CollectionUtils.containsAll(inputContainerWithDebug.getPorts(), expectedPortsDebug));
   }
@@ -509,7 +177,7 @@ public class V1ControllerTest {
   @Test
   public void testConfigureContainerEnvVars() {
     final List<V1EnvVar> heronEnvVars =
-        Collections.unmodifiableList(V1Controller.getExecutorEnvVars());
+        Collections.unmodifiableList(STATEFUL_SET.getExecutorEnvVars());
     final V1EnvVar additionEnvVar = new V1EnvVar()
         .name("env-variable-to-be-kept")
         .valueFrom(new V1EnvVarSource()
@@ -529,9 +197,12 @@ public class V1ControllerTest {
         additionEnvVar
     );
 
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     // Null env vars. This is the default case.
     V1Container containerWithNullEnvVars = new V1ContainerBuilder().build();
-    v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithNullEnvVars);
+    STATEFUL_SET.configureContainerEnvVars(containerWithNullEnvVars);
     Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars should match",
         CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(), heronEnvVars));
 
@@ -539,7 +210,7 @@ public class V1ControllerTest {
     V1Container containerWithEmptyEnvVars = new V1ContainerBuilder()
         .withEnv(new LinkedList<>())
         .build();
-    v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEmptyEnvVars);
+    STATEFUL_SET.configureContainerEnvVars(containerWithEmptyEnvVars);
     Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env Vars should match",
         CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(), heronEnvVars));
 
@@ -549,13 +220,16 @@ public class V1ControllerTest {
     V1Container containerWithEnvVars = new V1ContainerBuilder()
         .withEnv(inputEnvVars)
         .build();
-    v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEnvVars);
+    STATEFUL_SET.configureContainerEnvVars(containerWithEnvVars);
     Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars should be overridden",
         CollectionUtils.containsAll(containerWithEnvVars.getEnv(), expectedOverriding));
   }
 
   @Test
   public void testConfigureContainerResources() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final boolean isExecutor = true;
 
     final Resource resourceDefault = new Resource(
@@ -597,16 +271,16 @@ public class V1ControllerTest {
 
     // Default. Null resources.
     V1Container containerNull = new V1ContainerBuilder().build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerNull, configNoLimit, resourceDefault, isExecutor);
+    STATEFUL_SET.configureContainerResources(containerNull, configNoLimit, resourceDefault,
+        isExecutor);
     Assert.assertTrue("Default LIMITS should be set in container with null LIMITS",
         containerNull.getResources().getLimits().entrySet()
             .containsAll(expectDefaultRequirements.getLimits().entrySet()));
 
     // Empty resources.
     V1Container containerEmpty = new V1ContainerBuilder().withNewResources().endResources().build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerEmpty, configNoLimit, resourceDefault, isExecutor);
+    STATEFUL_SET.configureContainerResources(containerEmpty, configNoLimit, resourceDefault,
+        isExecutor);
     Assert.assertTrue("Default LIMITS should be set in container with empty LIMITS",
         containerNull.getResources().getLimits().entrySet()
             .containsAll(expectDefaultRequirements.getLimits().entrySet()));
@@ -615,8 +289,8 @@ public class V1ControllerTest {
     V1Container containerCustom = new V1ContainerBuilder()
         .withResources(customRequirements)
         .build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerCustom, configNoLimit, resourceDefault, isExecutor);
+    STATEFUL_SET.configureContainerResources(containerCustom, configNoLimit, resourceDefault,
+        isExecutor);
     Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS",
         containerCustom.getResources().getLimits().entrySet()
             .containsAll(expectCustomRequirements.getLimits().entrySet()));
@@ -625,8 +299,8 @@ public class V1ControllerTest {
     V1Container containerRequests = new V1ContainerBuilder()
         .withResources(customRequirements)
         .build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerRequests, configWithLimit, resourceDefault, isExecutor);
+    STATEFUL_SET.configureContainerResources(containerRequests, configWithLimit, resourceDefault,
+        isExecutor);
     Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS and REQUEST",
         containerRequests.getResources().getLimits().entrySet()
             .containsAll(expectCustomRequirements.getLimits().entrySet()));
@@ -637,6 +311,9 @@ public class V1ControllerTest {
 
   @Test
   public void testConfigureContainerResourcesCLI() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final boolean isExecutor = true;
     final String customLimitMEMStr = "120Gi";
     final String customLimitCPUStr = "5";
@@ -674,7 +351,7 @@ public class V1ControllerTest {
         .build();
 
     final V1Container actual = new V1Container();
-    v1ControllerWithPodTemplate.configureContainerResources(actual, config, resources, isExecutor);
+    STATEFUL_SET.configureContainerResources(actual, config, resources, isExecutor);
     Assert.assertEquals("Container Resources are set from CLI.", expected, actual);
   }
 
@@ -686,7 +363,10 @@ public class V1ControllerTest {
         .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME, pathNameDefault)
         .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH, pathDefault)
         .build();
-    final V1Controller controllerWithMounts = new V1Controller(configWithVolumes, RUNTIME);
+    final StatefulSet.Configs configsWithVolumes = new StatefulSet.Configs(configWithVolumes,
+        RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
+    final StatefulSet.Configs configsWithNoVolumes = new StatefulSet.Configs(
+        Config.newBuilder().build(), RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
     final V1VolumeMount volumeDefault = new V1VolumeMountBuilder()
         .withName(pathNameDefault)
         .withMountPath(pathDefault)
@@ -707,14 +387,17 @@ public class V1ControllerTest {
     );
 
     // No Volume Mounts set.
-    V1Controller controllerDoNotSetMounts = new V1Controller(Config.newBuilder().build(), RUNTIME);
+    STATEFUL_SET.setClusterConfigs(configsWithNoVolumes);
     V1Container containerNoSetMounts = new V1Container();
-    controllerDoNotSetMounts.mountVolumeIfPresent(containerNoSetMounts);
+    STATEFUL_SET.mountVolumeIfPresent(containerNoSetMounts);
     Assert.assertNull(containerNoSetMounts.getVolumeMounts());
 
+    // Configure factory for volume mounts.
+    STATEFUL_SET.setClusterConfigs(configsWithVolumes);
+
     // Default. Null Volume Mounts.
     V1Container containerNull = new V1ContainerBuilder().build();
-    controllerWithMounts.mountVolumeIfPresent(containerNull);
+    STATEFUL_SET.mountVolumeIfPresent(containerNull);
     Assert.assertTrue("Default VOLUME MOUNTS should be set in container with null VOLUME MOUNTS",
         CollectionUtils.containsAll(expectedMountsDefault, containerNull.getVolumeMounts()));
 
@@ -722,7 +405,7 @@ public class V1ControllerTest {
     V1Container containerEmpty = new V1ContainerBuilder()
         .withVolumeMounts(new LinkedList<>())
         .build();
-    controllerWithMounts.mountVolumeIfPresent(containerEmpty);
+    STATEFUL_SET.mountVolumeIfPresent(containerEmpty);
     Assert.assertTrue("Default VOLUME MOUNTS should be set in container with empty VOLUME MOUNTS",
         CollectionUtils.containsAll(expectedMountsDefault, containerEmpty.getVolumeMounts()));
 
@@ -730,20 +413,23 @@ public class V1ControllerTest {
     V1Container containerCustom = new V1ContainerBuilder()
         .withVolumeMounts(volumeMountsCustomList)
         .build();
-    controllerWithMounts.mountVolumeIfPresent(containerCustom);
+    STATEFUL_SET.mountVolumeIfPresent(containerCustom);
     Assert.assertTrue("Default VOLUME MOUNTS should be set in container with custom VOLUME MOUNTS",
         CollectionUtils.containsAll(expectedMountsCustom, containerCustom.getVolumeMounts()));
   }
 
   @Test
   public void testConfigureTolerations() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final V1Toleration keptToleration = new V1Toleration()
         .key("kept toleration")
         .operator("Some Operator")
         .effect("Some Effect")
         .tolerationSeconds(5L);
     final List<V1Toleration> expectedTolerationBase =
-        Collections.unmodifiableList(V1Controller.getTolerations());
+        Collections.unmodifiableList(getTolerations());
     final List<V1Toleration> inputTolerationsBase = Collections.unmodifiableList(
         Arrays.asList(
             new V1Toleration()
@@ -756,7 +442,7 @@ public class V1ControllerTest {
 
     // Null Tolerations. This is the default case.
     final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build();
-    v1ControllerWithPodTemplate.configureTolerations(podSpecNullTolerations);
+    STATEFUL_SET.configureTolerations(podSpecNullTolerations);
     Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to Heron's defaults",
         CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(),
             expectedTolerationBase));
@@ -765,7 +451,7 @@ public class V1ControllerTest {
     final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder()
         .withTolerations(new LinkedList<>())
         .build();
-    v1ControllerWithPodTemplate.configureTolerations(podSpecWithEmptyTolerations);
+    STATEFUL_SET.configureTolerations(podSpecWithEmptyTolerations);
     Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to Heron's defaults",
         CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(),
             expectedTolerationBase));
@@ -778,7 +464,7 @@ public class V1ControllerTest {
         new LinkedList<>(expectedTolerationBase);
     expectedTolerationsOverriding.add(keptToleration);
 
-    v1ControllerWithPodTemplate.configureTolerations(podSpecWithTolerations);
+    STATEFUL_SET.configureTolerations(podSpecWithTolerations);
     Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with Heron's defaults",
         CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
             expectedTolerationsOverriding));
@@ -786,6 +472,9 @@ public class V1ControllerTest {
 
   @Test
   public void testCreatePersistentVolumeClaims() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final String topologyName = "topology-name";
     final String volumeNameOne = "volume-name-one";
     final String volumeNameTwo = "volume-name-two";
@@ -838,7 +527,7 @@ public class V1ControllerTest {
     final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder()
         .withNewMetadata()
           .withName(volumeNameOne)
-          .withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
+          .withLabels(StatefulSet.getPersistentVolumeClaimLabels(topologyName))
         .endMetadata()
         .withNewSpec()
           .withStorageClassName(storageClassName)
@@ -853,7 +542,7 @@ public class V1ControllerTest {
     final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder()
         .withNewMetadata()
           .withName(volumeNameStatic)
-          .withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
+          .withLabels(StatefulSet.getPersistentVolumeClaimLabels(topologyName))
         .endMetadata()
         .withNewSpec()
           .withStorageClassName("")
@@ -869,7 +558,7 @@ public class V1ControllerTest {
         new LinkedList<>(Arrays.asList(claimOne, claimStatic));
 
     final List<V1PersistentVolumeClaim> actualClaims =
-        v1ControllerWithPodTemplate.createPersistentVolumeClaims(mapPVCOpts);
+        STATEFUL_SET.createPersistentVolumeClaims(mapPVCOpts);
 
     Assert.assertEquals("Generated claim sizes match", expectedClaims.size(), actualClaims.size());
     Assert.assertTrue(expectedClaims.containsAll(actualClaims));
@@ -877,6 +566,9 @@ public class V1ControllerTest {
 
   @Test
   public void testCreatePersistentVolumeClaimVolumesAndMounts() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final String volumeNameOne = "VolumeNameONE";
     final String volumeNameTwo = "VolumeNameTWO";
     final String claimNameOne = "claim-name-one";
@@ -919,8 +611,8 @@ public class V1ControllerTest {
     // Test case container.
     // Input: Map of Volume configurations.
     // Output: The expected lists of Volumes and Volume Mounts.
-    final List<TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
-        Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new LinkedList<>();
+    final List<KubernetesUtils.TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
+          Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new LinkedList<>();
 
     // Default case: No PVC provided.
     testCases.add(new TestTuple<>("Generated an empty list of Volumes", new HashMap<>(),
@@ -939,7 +631,7 @@ public class V1ControllerTest {
              Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) {
       List<V1Volume> actualVolume = new LinkedList<>();
       List<V1VolumeMount> actualVolumeMount = new LinkedList<>();
-      v1ControllerPodTemplate.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
+      STATEFUL_SET.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
           actualVolume, actualVolumeMount);
 
       Assert.assertTrue(testCase.description,
@@ -951,6 +643,9 @@ public class V1ControllerTest {
 
   @Test
   public void testConfigurePodWithVolumesAndMountsFromCLI() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final String volumeNameClashing = "clashing-volume";
     final String volumeMountNameClashing = "original-volume-mount";
     V1Volume baseVolume = new V1VolumeBuilder()
@@ -1048,7 +743,7 @@ public class V1ControllerTest {
 
     // Testing loop.
     for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : testCases) {
-      v1ControllerWithPodTemplate
+      STATEFUL_SET
           .configurePodWithVolumesAndMountsFromCLI((V1PodSpec) testCase.input[0],
               (V1Container) testCase.input[1], (List<V1Volume>) testCase.input[2],
               (List<V1VolumeMount>) testCase.input[3]);
@@ -1062,6 +757,8 @@ public class V1ControllerTest {
 
   @Test
   public void testSetShardIdEnvironmentVariableCommand() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
 
     List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
 
@@ -1072,12 +769,15 @@ public class V1ControllerTest {
 
     for (TestTuple<Boolean, String> testCase : testCases) {
       Assert.assertEquals(testCase.description, testCase.expected,
-          v1ControllerWithPodTemplate.setShardIdEnvironmentVariableCommand(testCase.input));
+          STATEFUL_SET.setShardIdEnvironmentVariableCommand(testCase.input));
     }
   }
 
   @Test
   public void testCreateResourcesRequirement() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final String managerCpuLimit = "3000m";
     final String managerMemLimit = "256Gi";
     final Quantity memory = Quantity.fromString(managerMemLimit);
@@ -1147,13 +847,16 @@ public class V1ControllerTest {
     // Test loop.
     for (TestTuple<Map<String, String>, Map<String, Quantity>> testCase : testCases) {
       Map<String, Quantity> actual =
-          v1ControllerPodTemplate.createResourcesRequirement(testCase.input);
+          STATEFUL_SET.createResourcesRequirement(testCase.input);
       Assert.assertEquals(testCase.description, testCase.expected, actual);
     }
   }
 
   @Test
   public void testCreateVolumeAndMountsEmptyDirCLI() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final String volumeName = "volume-name-empty-dir";
     final String medium = "Memory";
     final String sizeLimit = "1Gi";
@@ -1189,13 +892,16 @@ public class V1ControllerTest {
 
     List<V1Volume> actualVolumes = new LinkedList<>();
     List<V1VolumeMount> actualMounts = new LinkedList<>();
-    v1ControllerPodTemplate.createVolumeAndMountsEmptyDirCLI(config, actualVolumes, actualMounts);
+    STATEFUL_SET.createVolumeAndMountsEmptyDirCLI(config, actualVolumes, actualMounts);
     Assert.assertEquals("Empty Dir Volume populated", expectedVolumes, actualVolumes);
     Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts, actualMounts);
   }
 
   @Test
   public void testCreateVolumeAndMountsHostPathCLI() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final String volumeName = "volume-name-host-path";
     final String type = "DirectoryOrCreate";
     final String pathOnHost = "path.on.host";
@@ -1231,13 +937,16 @@ public class V1ControllerTest {
 
     List<V1Volume> actualVolumes = new LinkedList<>();
     List<V1VolumeMount> actualMounts = new LinkedList<>();
-    v1ControllerPodTemplate.createVolumeAndMountsHostPathCLI(config, actualVolumes, actualMounts);
+    STATEFUL_SET.createVolumeAndMountsHostPathCLI(config, actualVolumes, actualMounts);
     Assert.assertEquals("Host Path Volume populated", expectedVolumes, actualVolumes);
     Assert.assertEquals("Host Path Volume Mount populated", expectedMounts, actualMounts);
   }
 
   @Test
   public void testCreateVolumeAndMountsNFSCLI() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
     final String volumeName = "volume-name-nfs";
     final String server = "nfs.server.address";
     final String pathOnNFS = "path.on.host";
@@ -1277,7 +986,7 @@ public class V1ControllerTest {
 
     List<V1Volume> actualVolumes = new LinkedList<>();
     List<V1VolumeMount> actualMounts = new LinkedList<>();
-    v1ControllerPodTemplate.createVolumeAndMountsNFSCLI(config, actualVolumes, actualMounts);
+    STATEFUL_SET.createVolumeAndMountsNFSCLI(config, actualVolumes, actualMounts);
     Assert.assertEquals("NFS Volume populated", expectedVolumes, actualVolumes);
     Assert.assertEquals("NFS Volume Mount populated", expectedMounts, actualMounts);
   }
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
index 22d3088b598..9becf8b8d7b 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
@@ -244,7 +244,7 @@ public class VolumesTests {
     final String volumeMode = "VolumeMode";
     final String path = "/path/to/mount/";
     final String subPath = "/sub/path/to/mount/";
-    final Map<String, String> labels = V1Controller.getPersistentVolumeClaimLabels(topologyName);
+    final Map<String, String> labels = KubernetesShim.getPersistentVolumeClaimLabels(topologyName);
 
     final Map<KubernetesConstants.VolumeConfigKeys, String> volOneConfig =
         new HashMap<KubernetesConstants.VolumeConfigKeys, String>() {