You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/07/21 03:25:50 UTC

[incubator-heron] branch saadurrahman/3846-Refactoring-K8s-Shim-dev updated (23ded815982 -> 8e67ad10ced)

This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a change to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


    from 23ded815982 [Tests] Shim to Stateful Set test migration.
     new 59dbacdb408 [StatefulSet] added javadoc for Configs constructor.
     new 6722ee7bf8d [KubernetesShim] wired in Stateful Set factory
     new abfa1378105 [StatefulSet] removed references to functions in K8s Shim.
     new e73994e51ce [Tests] Kubernetes Utils referencing methods from Stateful Set factory.
     new 8e67ad10ced [KubernetesShim] removing unneeded methods that were relocated to Stateful Set factory.

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../heron/scheduler/kubernetes/KubernetesShim.java | 734 +----------------
 .../heron/scheduler/kubernetes/StatefulSet.java    |   8 +
 .../scheduler/kubernetes/KubernetesShimTest.java   | 866 ---------------------
 .../scheduler/kubernetes/KubernetesUtilsTest.java  |   4 +-
 .../scheduler/kubernetes/StatefulSetTest.java      |   7 +-
 5 files changed, 26 insertions(+), 1593 deletions(-)


[incubator-heron] 03/05: [StatefulSet] removed references to functions in K8s Shim.

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit abfa1378105798f984c9ac13f703bb33cfbf7697
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Wed Jul 20 23:23:51 2022 -0400

    [StatefulSet] removed references to functions in K8s Shim.
---
 .../org/apache/heron/scheduler/kubernetes/StatefulSetTest.java     | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
index 10f48845b5f..d58c42102c1 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
@@ -63,6 +63,7 @@ import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 
 import static org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
 import static org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
+import static org.apache.heron.scheduler.kubernetes.StatefulSet.getTolerations;
 
 @RunWith(MockitoJUnitRunner.class)
 public class StatefulSetTest {
@@ -428,7 +429,7 @@ public class StatefulSetTest {
         .effect("Some Effect")
         .tolerationSeconds(5L);
     final List<V1Toleration> expectedTolerationBase =
-        Collections.unmodifiableList(KubernetesShim.getTolerations());
+        Collections.unmodifiableList(getTolerations());
     final List<V1Toleration> inputTolerationsBase = Collections.unmodifiableList(
         Arrays.asList(
             new V1Toleration()
@@ -526,7 +527,7 @@ public class StatefulSetTest {
     final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder()
         .withNewMetadata()
           .withName(volumeNameOne)
-          .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
+          .withLabels(StatefulSet.getPersistentVolumeClaimLabels(topologyName))
         .endMetadata()
         .withNewSpec()
           .withStorageClassName(storageClassName)
@@ -541,7 +542,7 @@ public class StatefulSetTest {
     final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder()
         .withNewMetadata()
           .withName(volumeNameStatic)
-          .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
+          .withLabels(StatefulSet.getPersistentVolumeClaimLabels(topologyName))
         .endMetadata()
         .withNewSpec()
           .withStorageClassName("")


[incubator-heron] 05/05: [KubernetesShim] removing unneeded methods that were relocated to Stateful Set factory.

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 8e67ad10cedf217cdec43ec3c22bad445ae28910
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Wed Jul 20 23:25:18 2022 -0400

    [KubernetesShim] removing unneeded methods that were relocated to Stateful Set factory.
---
 .../heron/scheduler/kubernetes/KubernetesShim.java | 720 -----------------
 .../scheduler/kubernetes/KubernetesShimTest.java   | 866 ---------------------
 2 files changed, 1586 deletions(-)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
index c122ff5cda3..de1f4764fb6 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
@@ -22,8 +22,6 @@ package org.apache.heron.scheduler.kubernetes;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -48,7 +46,6 @@ import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.packing.PackingPlan;
 import org.apache.heron.spi.packing.Resource;
 
-import io.kubernetes.client.custom.Quantity;
 import io.kubernetes.client.custom.V1Patch;
 import io.kubernetes.client.openapi.ApiClient;
 import io.kubernetes.client.openapi.ApiException;
@@ -56,28 +53,18 @@ import io.kubernetes.client.openapi.Configuration;
 import io.kubernetes.client.openapi.apis.AppsV1Api;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.openapi.models.V1ConfigMap;
-import io.kubernetes.client.openapi.models.V1Container;
-import io.kubernetes.client.openapi.models.V1ContainerPort;
 import io.kubernetes.client.openapi.models.V1EnvVar;
 import io.kubernetes.client.openapi.models.V1EnvVarSource;
-import io.kubernetes.client.openapi.models.V1LabelSelector;
 import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
-import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
-import io.kubernetes.client.openapi.models.V1PodSpec;
 import io.kubernetes.client.openapi.models.V1PodTemplate;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
-import io.kubernetes.client.openapi.models.V1ResourceRequirements;
-import io.kubernetes.client.openapi.models.V1SecretKeySelector;
-import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder;
 import io.kubernetes.client.openapi.models.V1Service;
 import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
 import io.kubernetes.client.openapi.models.V1Status;
 import io.kubernetes.client.openapi.models.V1Toleration;
-import io.kubernetes.client.openapi.models.V1Volume;
-import io.kubernetes.client.openapi.models.V1VolumeMount;
 import io.kubernetes.client.util.PatchUtils;
 import io.kubernetes.client.util.Yaml;
 import okhttp3.Response;
@@ -383,60 +370,6 @@ public class KubernetesShim extends KubernetesController {
             + "] in namespace [" + getNamespace() + "] is deleted.");
   }
 
-  /**
-   * Generates the command to start Heron within the <code>container</code>.
-   * @param containerId Passed down to <>SchedulerUtils</> to generate executor command.
-   * @param numOfInstances Used to configure the debugging ports.
-   * @param isExecutor Flag used to generate the correct <code>shard_id</code>.
-   * @return The complete command to start Heron in a <code>container</code>.
-   */
-  protected List<String> getExecutorCommand(String containerId, int numOfInstances,
-                                            boolean isExecutor) {
-    final Config configuration = getConfiguration();
-    final Config runtimeConfiguration = getRuntimeConfiguration();
-    final Map<ExecutorPort, String> ports =
-        KubernetesConstants.EXECUTOR_PORTS.entrySet()
-            .stream()
-            .collect(Collectors.toMap(Map.Entry::getKey,
-                e -> e.getValue().toString()));
-
-    if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))
-            && numOfInstances != 0) {
-      List<String> remoteDebuggingPorts = new LinkedList<>();
-      IntStream.range(0, numOfInstances).forEach(i -> {
-        int port = KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i;
-        remoteDebuggingPorts.add(String.valueOf(port));
-      });
-      ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
-              String.join(",", remoteDebuggingPorts));
-    }
-
-    final String[] executorCommand =
-        SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration,
-            containerId, ports);
-    return Arrays.asList(
-        "sh",
-        "-c",
-        KubernetesUtils.getConfCommand(configuration)
-            + " && " + KubernetesUtils.getFetchCommand(configuration, runtimeConfiguration)
-            + " && " + setShardIdEnvironmentVariableCommand(isExecutor)
-            + " && " + String.join(" ", executorCommand)
-    );
-  }
-
-  /**
-   * Configures the <code>shard_id</code> for the Heron container based on whether it is an <code>executor</code>
-   * or <code>manager</code>. <code>executor</code> IDs are [1 - n) and the <code>manager</code> IDs start at 0.
-   * @param isExecutor Switch flag to generate correct command.
-   * @return The command required to put the Heron instance in <code>executor</code> or <code>manager</code> mode.
-   */
-  @VisibleForTesting
-  protected static String setShardIdEnvironmentVariableCommand(boolean isExecutor) {
-    final String pattern = String.format("%%s=%s && echo shardId=${%%s}",
-        isExecutor ? "$((${POD_NAME##*-} + 1))" : "${POD_NAME##*-}");
-    return String.format(pattern, ENV_SHARD_ID, ENV_SHARD_ID);
-  }
-
   /**
    * Creates a headless <code>Service</code> to facilitate communication between Pods in a <code>topology</code>.
    * @return A fully configured <code>Service</code> to be used by a <code>topology</code>.
@@ -463,92 +396,6 @@ public class KubernetesShim extends KubernetesController {
     return service;
   }
 
-  /**
-   * Creates and configures the <code>StatefulSet</code> which the topology's <code>executor</code>s will run in.
-   * @param containerResource Passed down to configure the <code>executor</code> resource limits.
-   * @param numberOfInstances Used to configure the execution command and ports for the <code>executor</code>.
-   * @param isExecutor Flag used to configure components specific to <code>executor</code> and <code>manager</code>.
-   * @return A fully configured <code>V1StatefulSet</code> for the topology's <code>executors</code>.
-   */
-  private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances,
-                                          boolean isExecutor) {
-    final String topologyName = getTopologyName();
-    final Config runtimeConfiguration = getRuntimeConfiguration();
-
-    final List<V1Volume> volumes = new LinkedList<>();
-    final List<V1VolumeMount> volumeMounts = new LinkedList<>();
-
-    // Collect Persistent Volume Claim configurations from the CLI.
-    final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configsPVC =
-        KubernetesContext.getVolumeClaimTemplates(getConfiguration(), isExecutor);
-
-    // Collect all Volume configurations from the CLI and generate Volumes and Volume Mounts.
-    createVolumeAndMountsPersistentVolumeClaimCLI(configsPVC, volumes, volumeMounts);
-    createVolumeAndMountsHostPathCLI(
-        KubernetesContext.getVolumeHostPath(getConfiguration(), isExecutor), volumes, volumeMounts);
-    createVolumeAndMountsEmptyDirCLI(
-        KubernetesContext.getVolumeEmptyDir(getConfiguration(), isExecutor), volumes, volumeMounts);
-    createVolumeAndMountsNFSCLI(
-        KubernetesContext.getVolumeNFS(getConfiguration(), isExecutor), volumes, volumeMounts);
-
-    final V1StatefulSet statefulSet = new V1StatefulSet();
-
-    // Setup StatefulSet's metadata.
-    final V1ObjectMeta objectMeta = new V1ObjectMeta()
-        .name(getStatefulSetName(isExecutor))
-        .labels(getPodLabels(topologyName));
-    statefulSet.setMetadata(objectMeta);
-
-    // Create the StatefulSet Spec.
-    // Reduce replica count by one for Executors and set to one for Manager.
-    final int replicasCount =
-        isExecutor ? Runtime.numContainers(runtimeConfiguration).intValue() - 1 : 1;
-    final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec()
-        .serviceName(topologyName)
-        .replicas(replicasCount);
-
-    // Parallel pod management tells the StatefulSet controller to launch or terminate
-    // all Pods in parallel, and not to wait for Pods to become Running and Ready or completely
-    // terminated prior to launching or terminating another Pod.
-    statefulSetSpec.setPodManagementPolicy("Parallel");
-
-    // Add selector match labels "app=heron" and "topology=topology-name"
-    // so we know which pods to manage.
-    final V1LabelSelector selector = new V1LabelSelector()
-        .matchLabels(getPodMatchLabels(topologyName));
-    statefulSetSpec.setSelector(selector);
-
-    // Create a Pod Template.
-    final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate(isExecutor);
-
-    // Set up Pod Metadata.
-    final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getPodLabels(topologyName));
-    Map<String, String> annotations = new HashMap<>();
-    annotations.putAll(getPodAnnotations());
-    annotations.putAll(getPrometheusAnnotations());
-    templateMetaData.setAnnotations(annotations);
-    podTemplateSpec.setMetadata(templateMetaData);
-
-    configurePodSpec(podTemplateSpec, containerResource, numberOfInstances, isExecutor,
-        volumes, volumeMounts);
-
-    statefulSetSpec.setTemplate(podTemplateSpec);
-
-    statefulSet.setSpec(statefulSetSpec);
-
-    statefulSetSpec.setVolumeClaimTemplates(createPersistentVolumeClaims(configsPVC));
-
-    return statefulSet;
-  }
-
-  /**
-   * Extracts general Pod <code>Annotation</code>s from configurations.
-   * @return Key-value pairs of general <code>Annotation</code>s to be added to the Pod.
-   */
-  private Map<String, String> getPodAnnotations() {
-    return KubernetesContext.getPodAnnotations(getConfiguration());
-  }
-
   /**
    * Extracts <code>Service Annotations</code> for configurations.
    * @return Key-value pairs of service <code>Annotation</code>s to be added to the Pod.
@@ -557,19 +404,6 @@ public class KubernetesShim extends KubernetesController {
     return KubernetesContext.getServiceAnnotations(getConfiguration());
   }
 
-  /**
-   * Generates <code>Label</code>s to indicate Prometheus scraping and the exposed port.
-   * @return Key-value pairs of Prometheus <code>Annotation</code>s to be added to the Pod.
-   */
-  private Map<String, String> getPrometheusAnnotations() {
-    final Map<String, String> annotations = new HashMap<>();
-    annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true");
-    annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_PORT,
-        KubernetesConstants.PROMETHEUS_PORT);
-
-    return annotations;
-  }
-
   /**
    * Generates the <code>heron</code> and <code>topology</code> name <code>Match Label</code>s.
    * @param topologyName Name of the <code>topology</code>.
@@ -582,20 +416,6 @@ public class KubernetesShim extends KubernetesController {
     return labels;
   }
 
-  /**
-   * Extracts <code>Label</code>s from configurations, generates the <code>heron</code> and
-   * <code>topology</code> name <code>Label</code>s.
-   * @param topologyName Name of the <code>topology</code>.
-   * @return Key-value pairs of <code>Label</code>s to be added to the Pod.
-   */
-  private Map<String, String> getPodLabels(String topologyName) {
-    final Map<String, String> labels = new HashMap<>();
-    labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE);
-    labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
-    labels.putAll(KubernetesContext.getPodLabels(getConfiguration()));
-    return labels;
-  }
-
   /**
    * Extracts <code>Selector Labels</code> for<code>Service</code>s from configurations.
    * @return Key-value pairs of <code>Service Labels</code> to be added to the Pod.
@@ -604,401 +424,6 @@ public class KubernetesShim extends KubernetesController {
     return KubernetesContext.getServiceLabels(getConfiguration());
   }
 
-  /**
-   * Configures the <code>Pod Spec</code> section of the <code>StatefulSet</code>. The <code>Heron</code> container
-   * will be configured to allow it to function but other supplied containers are loaded verbatim.
-   * @param podTemplateSpec The <code>Pod Template Spec</code> section to update.
-   * @param resource Passed down to configure the resource limits.
-   * @param numberOfInstances Passed down to configure the ports.
-   * @param isExecutor Flag used to configure components specific to <code>Executor</code> and <code>Manager</code>.
-   * @param volumes <code>Volumes</code> generated from configurations options.
-   * @param volumeMounts <code>Volume Mounts</code> generated from configurations options.
-   */
-  private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec, Resource resource,
-      int numberOfInstances, boolean isExecutor, List<V1Volume> volumes,
-      List<V1VolumeMount> volumeMounts) {
-    if (podTemplateSpec.getSpec() == null) {
-      podTemplateSpec.setSpec(new V1PodSpec());
-    }
-    final V1PodSpec podSpec = podTemplateSpec.getSpec();
-
-    // Set the termination period to 0 so pods can be deleted quickly
-    podSpec.setTerminationGracePeriodSeconds(0L);
-
-    // Set the pod tolerations so pods are rescheduled when nodes go down
-    // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
-    configureTolerations(podSpec);
-
-    // Get <Heron> container and ignore all others.
-    final String containerName =
-        isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME;
-    V1Container heronContainer = null;
-    List<V1Container> containers = podSpec.getContainers();
-    if (containers != null) {
-      for (V1Container container : containers) {
-        final String name = container.getName();
-        if (name != null && name.equals(containerName)) {
-          if (heronContainer != null) {
-            throw new TopologySubmissionException(
-                String.format("Multiple configurations found for '%s' container", containerName));
-          }
-          heronContainer = container;
-        }
-      }
-    } else {
-      containers = new LinkedList<>();
-    }
-
-    if (heronContainer == null) {
-      heronContainer = new V1Container().name(containerName);
-      containers.add(heronContainer);
-    }
-
-    if (!volumes.isEmpty() || !volumeMounts.isEmpty()) {
-      configurePodWithVolumesAndMountsFromCLI(podSpec, heronContainer, volumes,
-          volumeMounts);
-    }
-
-    configureHeronContainer(resource, numberOfInstances, heronContainer, isExecutor);
-
-    podSpec.setContainers(containers);
-
-    mountSecretsAsVolumes(podSpec);
-  }
-
-  /**
-   * Adds <code>tolerations</code> to the <code>Pod Spec</code> with Heron's values taking precedence.
-   * @param spec <code>Pod Spec</code> to be configured.
-   */
-  @VisibleForTesting
-  protected void configureTolerations(final V1PodSpec spec) {
-    KubernetesUtils.V1ControllerUtils<V1Toleration> utils =
-        new KubernetesUtils.V1ControllerUtils<>();
-    spec.setTolerations(
-        utils.mergeListsDedupe(getTolerations(), spec.getTolerations(),
-            Comparator.comparing(V1Toleration::getKey), "Pod Specification Tolerations")
-    );
-  }
-
-  /**
-   * Generates a list of <code>tolerations</code> which Heron requires.
-   * @return A list of configured <code>tolerations</code>.
-   */
-  @VisibleForTesting
-  protected static List<V1Toleration> getTolerations() {
-    final List<V1Toleration> tolerations = new ArrayList<>();
-    KubernetesConstants.TOLERATIONS.forEach(t -> {
-      final V1Toleration toleration =
-          new V1Toleration()
-              .key(t)
-              .operator("Exists")
-              .effect("NoExecute")
-              .tolerationSeconds(10L);
-      tolerations.add(toleration);
-    });
-
-    return tolerations;
-  }
-
-  /**
-   * Adds <code>Volume Mounts</code> for <code>Secrets</code> to a pod.
-   * @param podSpec <code>Pod Spec</code> to add secrets to.
-   */
-  private void mountSecretsAsVolumes(V1PodSpec podSpec) {
-    final Config config = getConfiguration();
-    final Map<String, String> secrets = KubernetesContext.getPodSecretsToMount(config);
-    for (Map.Entry<String, String> secret : secrets.entrySet()) {
-      final V1VolumeMount mount = new V1VolumeMount()
-              .name(secret.getKey())
-              .mountPath(secret.getValue());
-      final V1Volume secretVolume = new V1Volume()
-              .name(secret.getKey())
-              .secret(new V1SecretVolumeSourceBuilder()
-                      .withSecretName(secret.getKey())
-                      .build());
-      podSpec.addVolumesItem(secretVolume);
-      for (V1Container container : podSpec.getContainers()) {
-        container.addVolumeMountsItem(mount);
-      }
-    }
-  }
-
-  /**
-   * Configures the <code>Heron</code> container with values for parameters Heron requires for functioning.
-   * @param resource Resource limits.
-   * @param numberOfInstances Required number of <code>executor</code> containers which is used to configure ports.
-   * @param container The <code>executor</code> container to be configured.
-   * @param isExecutor Flag indicating whether to set a <code>executor</code> or <code>manager</code> command.
-   */
-  private void configureHeronContainer(Resource resource, int numberOfInstances,
-                                       final V1Container container, boolean isExecutor) {
-    final Config configuration = getConfiguration();
-
-    // Set up the container images.
-    container.setImage(KubernetesContext.getExecutorDockerImage(configuration));
-
-    // Set up the container command.
-    final List<String> command =
-        getExecutorCommand("$" + ENV_SHARD_ID, numberOfInstances, isExecutor);
-    container.setCommand(command);
-
-    if (KubernetesContext.hasImagePullPolicy(configuration)) {
-      container.setImagePullPolicy(KubernetesContext.getKubernetesImagePullPolicy(configuration));
-    }
-
-    // Configure environment variables.
-    configureContainerEnvVars(container);
-
-    // Set secret keys.
-    setSecretKeyRefs(container);
-
-    // Set container resources
-    configureContainerResources(container, configuration, resource, isExecutor);
-
-    // Set container ports.
-    final boolean debuggingEnabled =
-        TopologyUtils.getTopologyRemoteDebuggingEnabled(
-            Runtime.topology(getRuntimeConfiguration()));
-    configureContainerPorts(debuggingEnabled, numberOfInstances, container);
-
-    // setup volume mounts
-    mountVolumeIfPresent(container);
-  }
-
-  /**
-   * Configures the resources in the <code>container</code> with values in the <code>config</code> taking precedence.
-   * @param container The <code>container</code> to be configured.
-   * @param configuration The <code>Config</code> object to check if a resource request needs to be set.
-   * @param resource User defined resources limits from input.
-   * @param isExecutor Flag to indicate configuration for an <code>executor</code> or <code>manager</code>.
-   */
-  @VisibleForTesting
-  protected void configureContainerResources(final V1Container container,
-                                             final Config configuration, final Resource resource,
-                                             boolean isExecutor) {
-    if (container.getResources() == null) {
-      container.setResources(new V1ResourceRequirements());
-    }
-    final V1ResourceRequirements resourceRequirements = container.getResources();
-
-    // Collect Limits and Requests from CLI.
-    final Map<String, Quantity> limitsCLI = createResourcesRequirement(
-        KubernetesContext.getResourceLimits(configuration, isExecutor));
-    final Map<String, Quantity> requestsCLI = createResourcesRequirement(
-        KubernetesContext.getResourceRequests(configuration, isExecutor));
-
-    if (resourceRequirements.getLimits() == null) {
-      resourceRequirements.setLimits(new HashMap<>());
-    }
-
-    // Set Limits and Resources from CLI <if> available, <else> use Configs. Deduplicate on name
-    // with precedence [1] CLI, [2] Config.
-    final Map<String, Quantity> limits = resourceRequirements.getLimits();
-    final Quantity limitCPU = limitsCLI.getOrDefault(KubernetesConstants.CPU,
-        Quantity.fromString(Double.toString(KubernetesUtils.roundDecimal(resource.getCpu(), 3))));
-    final Quantity limitMEMORY = limitsCLI.getOrDefault(KubernetesConstants.MEMORY,
-        Quantity.fromString(KubernetesUtils.Megabytes(resource.getRam())));
-
-    limits.put(KubernetesConstants.MEMORY, limitMEMORY);
-    limits.put(KubernetesConstants.CPU, limitCPU);
-
-    // Set the Kubernetes container resource request.
-    // Order: [1] CLI, [2] EQUAL_TO_LIMIT, [3] NOT_SET
-    KubernetesContext.KubernetesResourceRequestMode requestMode =
-        KubernetesContext.getKubernetesRequestMode(configuration);
-    if (!requestsCLI.isEmpty()) {
-      if (resourceRequirements.getRequests() == null) {
-        resourceRequirements.setRequests(new HashMap<>());
-      }
-      final Map<String, Quantity> requests = resourceRequirements.getRequests();
-
-      if (requestsCLI.containsKey(KubernetesConstants.MEMORY)) {
-        requests.put(KubernetesConstants.MEMORY, requestsCLI.get(KubernetesConstants.MEMORY));
-      }
-      if (requestsCLI.containsKey(KubernetesConstants.CPU)) {
-        requests.put(KubernetesConstants.CPU, requestsCLI.get(KubernetesConstants.CPU));
-      }
-    } else if (requestMode == KubernetesContext.KubernetesResourceRequestMode.EQUAL_TO_LIMIT) {
-      LOG.log(Level.CONFIG, "Setting K8s Request equal to Limit");
-      resourceRequirements.setRequests(limits);
-    } else {
-      LOG.log(Level.CONFIG, "Not setting K8s request because config was NOT_SET");
-    }
-    container.setResources(resourceRequirements);
-  }
-
-  /**
-   * Creates <code>Resource Requirements</code> from a Map of <code>Config</code> items for <code>CPU</code>
-   * and <code>Memory</code>.
-   * @param configs <code>Configs</code> to be parsed for configuration.
-   * @return Configured <code>Resource Requirements</code>. An <code>empty</code> map will be returned
-   * if there are no <code>configs</code>.
-   */
-  @VisibleForTesting
-  protected Map<String, Quantity> createResourcesRequirement(Map<String, String> configs) {
-    final Map<String, Quantity> requirements = new HashMap<>();
-
-    if (configs == null || configs.isEmpty()) {
-      return requirements;
-    }
-
-    final String memoryLimit = configs.get(KubernetesConstants.MEMORY);
-    if (memoryLimit != null && !memoryLimit.isEmpty()) {
-      requirements.put(KubernetesConstants.MEMORY, Quantity.fromString(memoryLimit));
-    }
-    final String cpuLimit = configs.get(KubernetesConstants.CPU);
-    if (cpuLimit != null && !cpuLimit.isEmpty()) {
-      requirements.put(KubernetesConstants.CPU, Quantity.fromString(cpuLimit));
-    }
-
-    return requirements;
-  }
-
-  /**
-   * Configures the environment variables in the <code>container</code> with those Heron requires.
-   * Heron's values take precedence.
-   * @param container The <code>container</code> to be configured.
-   */
-  @VisibleForTesting
-  protected void configureContainerEnvVars(final V1Container container) {
-    // Deduplicate on var name with Heron defaults take precedence.
-    KubernetesUtils.V1ControllerUtils<V1EnvVar> utils = new KubernetesUtils.V1ControllerUtils<>();
-    container.setEnv(
-        utils.mergeListsDedupe(getExecutorEnvVars(), container.getEnv(),
-          Comparator.comparing(V1EnvVar::getName), "Pod Template Environment Variables")
-    );
-  }
-
-  /**
-   * Generates a list of <code>Environment Variables</code> required by Heron to function.
-   * @return A list of configured <code>Environment Variables</code> required by Heron to function.
-   */
-  @VisibleForTesting
-  protected static List<V1EnvVar> getExecutorEnvVars() {
-    final V1EnvVar envVarHost = new V1EnvVar();
-    envVarHost.name(KubernetesConstants.ENV_HOST)
-        .valueFrom(new V1EnvVarSource()
-            .fieldRef(new V1ObjectFieldSelector()
-                .fieldPath(KubernetesConstants.POD_IP)));
-
-    final V1EnvVar envVarPodName = new V1EnvVar();
-    envVarPodName.name(KubernetesConstants.ENV_POD_NAME)
-        .valueFrom(new V1EnvVarSource()
-            .fieldRef(new V1ObjectFieldSelector()
-                .fieldPath(KubernetesConstants.POD_NAME)));
-
-    return Arrays.asList(envVarHost, envVarPodName);
-  }
-
-  /**
-   * Configures the ports in the <code>container</code> with those Heron requires. Heron's values take precedence.
-   * @param remoteDebugEnabled Flag used to indicate if debugging ports need to be added.
-   * @param numberOfInstances The number of debugging ports to be opened.
-   * @param container <code>container</code> to be configured.
-   */
-  @VisibleForTesting
-  protected void configureContainerPorts(boolean remoteDebugEnabled, int numberOfInstances,
-                                         final V1Container container) {
-    List<V1ContainerPort> ports = new ArrayList<>(getExecutorPorts());
-
-    if (remoteDebugEnabled) {
-      ports.addAll(getDebuggingPorts(numberOfInstances));
-    }
-
-    // Set container ports. Deduplicate using port number with Heron defaults taking precedence.
-    KubernetesUtils.V1ControllerUtils<V1ContainerPort> utils =
-        new KubernetesUtils.V1ControllerUtils<>();
-    container.setPorts(
-        utils.mergeListsDedupe(getExecutorPorts(), container.getPorts(),
-            Comparator.comparing(V1ContainerPort::getContainerPort), "Pod Template Ports")
-    );
-  }
-
-  /**
-   * Generates a list of <code>ports</code> required by Heron to function.
-   * @return A list of configured <code>ports</code> required by Heron to function.
-   */
-  @VisibleForTesting
-  protected static List<V1ContainerPort> getExecutorPorts() {
-    List<V1ContainerPort> ports = new LinkedList<>();
-    KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> {
-      final V1ContainerPort port = new V1ContainerPort()
-          .name(p.getName())
-          .containerPort(v);
-      ports.add(port);
-    });
-    return ports;
-  }
-
-  /**
-   * Generate the debugging ports required by Heron.
-   * @param numberOfInstances The number of debugging ports to generate.
-   * @return A list of configured debugging <code>ports</code>.
-   */
-  @VisibleForTesting
-  protected static List<V1ContainerPort> getDebuggingPorts(int numberOfInstances) {
-    List<V1ContainerPort> ports = new LinkedList<>();
-    IntStream.range(0, numberOfInstances).forEach(i -> {
-      final String portName =
-          KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
-      final V1ContainerPort port = new V1ContainerPort()
-          .name(portName)
-          .containerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i);
-      ports.add(port);
-    });
-    return ports;
-  }
-
-  /**
-   * Adds volume mounts to the <code>container</code> that Heron requires. Heron's values taking precedence.
-   * @param container <code>container</code> to be configured.
-   */
-  @VisibleForTesting
-  protected void mountVolumeIfPresent(final V1Container container) {
-    final Config config = getConfiguration();
-    if (KubernetesContext.hasContainerVolume(config)) {
-      final V1VolumeMount mount =
-          new V1VolumeMount()
-              .name(KubernetesContext.getContainerVolumeName(config))
-              .mountPath(KubernetesContext.getContainerVolumeMountPath(config));
-
-      // Merge volume mounts. Deduplicate using mount's name with Heron defaults taking precedence.
-      KubernetesUtils.V1ControllerUtils<V1VolumeMount> utils =
-          new KubernetesUtils.V1ControllerUtils<>();
-      container.setVolumeMounts(
-          utils.mergeListsDedupe(Collections.singletonList(mount), container.getVolumeMounts(),
-              Comparator.comparing(V1VolumeMount::getName), "Pod Template Volume Mounts")
-      );
-    }
-  }
-
-  /**
-   * Adds <code>Secret Key</code> references to a <code>container</code>.
-   * @param container <code>container</code> to be configured.
-   */
-  private void setSecretKeyRefs(V1Container container) {
-    final Config config = getConfiguration();
-    final Map<String, String> podSecretKeyRefs = KubernetesContext.getPodSecretKeyRefs(config);
-    for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) {
-      final String[] keyRefParts = secret.getValue().split(":");
-      if (keyRefParts.length != 2) {
-        LOG.log(Level.SEVERE,
-                "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
-        throw new TopologyRuntimeManagementException(
-                "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
-      }
-      String name = keyRefParts[0];
-      String key = keyRefParts[1];
-      final V1EnvVar envVar = new V1EnvVar()
-              .name(secret.getKey())
-                .valueFrom(new V1EnvVarSource()
-                  .secretKeyRef(new V1SecretKeySelector()
-                          .key(key)
-                          .name(name)));
-      container.addEnvItem(envVar);
-    }
-  }
-
   /**
    * Initiates the process of locating and loading <code>Pod Template</code> from a <code>ConfigMap</code>.
    * The loaded text is then parsed into a usable <code>Pod Template</code>.
@@ -1106,151 +531,6 @@ public class KubernetesShim extends KubernetesController {
     }
   }
 
-  /**
-   * Generates <code>Persistent Volume Claims Templates</code> from a mapping of <code>Volumes</code>
-   * to <code>key-value</code> pairs of configuration options and values.
-   * @param mapOfOpts <code>Volume</code> to configuration <code>key-value</code> mappings.
-   * @return Fully populated list of only dynamically backed <code>Persistent Volume Claims</code>.
-   */
-  @VisibleForTesting
-  protected List<V1PersistentVolumeClaim> createPersistentVolumeClaims(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts) {
-
-    List<V1PersistentVolumeClaim> listOfPVCs = new LinkedList<>();
-
-    // Iterate over all the PVC Volumes.
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> pvc
-        : mapOfOpts.entrySet()) {
-
-      // Only create claims for `OnDemand` volumes.
-      final String claimName = pvc.getValue().get(KubernetesConstants.VolumeConfigKeys.claimName);
-      if (claimName != null && !KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
-        continue;
-      }
-
-      listOfPVCs.add(Volumes.get()
-          .createPersistentVolumeClaim(pvc.getKey(),
-              getPersistentVolumeClaimLabels(getTopologyName()), pvc.getValue()));
-    }
-    return listOfPVCs;
-  }
-
-  /**
-   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>Persistent Volume Claims</code>s
-   *  to be placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI.
-   * @param mapConfig Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
-   * @param volumes A list of <code>Volume</code> to append to.
-   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
-   */
-  @VisibleForTesting
-  protected void createVolumeAndMountsPersistentVolumeClaimCLI(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapConfig,
-      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
-        : mapConfig.entrySet()) {
-      final String volumeName = configs.getKey();
-
-      // Do not create Volumes for `OnDemand`.
-      final String claimName = configs.getValue()
-          .get(KubernetesConstants.VolumeConfigKeys.claimName);
-      if (claimName != null && !KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
-        volumes.add(Volumes.get().createPersistentVolumeClaim(claimName, volumeName));
-      }
-      volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
-    }
-  }
-
-  /**
-   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>emptyDir</code>s to be
-   * placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI.
-   * @param mapOfOpts Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
-   * @param volumes A list of <code>Volume</code> to append to.
-   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
-   */
-  @VisibleForTesting
-  protected void createVolumeAndMountsEmptyDirCLI(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
-      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
-        : mapOfOpts.entrySet()) {
-      final String volumeName = configs.getKey();
-      final V1Volume volume = Volumes.get()
-          .createVolume(Volumes.VolumeType.EmptyDir, volumeName, configs.getValue());
-      volumes.add(volume);
-      volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
-    }
-  }
-
-  /**
-   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>Host Path</code>s to be
-   * placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI.
-   * @param mapOfOpts Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
-   * @param volumes A list of <code>Volume</code> to append to.
-   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
-   */
-  @VisibleForTesting
-  protected void createVolumeAndMountsHostPathCLI(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
-      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
-        : mapOfOpts.entrySet()) {
-      final String volumeName = configs.getKey();
-      final V1Volume volume = Volumes.get()
-          .createVolume(Volumes.VolumeType.HostPath, volumeName, configs.getValue());
-      volumes.add(volume);
-      volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
-    }
-  }
-
-  /**
-   * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>NFS</code>s to be
-   * placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI.
-   * @param mapOfOpts Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
-   * @param volumes A list of <code>Volume</code> to append to.
-   * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
-   */
-  @VisibleForTesting
-  protected void createVolumeAndMountsNFSCLI(
-      final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
-      final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
-    for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
-        : mapOfOpts.entrySet()) {
-      final String volumeName = configs.getKey();
-      final V1Volume volume = Volumes.get()
-          .createVolume(Volumes.VolumeType.NetworkFileSystem, volumeName, configs.getValue());
-      volumes.add(volume);
-      volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
-    }
-  }
-
-  /**
-   * Configures the Pod Spec and Heron container with <code>Volumes</code> and <code>Volume Mounts</code>.
-   * @param podSpec All generated <code>V1Volume</code> will be placed in the <code>Pod Spec</code>.
-   * @param executor All generated <code>V1VolumeMount</code> will be placed in the <code>Container</code>.
-   * @param volumes <code>Volumes</code> to be inserted in the Pod Spec.
-   * @param volumeMounts <code>Volumes Mounts</code> to be inserted in the Heron container.
-   */
-  @VisibleForTesting
-  protected void configurePodWithVolumesAndMountsFromCLI(final V1PodSpec podSpec,
-      final V1Container executor, List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
-
-    // Deduplicate on Names with Persistent Volume Claims taking precedence.
-
-    KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
-        new KubernetesUtils.V1ControllerUtils<>();
-    podSpec.setVolumes(
-        utilsVolumes.mergeListsDedupe(volumes, podSpec.getVolumes(),
-            Comparator.comparing(V1Volume::getName),
-            "Pod with Volumes"));
-
-    KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
-        new KubernetesUtils.V1ControllerUtils<>();
-    executor.setVolumeMounts(
-        utilsMounts.mergeListsDedupe(volumeMounts, executor.getVolumeMounts(),
-            Comparator.comparing(V1VolumeMount::getName),
-            "Heron container with Volume Mounts"));
-  }
-
   /**
    * Removes all Persistent Volume Claims associated with a specific topology, if they exist.
    * It looks for the following:
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
index d713d823466..ec1be4e87a9 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
@@ -19,14 +19,8 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
-
-import com.google.common.collect.ImmutableMap;
 
 import org.junit.Assert;
 import org.junit.Rule;
@@ -36,37 +30,16 @@ import org.junit.runner.RunWith;
 import org.mockito.Spy;
 import org.mockito.runners.MockitoJUnitRunner;
 
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.heron.common.basics.ByteAmount;
 import org.apache.heron.common.basics.Pair;
 import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.common.Key;
-import org.apache.heron.spi.packing.Resource;
 
-import io.kubernetes.client.custom.Quantity;
 import io.kubernetes.client.openapi.models.V1ConfigMap;
 import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
-import io.kubernetes.client.openapi.models.V1Container;
-import io.kubernetes.client.openapi.models.V1ContainerBuilder;
-import io.kubernetes.client.openapi.models.V1ContainerPort;
-import io.kubernetes.client.openapi.models.V1EnvVar;
-import io.kubernetes.client.openapi.models.V1EnvVarSource;
-import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
-import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
-import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
-import io.kubernetes.client.openapi.models.V1PodSpec;
-import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
-import io.kubernetes.client.openapi.models.V1ResourceRequirements;
-import io.kubernetes.client.openapi.models.V1Toleration;
-import io.kubernetes.client.openapi.models.V1Volume;
-import io.kubernetes.client.openapi.models.V1VolumeBuilder;
-import io.kubernetes.client.openapi.models.V1VolumeMount;
-import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 
-import static org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 
@@ -443,843 +416,4 @@ public class KubernetesShimTest {
     KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
     kubernetesShim.getPodTemplateLocation(true);
   }
-
-  @Test
-  public void testConfigureContainerPorts() {
-    final String portNamekept = "random-port-to-be-kept";
-    final int portNumberkept = 1111;
-    final int numInstances = 3;
-    final List<V1ContainerPort> expectedPortsBase =
-        Collections.unmodifiableList(KubernetesShim.getExecutorPorts());
-    final List<V1ContainerPort> debugPorts =
-        Collections.unmodifiableList(KubernetesShim.getDebuggingPorts(numInstances));
-    final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList(
-        Arrays.asList(
-            new V1ContainerPort()
-                .name("server-port-to-replace").containerPort(KubernetesConstants.SERVER_PORT),
-            new V1ContainerPort()
-                .name("shell-port-to-replace").containerPort(KubernetesConstants.SHELL_PORT),
-            new V1ContainerPort().name(portNamekept).containerPort(portNumberkept)
-        )
-    );
-
-    // Null ports. This is the default case.
-    final V1Container inputContainerWithNullPorts = new V1ContainerBuilder().build();
-    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithNullPorts);
-    Assert.assertTrue("Server and/or shell PORTS for container with null ports list",
-        CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(), expectedPortsBase));
-
-    // Empty ports.
-    final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder()
-        .withPorts(new LinkedList<>())
-        .build();
-    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithEmptyPorts);
-    Assert.assertTrue("Server and/or shell PORTS for container with empty ports list",
-        CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(), expectedPortsBase));
-
-    // Port overriding.
-    final List<V1ContainerPort> inputPorts = new LinkedList<>(inputPortsBase);
-    final V1Container inputContainerWithPorts = new V1ContainerBuilder()
-        .withPorts(inputPorts)
-        .build();
-    final List<V1ContainerPort> expectedPortsOverriding = new LinkedList<>(expectedPortsBase);
-    expectedPortsOverriding
-        .add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
-
-    v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithPorts);
-    Assert.assertTrue("Server and/or shell PORTS for container should be overwritten.",
-        CollectionUtils.containsAll(inputContainerWithPorts.getPorts(), expectedPortsOverriding));
-
-    // Port overriding with debug ports.
-    final List<V1ContainerPort> inputPortsWithDebug = new LinkedList<>(debugPorts);
-    inputPortsWithDebug.addAll(inputPortsBase);
-    final V1Container inputContainerWithDebug = new V1ContainerBuilder()
-        .withPorts(inputPortsWithDebug)
-        .build();
-    final List<V1ContainerPort> expectedPortsDebug = new LinkedList<>(expectedPortsBase);
-    expectedPortsDebug.add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
-    expectedPortsDebug.addAll(debugPorts);
-
-    v1ControllerWithPodTemplate.configureContainerPorts(
-        true, numInstances, inputContainerWithDebug);
-    Assert.assertTrue("Server and/or shell with debug PORTS for container should be overwritten.",
-        CollectionUtils.containsAll(inputContainerWithDebug.getPorts(), expectedPortsDebug));
-  }
-
-  @Test
-  public void testConfigureContainerEnvVars() {
-    final List<V1EnvVar> heronEnvVars =
-        Collections.unmodifiableList(KubernetesShim.getExecutorEnvVars());
-    final V1EnvVar additionEnvVar = new V1EnvVar()
-        .name("env-variable-to-be-kept")
-        .valueFrom(new V1EnvVarSource()
-            .fieldRef(new V1ObjectFieldSelector()
-                .fieldPath("env-variable-was-kept")));
-    final List<V1EnvVar> inputEnvVars = Arrays.asList(
-        new V1EnvVar()
-            .name(KubernetesConstants.ENV_HOST)
-            .valueFrom(new V1EnvVarSource()
-                .fieldRef(new V1ObjectFieldSelector()
-                    .fieldPath("env-host-to-be-replaced"))),
-        new V1EnvVar()
-            .name(KubernetesConstants.ENV_POD_NAME)
-            .valueFrom(new V1EnvVarSource()
-                .fieldRef(new V1ObjectFieldSelector()
-                    .fieldPath("pod-name-to-be-replaced"))),
-        additionEnvVar
-    );
-
-    // Null env vars. This is the default case.
-    V1Container containerWithNullEnvVars = new V1ContainerBuilder().build();
-    v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithNullEnvVars);
-    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars should match",
-        CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(), heronEnvVars));
-
-    // Empty env vars.
-    V1Container containerWithEmptyEnvVars = new V1ContainerBuilder()
-        .withEnv(new LinkedList<>())
-        .build();
-    v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEmptyEnvVars);
-    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env Vars should match",
-        CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(), heronEnvVars));
-
-    // Env Var overriding.
-    final List<V1EnvVar> expectedOverriding = new LinkedList<>(heronEnvVars);
-    expectedOverriding.add(additionEnvVar);
-    V1Container containerWithEnvVars = new V1ContainerBuilder()
-        .withEnv(inputEnvVars)
-        .build();
-    v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEnvVars);
-    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars should be overridden",
-        CollectionUtils.containsAll(containerWithEnvVars.getEnv(), expectedOverriding));
-  }
-
-  @Test
-  public void testConfigureContainerResources() {
-    final boolean isExecutor = true;
-
-    final Resource resourceDefault = new Resource(
-        9, ByteAmount.fromMegabytes(19000), ByteAmount.fromMegabytes(99000));
-    final Resource resourceCustom = new Resource(
-        4, ByteAmount.fromMegabytes(34000), ByteAmount.fromMegabytes(400000));
-
-    final Quantity defaultRAM = Quantity.fromString(
-        KubernetesUtils.Megabytes(resourceDefault.getRam()));
-    final Quantity defaultCPU = Quantity.fromString(
-        Double.toString(KubernetesUtils.roundDecimal(resourceDefault.getCpu(), 3)));
-    final Quantity customRAM = Quantity.fromString(
-        KubernetesUtils.Megabytes(resourceCustom.getRam()));
-    final Quantity customCPU = Quantity.fromString(
-        Double.toString(KubernetesUtils.roundDecimal(resourceCustom.getCpu(), 3)));
-    final Quantity customDisk = Quantity.fromString(
-        KubernetesUtils.Megabytes(resourceCustom.getDisk()));
-
-    final Config configNoLimit = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "NOT_SET")
-        .build();
-    final Config configWithLimit = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT")
-        .build();
-
-    final V1ResourceRequirements expectDefaultRequirements = new V1ResourceRequirements()
-        .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
-        .putLimitsItem(KubernetesConstants.CPU, defaultCPU);
-
-    final V1ResourceRequirements expectCustomRequirements = new V1ResourceRequirements()
-        .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
-        .putLimitsItem(KubernetesConstants.CPU, defaultCPU)
-        .putLimitsItem("disk", customDisk);
-
-    final V1ResourceRequirements customRequirements = new V1ResourceRequirements()
-        .putLimitsItem(KubernetesConstants.MEMORY, customRAM)
-        .putLimitsItem(KubernetesConstants.CPU, customCPU)
-        .putLimitsItem("disk", customDisk);
-
-    // Default. Null resources.
-    V1Container containerNull = new V1ContainerBuilder().build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerNull, configNoLimit, resourceDefault, isExecutor);
-    Assert.assertTrue("Default LIMITS should be set in container with null LIMITS",
-        containerNull.getResources().getLimits().entrySet()
-            .containsAll(expectDefaultRequirements.getLimits().entrySet()));
-
-    // Empty resources.
-    V1Container containerEmpty = new V1ContainerBuilder().withNewResources().endResources().build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerEmpty, configNoLimit, resourceDefault, isExecutor);
-    Assert.assertTrue("Default LIMITS should be set in container with empty LIMITS",
-        containerNull.getResources().getLimits().entrySet()
-            .containsAll(expectDefaultRequirements.getLimits().entrySet()));
-
-    // Custom resources.
-    V1Container containerCustom = new V1ContainerBuilder()
-        .withResources(customRequirements)
-        .build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerCustom, configNoLimit, resourceDefault, isExecutor);
-    Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS",
-        containerCustom.getResources().getLimits().entrySet()
-            .containsAll(expectCustomRequirements.getLimits().entrySet()));
-
-    // Custom resources with request.
-    V1Container containerRequests = new V1ContainerBuilder()
-        .withResources(customRequirements)
-        .build();
-    v1ControllerWithPodTemplate.configureContainerResources(
-        containerRequests, configWithLimit, resourceDefault, isExecutor);
-    Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS and REQUEST",
-        containerRequests.getResources().getLimits().entrySet()
-            .containsAll(expectCustomRequirements.getLimits().entrySet()));
-    Assert.assertTrue("Custom REQUEST should be set in container with custom LIMITS and REQUEST",
-        containerRequests.getResources().getRequests().entrySet()
-            .containsAll(expectCustomRequirements.getLimits().entrySet()));
-  }
-
-  @Test
-  public void testConfigureContainerResourcesCLI() {
-    final boolean isExecutor = true;
-    final String customLimitMEMStr = "120Gi";
-    final String customLimitCPUStr = "5";
-    final String customRequestMEMStr = "100Mi";
-    final String customRequestCPUStr = "4";
-
-    final Resource resources = new Resource(
-        6, ByteAmount.fromMegabytes(34000), ByteAmount.fromGigabytes(400));
-
-    final Quantity customLimitMEM = Quantity.fromString(customLimitMEMStr);
-    final Quantity customLimitCPU = Quantity.fromString(customLimitCPUStr);
-    final Quantity customRequestMEM = Quantity.fromString(customRequestMEMStr);
-    final Quantity customRequestCPU = Quantity.fromString(customRequestCPUStr);
-
-    final Config config = Config.newBuilder()
-        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX
-                + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customLimitCPUStr)
-        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX
-                + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME), customLimitMEMStr)
-        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX
-                + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customRequestCPUStr)
-        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX
-                + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME),
-            customRequestMEMStr)
-        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT")
-        .build();
-
-    final V1Container expected = new V1ContainerBuilder()
-        .withNewResources()
-          .addToLimits(KubernetesConstants.CPU, customLimitCPU)
-          .addToLimits(KubernetesConstants.MEMORY, customLimitMEM)
-          .addToRequests(KubernetesConstants.CPU, customRequestCPU)
-          .addToRequests(KubernetesConstants.MEMORY, customRequestMEM)
-        .endResources()
-        .build();
-
-    final V1Container actual = new V1Container();
-    v1ControllerWithPodTemplate.configureContainerResources(actual, config, resources, isExecutor);
-    Assert.assertEquals("Container Resources are set from CLI.", expected, actual);
-  }
-
-  @Test
-  public void testMountVolumeIfPresent() {
-    final String pathDefault = "config-host-volume-path";
-    final String pathNameDefault = "config-host-volume-name";
-    final Config configWithVolumes = Config.newBuilder()
-        .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME, pathNameDefault)
-        .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH, pathDefault)
-        .build();
-    final KubernetesShim controllerWithMounts = new KubernetesShim(configWithVolumes, RUNTIME);
-    final V1VolumeMount volumeDefault = new V1VolumeMountBuilder()
-        .withName(pathNameDefault)
-        .withMountPath(pathDefault)
-        .build();
-    final V1VolumeMount volumeCustom = new V1VolumeMountBuilder()
-        .withName("custom-volume-mount")
-        .withMountPath("should-be-kept")
-        .build();
-
-    final List<V1VolumeMount> expectedMountsDefault = Collections.singletonList(volumeDefault);
-    final List<V1VolumeMount> expectedMountsCustom = Arrays.asList(volumeCustom, volumeDefault);
-    final List<V1VolumeMount> volumeMountsCustomList = Arrays.asList(
-        new V1VolumeMountBuilder()
-            .withName(pathNameDefault)
-            .withMountPath("should-be-replaced")
-            .build(),
-        volumeCustom
-    );
-
-    // No Volume Mounts set.
-    KubernetesShim controllerDoNotSetMounts =
-        new KubernetesShim(Config.newBuilder().build(), RUNTIME);
-    V1Container containerNoSetMounts = new V1Container();
-    controllerDoNotSetMounts.mountVolumeIfPresent(containerNoSetMounts);
-    Assert.assertNull(containerNoSetMounts.getVolumeMounts());
-
-    // Default. Null Volume Mounts.
-    V1Container containerNull = new V1ContainerBuilder().build();
-    controllerWithMounts.mountVolumeIfPresent(containerNull);
-    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with null VOLUME MOUNTS",
-        CollectionUtils.containsAll(expectedMountsDefault, containerNull.getVolumeMounts()));
-
-    // Empty Volume Mounts.
-    V1Container containerEmpty = new V1ContainerBuilder()
-        .withVolumeMounts(new LinkedList<>())
-        .build();
-    controllerWithMounts.mountVolumeIfPresent(containerEmpty);
-    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with empty VOLUME MOUNTS",
-        CollectionUtils.containsAll(expectedMountsDefault, containerEmpty.getVolumeMounts()));
-
-    // Custom Volume Mounts.
-    V1Container containerCustom = new V1ContainerBuilder()
-        .withVolumeMounts(volumeMountsCustomList)
-        .build();
-    controllerWithMounts.mountVolumeIfPresent(containerCustom);
-    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with custom VOLUME MOUNTS",
-        CollectionUtils.containsAll(expectedMountsCustom, containerCustom.getVolumeMounts()));
-  }
-
-  @Test
-  public void testConfigureTolerations() {
-    final V1Toleration keptToleration = new V1Toleration()
-        .key("kept toleration")
-        .operator("Some Operator")
-        .effect("Some Effect")
-        .tolerationSeconds(5L);
-    final List<V1Toleration> expectedTolerationBase =
-        Collections.unmodifiableList(KubernetesShim.getTolerations());
-    final List<V1Toleration> inputTolerationsBase = Collections.unmodifiableList(
-        Arrays.asList(
-            new V1Toleration()
-                .key(KubernetesConstants.TOLERATIONS.get(0)).operator("replace").effect("replace"),
-            new V1Toleration()
-                .key(KubernetesConstants.TOLERATIONS.get(1)).operator("replace").effect("replace"),
-            keptToleration
-        )
-    );
-
-    // Null Tolerations. This is the default case.
-    final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build();
-    v1ControllerWithPodTemplate.configureTolerations(podSpecNullTolerations);
-    Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to Heron's defaults",
-        CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(),
-            expectedTolerationBase));
-
-    // Empty Tolerations.
-    final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder()
-        .withTolerations(new LinkedList<>())
-        .build();
-    v1ControllerWithPodTemplate.configureTolerations(podSpecWithEmptyTolerations);
-    Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to Heron's defaults",
-        CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(),
-            expectedTolerationBase));
-
-    // Toleration overriding.
-    final V1PodSpec podSpecWithTolerations = new V1PodSpecBuilder()
-        .withTolerations(inputTolerationsBase)
-        .build();
-    final List<V1Toleration> expectedTolerationsOverriding =
-        new LinkedList<>(expectedTolerationBase);
-    expectedTolerationsOverriding.add(keptToleration);
-
-    v1ControllerWithPodTemplate.configureTolerations(podSpecWithTolerations);
-    Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with Heron's defaults",
-        CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
-            expectedTolerationsOverriding));
-  }
-
-  @Test
-  public void testCreatePersistentVolumeClaims() {
-    final String topologyName = "topology-name";
-    final String volumeNameOne = "volume-name-one";
-    final String volumeNameTwo = "volume-name-two";
-    final String volumeNameStatic = "volume-name-static";
-    final String claimNameOne = "OnDemand";
-    final String claimNameTwo = "claim-name-two";
-    final String claimNameStatic = "OnDEmaND";
-    final String storageClassName = "storage-class-name";
-    final String sizeLimit = "555Gi";
-    final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany";
-    final String accessModes = "ReadOnlyMany";
-    final String volumeMode = "VolumeMode";
-    final String path = "/path/to/mount/";
-    final String subPath = "/sub/path/to/mount/";
-    final Map<String, Map<VolumeConfigKeys, String>> mapPVCOpts =
-        ImmutableMap.of(
-            volumeNameOne, new HashMap<VolumeConfigKeys, String>() {
-              {
-                put(VolumeConfigKeys.claimName, claimNameOne);
-                put(VolumeConfigKeys.storageClassName, storageClassName);
-                put(VolumeConfigKeys.sizeLimit, sizeLimit);
-                put(VolumeConfigKeys.accessModes, accessModesList);
-                put(VolumeConfigKeys.volumeMode, volumeMode);
-                put(VolumeConfigKeys.path, path);
-              }
-            },
-            volumeNameTwo, new HashMap<VolumeConfigKeys, String>() {
-              {
-                put(VolumeConfigKeys.claimName, claimNameTwo);
-                put(VolumeConfigKeys.storageClassName, storageClassName);
-                put(VolumeConfigKeys.sizeLimit, sizeLimit);
-                put(VolumeConfigKeys.accessModes, accessModes);
-                put(VolumeConfigKeys.volumeMode, volumeMode);
-                put(VolumeConfigKeys.path, path);
-                put(VolumeConfigKeys.subPath, subPath);
-              }
-            },
-            volumeNameStatic, new HashMap<VolumeConfigKeys, String>() {
-              {
-                put(VolumeConfigKeys.claimName, claimNameStatic);
-                put(VolumeConfigKeys.sizeLimit, sizeLimit);
-                put(VolumeConfigKeys.accessModes, accessModes);
-                put(VolumeConfigKeys.volumeMode, volumeMode);
-                put(VolumeConfigKeys.path, path);
-                put(VolumeConfigKeys.subPath, subPath);
-              }
-            }
-        );
-
-    final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder()
-        .withNewMetadata()
-          .withName(volumeNameOne)
-          .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
-        .endMetadata()
-        .withNewSpec()
-          .withStorageClassName(storageClassName)
-          .withAccessModes(Arrays.asList(accessModesList.split(",")))
-          .withVolumeMode(volumeMode)
-          .withNewResources()
-            .addToRequests("storage", new Quantity(sizeLimit))
-          .endResources()
-        .endSpec()
-        .build();
-
-    final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder()
-        .withNewMetadata()
-          .withName(volumeNameStatic)
-          .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
-        .endMetadata()
-        .withNewSpec()
-          .withStorageClassName("")
-          .withAccessModes(Collections.singletonList(accessModes))
-          .withVolumeMode(volumeMode)
-          .withNewResources()
-            .addToRequests("storage", new Quantity(sizeLimit))
-          .endResources()
-        .endSpec()
-        .build();
-
-    final List<V1PersistentVolumeClaim> expectedClaims =
-        new LinkedList<>(Arrays.asList(claimOne, claimStatic));
-
-    final List<V1PersistentVolumeClaim> actualClaims =
-        v1ControllerWithPodTemplate.createPersistentVolumeClaims(mapPVCOpts);
-
-    Assert.assertEquals("Generated claim sizes match", expectedClaims.size(), actualClaims.size());
-    Assert.assertTrue(expectedClaims.containsAll(actualClaims));
-  }
-
-  @Test
-  public void testCreatePersistentVolumeClaimVolumesAndMounts() {
-    final String volumeNameOne = "VolumeNameONE";
-    final String volumeNameTwo = "VolumeNameTWO";
-    final String claimNameOne = "claim-name-one";
-    final String claimNameTwo = "OnDemand";
-    final String mountPathOne = "/mount/path/ONE";
-    final String mountPathTwo = "/mount/path/TWO";
-    final String mountSubPathTwo = "/mount/sub/path/TWO";
-    Map<String, Map<VolumeConfigKeys, String>> mapOfOpts =
-        ImmutableMap.of(
-            volumeNameOne, ImmutableMap.of(
-                VolumeConfigKeys.claimName, claimNameOne,
-                VolumeConfigKeys.path, mountPathOne),
-            volumeNameTwo, ImmutableMap.of(
-                VolumeConfigKeys.claimName, claimNameTwo,
-                VolumeConfigKeys.path, mountPathTwo,
-                VolumeConfigKeys.subPath, mountSubPathTwo)
-        );
-    final V1Volume volumeOne = new V1VolumeBuilder()
-        .withName(volumeNameOne)
-        .withNewPersistentVolumeClaim()
-          .withClaimName(claimNameOne)
-        .endPersistentVolumeClaim()
-        .build();
-    final V1Volume volumeTwo = new V1VolumeBuilder()
-        .withName(volumeNameTwo)
-        .withNewPersistentVolumeClaim()
-          .withClaimName(claimNameTwo)
-        .endPersistentVolumeClaim()
-        .build();
-    final V1VolumeMount volumeMountOne = new V1VolumeMountBuilder()
-        .withName(volumeNameOne)
-        .withMountPath(mountPathOne)
-        .build();
-    final V1VolumeMount volumeMountTwo = new V1VolumeMountBuilder()
-        .withName(volumeNameTwo)
-        .withMountPath(mountPathTwo)
-        .withSubPath(mountSubPathTwo)
-        .build();
-
-    // Test case container.
-    // Input: Map of Volume configurations.
-    // Output: The expected lists of Volumes and Volume Mounts.
-    final List<TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
-        Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new LinkedList<>();
-
-    // Default case: No PVC provided.
-    testCases.add(new TestTuple<>("Generated an empty list of Volumes", new HashMap<>(),
-        new Pair<>(new LinkedList<>(), new LinkedList<>())));
-
-    // PVC Provided.
-    final Pair<List<V1Volume>, List<V1VolumeMount>> expectedFull =
-        new Pair<>(
-            new LinkedList<>(Arrays.asList(volumeOne, volumeTwo)),
-            new LinkedList<>(Arrays.asList(volumeMountOne, volumeMountTwo)));
-    testCases.add(new TestTuple<>("Generated a list of Volumes", mapOfOpts,
-        new Pair<>(expectedFull.first, expectedFull.second)));
-
-    // Testing loop.
-    for (TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
-             Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) {
-      List<V1Volume> actualVolume = new LinkedList<>();
-      List<V1VolumeMount> actualVolumeMount = new LinkedList<>();
-      v1ControllerPodTemplate.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
-          actualVolume, actualVolumeMount);
-
-      Assert.assertTrue(testCase.description,
-          (testCase.expected.first).containsAll(actualVolume));
-      Assert.assertTrue(testCase.description + " Mounts",
-          (testCase.expected.second).containsAll(actualVolumeMount));
-    }
-  }
-
-  @Test
-  public void testConfigurePodWithVolumesAndMountsFromCLI() {
-    final String volumeNameClashing = "clashing-volume";
-    final String volumeMountNameClashing = "original-volume-mount";
-    V1Volume baseVolume = new V1VolumeBuilder()
-        .withName(volumeNameClashing)
-        .withNewPersistentVolumeClaim()
-        .withClaimName("Original Base Claim Name")
-        .endPersistentVolumeClaim()
-        .build();
-    V1VolumeMount baseVolumeMount = new V1VolumeMountBuilder()
-        .withName(volumeMountNameClashing)
-        .withMountPath("/original/mount/path")
-        .build();
-    V1Volume clashingVolume = new V1VolumeBuilder()
-        .withName(volumeNameClashing)
-        .withNewPersistentVolumeClaim()
-        .withClaimName("Clashing Claim Replaced")
-        .endPersistentVolumeClaim()
-        .build();
-    V1VolumeMount clashingVolumeMount = new V1VolumeMountBuilder()
-        .withName(volumeMountNameClashing)
-        .withMountPath("/clashing/mount/path")
-        .build();
-    V1Volume secondaryVolume = new V1VolumeBuilder()
-        .withName("secondary-volume")
-        .withNewPersistentVolumeClaim()
-        .withClaimName("Original Secondary Claim Name")
-        .endPersistentVolumeClaim()
-        .build();
-    V1VolumeMount secondaryVolumeMount = new V1VolumeMountBuilder()
-        .withName("secondary-volume-mount")
-        .withMountPath("/secondary/mount/path")
-        .build();
-
-    // Test case container.
-    // Input: [0] Pod Spec to modify, [1] Heron container to modify, [2] List of Volumes
-    // [3] List of Volume Mounts.
-    // Output: The expected <V1PodSpec> and <V1Container>.
-    final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases = new LinkedList<>();
-
-    // No Persistent Volume Claim.
-    final V1PodSpec podSpecEmptyCase = new V1PodSpecBuilder().withVolumes(baseVolume).build();
-    final V1Container executorEmptyCase =
-        new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
-    final V1PodSpec expectedEmptyPodSpec = new V1PodSpecBuilder().withVolumes(baseVolume).build();
-    final V1Container expectedEmptyExecutor =
-        new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
-
-    testCases.add(new TestTuple<>("Empty",
-        new Object[]{podSpecEmptyCase, executorEmptyCase, new LinkedList<>(), new LinkedList<>()},
-        new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor)));
-
-    // Non-clashing Persistent Volume Claim.
-    final V1PodSpec podSpecNoClashCase = new V1PodSpecBuilder()
-        .withVolumes(baseVolume)
-        .build();
-    final V1Container executorNoClashCase = new V1ContainerBuilder()
-        .withVolumeMounts(baseVolumeMount)
-        .build();
-    final V1PodSpec expectedNoClashPodSpec = new V1PodSpecBuilder()
-        .addToVolumes(baseVolume)
-        .addToVolumes(secondaryVolume)
-        .build();
-    final V1Container expectedNoClashExecutor = new V1ContainerBuilder()
-        .addToVolumeMounts(baseVolumeMount)
-        .addToVolumeMounts(secondaryVolumeMount)
-        .build();
-
-    testCases.add(new TestTuple<>("No Clash",
-        new Object[]{podSpecNoClashCase, executorNoClashCase,
-            Collections.singletonList(secondaryVolume),
-            Collections.singletonList(secondaryVolumeMount)},
-        new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor)));
-
-    // Clashing Persistent Volume Claim.
-    final V1PodSpec podSpecClashCase = new V1PodSpecBuilder()
-        .withVolumes(baseVolume)
-        .build();
-    final V1Container executorClashCase = new V1ContainerBuilder()
-        .withVolumeMounts(baseVolumeMount)
-        .build();
-    final V1PodSpec expectedClashPodSpec = new V1PodSpecBuilder()
-        .addToVolumes(clashingVolume)
-        .addToVolumes(secondaryVolume)
-        .build();
-    final V1Container expectedClashExecutor = new V1ContainerBuilder()
-        .addToVolumeMounts(clashingVolumeMount)
-        .addToVolumeMounts(secondaryVolumeMount)
-        .build();
-
-    testCases.add(new TestTuple<>("Clashing",
-        new Object[]{podSpecClashCase, executorClashCase,
-            Arrays.asList(clashingVolume, secondaryVolume),
-            Arrays.asList(clashingVolumeMount, secondaryVolumeMount)},
-        new Pair<>(expectedClashPodSpec, expectedClashExecutor)));
-
-    // Testing loop.
-    for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : testCases) {
-      v1ControllerWithPodTemplate
-          .configurePodWithVolumesAndMountsFromCLI((V1PodSpec) testCase.input[0],
-              (V1Container) testCase.input[1], (List<V1Volume>) testCase.input[2],
-              (List<V1VolumeMount>) testCase.input[3]);
-
-      Assert.assertEquals("Pod Specs match " + testCase.description,
-          testCase.input[0], testCase.expected.first);
-      Assert.assertEquals("Executors match " + testCase.description,
-          testCase.input[1], testCase.expected.second);
-    }
-  }
-
-  @Test
-  public void testSetShardIdEnvironmentVariableCommand() {
-
-    List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
-
-    testCases.add(new TestTuple<>("Executor command is set correctly",
-        true, "SHARD_ID=$((${POD_NAME##*-} + 1)) && echo shardId=${SHARD_ID}"));
-    testCases.add(new TestTuple<>("Manager command is set correctly",
-        false, "SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}"));
-
-    for (TestTuple<Boolean, String> testCase : testCases) {
-      Assert.assertEquals(testCase.description, testCase.expected,
-          v1ControllerWithPodTemplate.setShardIdEnvironmentVariableCommand(testCase.input));
-    }
-  }
-
-  @Test
-  public void testCreateResourcesRequirement() {
-    final String managerCpuLimit = "3000m";
-    final String managerMemLimit = "256Gi";
-    final Quantity memory = Quantity.fromString(managerMemLimit);
-    final Quantity cpu = Quantity.fromString(managerCpuLimit);
-    final List<TestTuple<Map<String, String>, Map<String, Quantity>>> testCases =
-        new LinkedList<>();
-
-    // No input.
-    Map<String, String> inputEmpty = new HashMap<>();
-    testCases.add(new TestTuple<>("Empty input.", inputEmpty, new HashMap<>()));
-
-    // Only memory.
-    Map<String, String> inputMemory = new HashMap<String, String>() {
-      {
-        put(KubernetesConstants.MEMORY, managerMemLimit);
-      }
-    };
-    Map<String, Quantity> expectedMemory = new HashMap<String, Quantity>() {
-      {
-        put(KubernetesConstants.MEMORY, memory);
-      }
-    };
-    testCases.add(new TestTuple<>("Only memory input.", inputMemory, expectedMemory));
-
-    // Only CPU.
-    Map<String, String> inputCPU = new HashMap<String, String>() {
-      {
-        put(KubernetesConstants.CPU, managerCpuLimit);
-      }
-    };
-    Map<String, Quantity> expectedCPU = new HashMap<String, Quantity>() {
-      {
-        put(KubernetesConstants.CPU, cpu);
-      }
-    };
-    testCases.add(new TestTuple<>("Only CPU input.", inputCPU, expectedCPU));
-
-    // CPU and memory.
-    Map<String, String> inputMemoryCPU = new HashMap<String, String>() {
-      {
-        put(KubernetesConstants.MEMORY, managerMemLimit);
-        put(KubernetesConstants.CPU, managerCpuLimit);
-      }
-    };
-    Map<String, Quantity> expectedMemoryCPU = new HashMap<String, Quantity>() {
-      {
-        put(KubernetesConstants.MEMORY, memory);
-        put(KubernetesConstants.CPU, cpu);
-      }
-    };
-    testCases.add(new TestTuple<>("Memory and CPU input.", inputMemoryCPU, expectedMemoryCPU));
-
-    // Invalid.
-    Map<String, String> inputInvalid = new HashMap<String, String>() {
-      {
-        put("invalid input", "will not be ignored");
-        put(KubernetesConstants.CPU, managerCpuLimit);
-      }
-    };
-    Map<String, Quantity> expectedInvalid = new HashMap<String, Quantity>() {
-      {
-        put(KubernetesConstants.CPU, cpu);
-      }
-    };
-    testCases.add(new TestTuple<>("Invalid input.", inputInvalid, expectedInvalid));
-
-    // Test loop.
-    for (TestTuple<Map<String, String>, Map<String, Quantity>> testCase : testCases) {
-      Map<String, Quantity> actual =
-          v1ControllerPodTemplate.createResourcesRequirement(testCase.input);
-      Assert.assertEquals(testCase.description, testCase.expected, actual);
-    }
-  }
-
-  @Test
-  public void testCreateVolumeAndMountsEmptyDirCLI() {
-    final String volumeName = "volume-name-empty-dir";
-    final String medium = "Memory";
-    final String sizeLimit = "1Gi";
-    final String path = "/path/to/mount";
-    final String subPath = "/sub/path/to/mount";
-
-    // Empty Dir.
-    final Map<String, Map<VolumeConfigKeys, String>> config =
-        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
-          {
-            put(VolumeConfigKeys.sizeLimit, sizeLimit);
-            put(VolumeConfigKeys.medium, "Memory");
-            put(VolumeConfigKeys.path, path);
-            put(VolumeConfigKeys.subPath, subPath);
-          }
-        });
-    final List<V1Volume> expectedVolumes = Collections.singletonList(
-        new V1VolumeBuilder()
-            .withName(volumeName)
-            .withNewEmptyDir()
-              .withMedium(medium)
-              .withNewSizeLimit(sizeLimit)
-            .endEmptyDir()
-            .build()
-    );
-    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
-        new V1VolumeMountBuilder()
-            .withName(volumeName)
-              .withMountPath(path)
-              .withSubPath(subPath)
-            .build()
-    );
-
-    List<V1Volume> actualVolumes = new LinkedList<>();
-    List<V1VolumeMount> actualMounts = new LinkedList<>();
-    v1ControllerPodTemplate.createVolumeAndMountsEmptyDirCLI(config, actualVolumes, actualMounts);
-    Assert.assertEquals("Empty Dir Volume populated", expectedVolumes, actualVolumes);
-    Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts, actualMounts);
-  }
-
-  @Test
-  public void testCreateVolumeAndMountsHostPathCLI() {
-    final String volumeName = "volume-name-host-path";
-    final String type = "DirectoryOrCreate";
-    final String pathOnHost = "path.on.host";
-    final String path = "/path/to/mount";
-    final String subPath = "/sub/path/to/mount";
-
-    // Host Path.
-    final Map<String, Map<VolumeConfigKeys, String>> config =
-        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
-          {
-            put(VolumeConfigKeys.type, type);
-            put(VolumeConfigKeys.pathOnHost, pathOnHost);
-            put(VolumeConfigKeys.path, path);
-            put(VolumeConfigKeys.subPath, subPath);
-          }
-        });
-    final List<V1Volume> expectedVolumes = Collections.singletonList(
-        new V1VolumeBuilder()
-            .withName(volumeName)
-            .withNewHostPath()
-              .withNewType(type)
-              .withNewPath(pathOnHost)
-            .endHostPath()
-            .build()
-    );
-    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
-        new V1VolumeMountBuilder()
-            .withName(volumeName)
-              .withMountPath(path)
-              .withSubPath(subPath)
-            .build()
-    );
-
-    List<V1Volume> actualVolumes = new LinkedList<>();
-    List<V1VolumeMount> actualMounts = new LinkedList<>();
-    v1ControllerPodTemplate.createVolumeAndMountsHostPathCLI(config, actualVolumes, actualMounts);
-    Assert.assertEquals("Host Path Volume populated", expectedVolumes, actualVolumes);
-    Assert.assertEquals("Host Path Volume Mount populated", expectedMounts, actualMounts);
-  }
-
-  @Test
-  public void testCreateVolumeAndMountsNFSCLI() {
-    final String volumeName = "volume-name-nfs";
-    final String server = "nfs.server.address";
-    final String pathOnNFS = "path.on.host";
-    final String readOnly = "true";
-    final String path = "/path/to/mount";
-    final String subPath = "/sub/path/to/mount";
-
-    // NFS.
-    final Map<String, Map<VolumeConfigKeys, String>> config =
-        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
-          {
-            put(VolumeConfigKeys.server, server);
-            put(VolumeConfigKeys.readOnly, readOnly);
-            put(VolumeConfigKeys.pathOnNFS, pathOnNFS);
-            put(VolumeConfigKeys.path, path);
-            put(VolumeConfigKeys.subPath, subPath);
-          }
-        });
-    final List<V1Volume> expectedVolumes = Collections.singletonList(
-        new V1VolumeBuilder()
-            .withName(volumeName)
-            .withNewNfs()
-              .withServer(server)
-              .withPath(pathOnNFS)
-              .withReadOnly(Boolean.parseBoolean(readOnly))
-            .endNfs()
-            .build()
-    );
-    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
-        new V1VolumeMountBuilder()
-            .withName(volumeName)
-            .withMountPath(path)
-            .withSubPath(subPath)
-            .withReadOnly(true)
-            .build()
-    );
-
-    List<V1Volume> actualVolumes = new LinkedList<>();
-    List<V1VolumeMount> actualMounts = new LinkedList<>();
-    v1ControllerPodTemplate.createVolumeAndMountsNFSCLI(config, actualVolumes, actualMounts);
-    Assert.assertEquals("NFS Volume populated", expectedVolumes, actualVolumes);
-    Assert.assertEquals("NFS Volume Mount populated", expectedMounts, actualMounts);
-  }
 }


[incubator-heron] 04/05: [Tests] Kubernetes Utils referencing methods from Stateful Set factory.

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit e73994e51cea30fb71b61f9d18cc8c84cad08d85
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Wed Jul 20 23:24:41 2022 -0400

    [Tests] Kubernetes Utils referencing methods from Stateful Set factory.
---
 .../org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java    | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
index 48ff6aab01c..38a212497fb 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
@@ -43,14 +43,14 @@ public class KubernetesUtilsTest {
   public void testMergeListsDedupe() {
     final String description = "Pod Template Environment Variables";
     final List<V1EnvVar> heronEnvVars =
-        Collections.unmodifiableList(KubernetesShim.getExecutorEnvVars());
+        Collections.unmodifiableList(StatefulSet.getExecutorEnvVars());
     final V1EnvVar additionEnvVar = new V1EnvVar()
         .name("env-variable-to-be-kept")
         .valueFrom(new V1EnvVarSource()
             .fieldRef(new V1ObjectFieldSelector()
                 .fieldPath("env-variable-was-kept")));
     final List<V1EnvVar> expectedEnvVars = Collections.unmodifiableList(
-        new LinkedList<V1EnvVar>(KubernetesShim.getExecutorEnvVars()) {{
+        new LinkedList<V1EnvVar>(StatefulSet.getExecutorEnvVars()) {{
           add(additionEnvVar);
         }});
     final List<V1EnvVar> inputEnvVars = Arrays.asList(


[incubator-heron] 01/05: [StatefulSet] added javadoc for Configs constructor.

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 59dbacdb408b7d4620d482cde09716c5a504d2a1
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Wed Jul 20 23:02:07 2022 -0400

    [StatefulSet] added javadoc for Configs constructor.
---
 .../java/org/apache/heron/scheduler/kubernetes/StatefulSet.java   | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
index 154f816867d..256885abb2b 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
@@ -88,6 +88,14 @@ final class StatefulSet {
     private final V1PodTemplateSpec managerPodTemplateSpec;
     private final V1PodTemplateSpec executorPodTemplateSpec;
 
+    /**
+     * <code>Configs</code> contains the Kubernetes cluster configurations as well as the
+     * <code>Pod Templates</code> for the <code>Executor</code>s and <code>Manager</code>.
+     * @param configuration The cluster configurations contains items relating/stored in the Kubernetes cluster.
+     * @param runtimeConfiguration The runtime configurations contain items such as the topology name.
+     * @param managerPodTemplateSpec The <code>Pod Template Spec</code> configurations for a <code>Manager</code>.
+     * @param executorPodTemplateSpec The <code>Pod Template Spec</code> configurations for the <code>Executor</code>s.
+     */
     Configs(Config configuration, Config runtimeConfiguration,
             V1PodTemplateSpec managerPodTemplateSpec, V1PodTemplateSpec executorPodTemplateSpec) {
       this.topologyName = Runtime.topologyName(runtimeConfiguration);


[incubator-heron] 02/05: [KubernetesShim] wired in Stateful Set factory

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 6722ee7bf8d41419079988ca610feaa9e5677aa0
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Wed Jul 20 23:02:37 2022 -0400

    [KubernetesShim] wired in Stateful Set factory
---
 .../apache/heron/scheduler/kubernetes/KubernetesShim.java  | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
index ca339b8320a..c122ff5cda3 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
@@ -150,8 +150,18 @@ public class KubernetesShim extends KubernetesController {
     for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
       numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size());
     }
-    final V1StatefulSet executors = createStatefulSet(containerResource, numberOfInstances, true);
-    final V1StatefulSet manager = createStatefulSet(containerResource, numberOfInstances, false);
+
+    final StatefulSet.Configs clusterConfigs = new StatefulSet.Configs(
+        getConfiguration(),
+        getRuntimeConfiguration(),
+        loadPodFromTemplate(false),
+        loadPodFromTemplate(true)
+        );
+
+    final V1StatefulSet executors = StatefulSet.get()
+        .create(StatefulSet.Type.Executor, clusterConfigs, containerResource, numberOfInstances);
+    final V1StatefulSet manager = StatefulSet.get()
+        .create(StatefulSet.Type.Manager, clusterConfigs, containerResource, numberOfInstances);
 
     try {
       appsClient.createNamespacedStatefulSet(getNamespace(), executors, null,