You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/07/20 23:58:11 UTC

[incubator-heron] 03/03: [Tests] Shim to Stateful Set test migration.

This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 23ded81598237d687be0b9301b307af96ef0c825
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Wed Jul 20 19:57:56 2022 -0400

    [Tests] Shim to Stateful Set test migration.
---
 .../scheduler/kubernetes/StatefulSetTest.java      | 966 ++++++++++++++++++++-
 1 file changed, 964 insertions(+), 2 deletions(-)

diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
index 746823f21ea..10f48845b5f 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
@@ -19,12 +19,974 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.Pair;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.common.Key;
+import org.apache.heron.spi.packing.Resource;
+
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1ContainerBuilder;
+import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1ContainerPortBuilder;
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpecBuilder;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1Toleration;
+import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeBuilder;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
 
+import static org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
+import static org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
+
+@RunWith(MockitoJUnitRunner.class)
 public class StatefulSetTest {
+  // Topology name placed in the RUNTIME configuration below.
+  private static final String TOPOLOGY_NAME = "topology-name";
+  private static final String CONFIGMAP_NAME = "CONFIG-MAP-NAME";
+  private static final String POD_TEMPLATE_NAME = "POD-TEMPLATE-NAME";
+  // Pod Template location value in the form "<ConfigMap name>.<Pod Template name>".
+  private static final String CONFIGMAP_POD_TEMPLATE_NAME =
+      String.format("%s.%s", CONFIGMAP_NAME, POD_TEMPLATE_NAME);
+  // Configuration keys for the Pod Template location, formatted with the executor name and
+  // the manager name respectively.
+  private static final String POD_TEMPLATE_LOCATION_EXECUTOR =
+      String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
+          KubernetesConstants.EXECUTOR_NAME);
+  private static final String POD_TEMPLATE_LOCATION_MANAGER =
+      String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
+          KubernetesConstants.MANAGER_NAME);
+  // Configuration that maps both Pod Template location keys to the same template name.
+  private static final Config CONFIG_WITH_POD_TEMPLATE = Config.newBuilder()
+      .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
+      .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
+      .build();
+  // Runtime configuration containing only the topology name.
+  private static final Config RUNTIME = Config.newBuilder()
+      .put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME)
+      .build();
+  // Instance under test. It is static and therefore shared by every test method, so each test
+  // loads the configs it needs through setClusterConfigs before exercising it.
+  private static final StatefulSet STATEFUL_SET = new StatefulSet();
+  // Pod Template with a single "heron-tracker" container exposing one port.
+  // NOTE(review): the request/limit map keys ("100m", "400m") look like quantities rather than
+  // resource names - confirm this is intentional fixture data.
+  private static final V1PodTemplateSpec POD_TEMPLATE_SPEC = new V1PodTemplateSpecBuilder()
+      .withNewMetadata()
+        .withLabels(Collections.singletonMap("app", "heron-tracker"))
+      .endMetadata()
+      .withNewSpec()
+      .withContainers(new V1ContainerBuilder()
+          .withName("heron-tracker")
+          .withImage("apache/heron:latest")
+          .withPorts(new V1ContainerPortBuilder()
+              .withName("api-port")
+              .withContainerPort(8888)
+            .build())
+          .withNewResources()
+          .withRequests(Collections.singletonMap("100m", new Quantity("200M")))
+          .withLimits(Collections.singletonMap("400m", new Quantity("512M")))
+          .endResources()
+          .build())
+      .endSpec()
+      .build();
+  // Default configs used by most tests; the same Pod Template is passed for both template
+  // arguments of the Configs constructor.
+  private static final StatefulSet.Configs CLUSTER_CONFIGS =
+      new StatefulSet.Configs(CONFIG_WITH_POD_TEMPLATE, RUNTIME,
+        POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
+
+
+  /**
+   * Exercises {@code configureContainerPorts} on containers whose ports list is null, empty,
+   * populated, and populated together with debugging ports.
+   * Every assertion checks only that the container's resulting ports contain all of the
+   * expected ports.
+   */
+  @Test
+  public void testConfigureContainerPorts() {
+    final String portNamekept = "random-port-to-be-kept";
+    final int portNumberkept = 1111;
+    final int numInstances = 3;
+    final List<V1ContainerPort> expectedPortsBase =
+        Collections.unmodifiableList(StatefulSet.getExecutorPorts());
+    final List<V1ContainerPort> debugPorts =
+        Collections.unmodifiableList(StatefulSet.getDebuggingPorts(numInstances));
+    // Input ports: two reuse the server and shell port numbers under different names, and one
+    // uses an unrelated name and number.
+    final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList(
+        Arrays.asList(
+            new V1ContainerPort()
+                .name("server-port-to-replace").containerPort(KubernetesConstants.SERVER_PORT),
+            new V1ContainerPort()
+                .name("shell-port-to-replace").containerPort(KubernetesConstants.SHELL_PORT),
+            new V1ContainerPort().name(portNamekept).containerPort(portNumberkept)
+        )
+    );
+
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    // Null ports. This is the default case.
+    final V1Container inputContainerWithNullPorts = new V1ContainerBuilder().build();
+    STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithNullPorts);
+    Assert.assertTrue("Server and/or shell PORTS for container with null ports list",
+        CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(), expectedPortsBase));
+
+    // Empty ports.
+    final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder()
+        .withPorts(new LinkedList<>())
+        .build();
+    STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithEmptyPorts);
+    Assert.assertTrue("Server and/or shell PORTS for container with empty ports list",
+        CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(), expectedPortsBase));
+
+    // Port overriding.
+    // Expected: the executor ports plus the unrelated port from the input.
+    final List<V1ContainerPort> inputPorts = new LinkedList<>(inputPortsBase);
+    final V1Container inputContainerWithPorts = new V1ContainerBuilder()
+        .withPorts(inputPorts)
+        .build();
+    final List<V1ContainerPort> expectedPortsOverriding = new LinkedList<>(expectedPortsBase);
+    expectedPortsOverriding
+        .add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
+
+    STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithPorts);
+    Assert.assertTrue("Server and/or shell PORTS for container should be overwritten.",
+        CollectionUtils.containsAll(inputContainerWithPorts.getPorts(), expectedPortsOverriding));
+
+    // Port overriding with debug ports.
+    // The input already carries the debugging ports; the call passes true and numInstances.
+    final List<V1ContainerPort> inputPortsWithDebug = new LinkedList<>(debugPorts);
+    inputPortsWithDebug.addAll(inputPortsBase);
+    final V1Container inputContainerWithDebug = new V1ContainerBuilder()
+        .withPorts(inputPortsWithDebug)
+        .build();
+    final List<V1ContainerPort> expectedPortsDebug = new LinkedList<>(expectedPortsBase);
+    expectedPortsDebug.add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
+    expectedPortsDebug.addAll(debugPorts);
+
+    STATEFUL_SET.configureContainerPorts(true, numInstances, inputContainerWithDebug);
+    Assert.assertTrue("Server and/or shell with debug PORTS for container should be overwritten.",
+        CollectionUtils.containsAll(inputContainerWithDebug.getPorts(), expectedPortsDebug));
+  }
+
+  /**
+   * Exercises {@code configureContainerEnvVars} on containers whose environment variable list
+   * is null, empty, and populated with entries that share names with the executor variables.
+   * Every assertion checks only that the container's resulting variables contain all of the
+   * expected ones.
+   */
+  @Test
+  public void testConfigureContainerEnvVars() {
+    final List<V1EnvVar> heronEnvVars =
+        Collections.unmodifiableList(STATEFUL_SET.getExecutorEnvVars());
+    // Variable with a name not used by the other inputs; it is expected in the result.
+    final V1EnvVar additionEnvVar = new V1EnvVar()
+        .name("env-variable-to-be-kept")
+        .valueFrom(new V1EnvVarSource()
+            .fieldRef(new V1ObjectFieldSelector()
+                .fieldPath("env-variable-was-kept")));
+    // Input variables: ENV_HOST and ENV_POD_NAME carry placeholder field paths that are not
+    // part of the expected result.
+    final List<V1EnvVar> inputEnvVars = Arrays.asList(
+        new V1EnvVar()
+            .name(KubernetesConstants.ENV_HOST)
+            .valueFrom(new V1EnvVarSource()
+                .fieldRef(new V1ObjectFieldSelector()
+                    .fieldPath("env-host-to-be-replaced"))),
+        new V1EnvVar()
+            .name(KubernetesConstants.ENV_POD_NAME)
+            .valueFrom(new V1EnvVarSource()
+                .fieldRef(new V1ObjectFieldSelector()
+                    .fieldPath("pod-name-to-be-replaced"))),
+        additionEnvVar
+    );
+
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    // Null env vars. This is the default case.
+    V1Container containerWithNullEnvVars = new V1ContainerBuilder().build();
+    STATEFUL_SET.configureContainerEnvVars(containerWithNullEnvVars);
+    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars should match",
+        CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(), heronEnvVars));
+
+    // Empty env vars.
+    V1Container containerWithEmptyEnvVars = new V1ContainerBuilder()
+        .withEnv(new LinkedList<>())
+        .build();
+    STATEFUL_SET.configureContainerEnvVars(containerWithEmptyEnvVars);
+    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env Vars should match",
+        CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(), heronEnvVars));
+
+    // Env Var overriding.
+    // Expected: the executor variables plus the additional variable from the input.
+    final List<V1EnvVar> expectedOverriding = new LinkedList<>(heronEnvVars);
+    expectedOverriding.add(additionEnvVar);
+    V1Container containerWithEnvVars = new V1ContainerBuilder()
+        .withEnv(inputEnvVars)
+        .build();
+    STATEFUL_SET.configureContainerEnvVars(containerWithEnvVars);
+    Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars should be overridden",
+        CollectionUtils.containsAll(containerWithEnvVars.getEnv(), expectedOverriding));
+  }
+
+  /**
+   * Exercises {@code configureContainerResources} on containers whose resources are null,
+   * empty, and pre-populated with custom limits, using request modes "NOT_SET" and
+   * "EQUAL_TO_LIMIT".
+   * Every assertion checks only that the resulting limits (or requests) contain all of the
+   * expected entries.
+   */
+  @Test
+  public void testConfigureContainerResources() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    final boolean isExecutor = true;
+
+    // resourceDefault is passed to every call below; resourceCustom only supplies the values
+    // pre-populated in the container's own limits.
+    final Resource resourceDefault = new Resource(
+        9, ByteAmount.fromMegabytes(19000), ByteAmount.fromMegabytes(99000));
+    final Resource resourceCustom = new Resource(
+        4, ByteAmount.fromMegabytes(34000), ByteAmount.fromMegabytes(400000));
+
+    final Quantity defaultRAM = Quantity.fromString(
+        KubernetesUtils.Megabytes(resourceDefault.getRam()));
+    final Quantity defaultCPU = Quantity.fromString(
+        Double.toString(KubernetesUtils.roundDecimal(resourceDefault.getCpu(), 3)));
+    final Quantity customRAM = Quantity.fromString(
+        KubernetesUtils.Megabytes(resourceCustom.getRam()));
+    final Quantity customCPU = Quantity.fromString(
+        Double.toString(KubernetesUtils.roundDecimal(resourceCustom.getCpu(), 3)));
+    final Quantity customDisk = Quantity.fromString(
+        KubernetesUtils.Megabytes(resourceCustom.getDisk()));
+
+    final Config configNoLimit = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "NOT_SET")
+        .build();
+    final Config configWithLimit = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT")
+        .build();
+
+    final V1ResourceRequirements expectDefaultRequirements = new V1ResourceRequirements()
+        .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
+        .putLimitsItem(KubernetesConstants.CPU, defaultCPU);
+
+    // Expected result for a container with custom limits: memory and CPU take the default
+    // values while the custom "disk" entry remains.
+    final V1ResourceRequirements expectCustomRequirements = new V1ResourceRequirements()
+        .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
+        .putLimitsItem(KubernetesConstants.CPU, defaultCPU)
+        .putLimitsItem("disk", customDisk);
+
+    // Limits pre-populated in the containers of the two "custom" cases below.
+    final V1ResourceRequirements customRequirements = new V1ResourceRequirements()
+        .putLimitsItem(KubernetesConstants.MEMORY, customRAM)
+        .putLimitsItem(KubernetesConstants.CPU, customCPU)
+        .putLimitsItem("disk", customDisk);
+
+    // Default. Null resources.
+    V1Container containerNull = new V1ContainerBuilder().build();
+    STATEFUL_SET.configureContainerResources(containerNull, configNoLimit, resourceDefault,
+        isExecutor);
+    Assert.assertTrue("Default LIMITS should be set in container with null LIMITS",
+        containerNull.getResources().getLimits().entrySet()
+            .containsAll(expectDefaultRequirements.getLimits().entrySet()));
+
+    // Empty resources.
+    // The assertion must inspect containerEmpty, the container configured in this case.
+    V1Container containerEmpty = new V1ContainerBuilder().withNewResources().endResources().build();
+    STATEFUL_SET.configureContainerResources(containerEmpty, configNoLimit, resourceDefault,
+        isExecutor);
+    Assert.assertTrue("Default LIMITS should be set in container with empty LIMITS",
+        containerEmpty.getResources().getLimits().entrySet()
+            .containsAll(expectDefaultRequirements.getLimits().entrySet()));
+
+    // Custom resources.
+    V1Container containerCustom = new V1ContainerBuilder()
+        .withResources(customRequirements)
+        .build();
+    STATEFUL_SET.configureContainerResources(containerCustom, configNoLimit, resourceDefault,
+        isExecutor);
+    Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS",
+        containerCustom.getResources().getLimits().entrySet()
+            .containsAll(expectCustomRequirements.getLimits().entrySet()));
+
+    // Custom resources with request.
+    // With "EQUAL_TO_LIMIT" the requests are compared against the expected limits.
+    V1Container containerRequests = new V1ContainerBuilder()
+        .withResources(customRequirements)
+        .build();
+    STATEFUL_SET.configureContainerResources(containerRequests, configWithLimit, resourceDefault,
+        isExecutor);
+    Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS and REQUEST",
+        containerRequests.getResources().getLimits().entrySet()
+            .containsAll(expectCustomRequirements.getLimits().entrySet()));
+    Assert.assertTrue("Custom REQUEST should be set in container with custom LIMITS and REQUEST",
+        containerRequests.getResources().getRequests().entrySet()
+            .containsAll(expectCustomRequirements.getLimits().entrySet()));
+  }
+
+  /**
+   * Exercises {@code configureContainerResources} on a container without resources when CPU
+   * and memory limits and requests are supplied through configuration keys.
+   * The expected container carries exactly the configured limit and request values, which
+   * differ from the values in the {@code Resource} passed to the call; the configured requests
+   * also differ from the configured limits although the request mode is "EQUAL_TO_LIMIT".
+   */
+  @Test
+  public void testConfigureContainerResourcesCLI() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    final boolean isExecutor = true;
+    final String customLimitMEMStr = "120Gi";
+    final String customLimitCPUStr = "5";
+    final String customRequestMEMStr = "100Mi";
+    final String customRequestCPUStr = "4";
+
+    // Values deliberately different from the configured limits and requests above.
+    final Resource resources = new Resource(
+        6, ByteAmount.fromMegabytes(34000), ByteAmount.fromGigabytes(400));
+
+    final Quantity customLimitMEM = Quantity.fromString(customLimitMEMStr);
+    final Quantity customLimitCPU = Quantity.fromString(customLimitCPUStr);
+    final Quantity customRequestMEM = Quantity.fromString(customRequestMEMStr);
+    final Quantity customRequestCPU = Quantity.fromString(customRequestCPUStr);
+
+    // Each key is the limits/requests prefix formatted with the executor name, followed by the
+    // resource name.
+    final Config config = Config.newBuilder()
+        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX
+                + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customLimitCPUStr)
+        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX
+                + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME), customLimitMEMStr)
+        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX
+                + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customRequestCPUStr)
+        .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX
+                + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME),
+            customRequestMEMStr)
+        .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT")
+        .build();
+
+    final V1Container expected = new V1ContainerBuilder()
+        .withNewResources()
+          .addToLimits(KubernetesConstants.CPU, customLimitCPU)
+          .addToLimits(KubernetesConstants.MEMORY, customLimitMEM)
+          .addToRequests(KubernetesConstants.CPU, customRequestCPU)
+          .addToRequests(KubernetesConstants.MEMORY, customRequestMEM)
+        .endResources()
+        .build();
+
+    final V1Container actual = new V1Container();
+    STATEFUL_SET.configureContainerResources(actual, config, resources, isExecutor);
+    Assert.assertEquals("Container Resources are set from CLI.", expected, actual);
+  }
+
+  /**
+   * Exercises {@code mountVolumeIfPresent} without and with a container volume mount name and
+   * path in the configuration, on containers whose volume mounts are null, empty, and
+   * populated with a mount that shares the configured name.
+   * NOTE(review): the assertions pass the expected list first, so they check that every mount
+   * in the container is an expected one; an empty result would also pass - confirm intent.
+   */
+  @Test
+  public void testMountVolumeIfPresent() {
+    final String pathDefault = "config-host-volume-path";
+    final String pathNameDefault = "config-host-volume-name";
+    final Config configWithVolumes = Config.newBuilder()
+        .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME, pathNameDefault)
+        .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH, pathDefault)
+        .build();
+    final StatefulSet.Configs configsWithVolumes = new StatefulSet.Configs(configWithVolumes,
+        RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
+    final StatefulSet.Configs configsWithNoVolumes = new StatefulSet.Configs(
+        Config.newBuilder().build(), RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
+    // Mount built from the configured name and path.
+    final V1VolumeMount volumeDefault = new V1VolumeMountBuilder()
+        .withName(pathNameDefault)
+        .withMountPath(pathDefault)
+        .build();
+    // Mount with a name that differs from the configured one.
+    final V1VolumeMount volumeCustom = new V1VolumeMountBuilder()
+        .withName("custom-volume-mount")
+        .withMountPath("should-be-kept")
+        .build();
+
+    final List<V1VolumeMount> expectedMountsDefault = Collections.singletonList(volumeDefault);
+    final List<V1VolumeMount> expectedMountsCustom = Arrays.asList(volumeCustom, volumeDefault);
+    // Input mounts: the first shares the configured name but uses a different path.
+    final List<V1VolumeMount> volumeMountsCustomList = Arrays.asList(
+        new V1VolumeMountBuilder()
+            .withName(pathNameDefault)
+            .withMountPath("should-be-replaced")
+            .build(),
+        volumeCustom
+    );
+
+    // No Volume Mounts set.
+    STATEFUL_SET.setClusterConfigs(configsWithNoVolumes);
+    V1Container containerNoSetMounts = new V1Container();
+    STATEFUL_SET.mountVolumeIfPresent(containerNoSetMounts);
+    Assert.assertNull(containerNoSetMounts.getVolumeMounts());
+
+    // Configure factory for volume mounts.
+    STATEFUL_SET.setClusterConfigs(configsWithVolumes);
+
+    // Default. Null Volume Mounts.
+    V1Container containerNull = new V1ContainerBuilder().build();
+    STATEFUL_SET.mountVolumeIfPresent(containerNull);
+    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with null VOLUME MOUNTS",
+        CollectionUtils.containsAll(expectedMountsDefault, containerNull.getVolumeMounts()));
+
+    // Empty Volume Mounts.
+    V1Container containerEmpty = new V1ContainerBuilder()
+        .withVolumeMounts(new LinkedList<>())
+        .build();
+    STATEFUL_SET.mountVolumeIfPresent(containerEmpty);
+    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with empty VOLUME MOUNTS",
+        CollectionUtils.containsAll(expectedMountsDefault, containerEmpty.getVolumeMounts()));
+
+    // Custom Volume Mounts.
+    V1Container containerCustom = new V1ContainerBuilder()
+        .withVolumeMounts(volumeMountsCustomList)
+        .build();
+    STATEFUL_SET.mountVolumeIfPresent(containerCustom);
+    Assert.assertTrue("Default VOLUME MOUNTS should be set in container with custom VOLUME MOUNTS",
+        CollectionUtils.containsAll(expectedMountsCustom, containerCustom.getVolumeMounts()));
+  }
+
+  /**
+   * Exercises {@code configureTolerations} on Pod Specs whose tolerations list is null, empty,
+   * and populated with entries that share keys with the default tolerations.
+   * Every assertion checks only that the Pod Spec's resulting tolerations contain all of the
+   * expected ones.
+   */
+  @Test
+  public void testConfigureTolerations() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    // Toleration with a key that is not taken from KubernetesConstants.TOLERATIONS.
+    final V1Toleration keptToleration = new V1Toleration()
+        .key("kept toleration")
+        .operator("Some Operator")
+        .effect("Some Effect")
+        .tolerationSeconds(5L);
+    final List<V1Toleration> expectedTolerationBase =
+        Collections.unmodifiableList(KubernetesShim.getTolerations());
+    // Input tolerations: the first two reuse keys from KubernetesConstants.TOLERATIONS with
+    // placeholder operator and effect values.
+    final List<V1Toleration> inputTolerationsBase = Collections.unmodifiableList(
+        Arrays.asList(
+            new V1Toleration()
+                .key(KubernetesConstants.TOLERATIONS.get(0)).operator("replace").effect("replace"),
+            new V1Toleration()
+                .key(KubernetesConstants.TOLERATIONS.get(1)).operator("replace").effect("replace"),
+            keptToleration
+        )
+    );
+
+    // Null Tolerations. This is the default case.
+    final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build();
+    STATEFUL_SET.configureTolerations(podSpecNullTolerations);
+    Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to Heron's defaults",
+        CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(),
+            expectedTolerationBase));
+
+    // Empty Tolerations.
+    final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder()
+        .withTolerations(new LinkedList<>())
+        .build();
+    STATEFUL_SET.configureTolerations(podSpecWithEmptyTolerations);
+    Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to Heron's defaults",
+        CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(),
+            expectedTolerationBase));
+
+    // Toleration overriding.
+    // Expected: the default tolerations plus the kept toleration from the input.
+    final V1PodSpec podSpecWithTolerations = new V1PodSpecBuilder()
+        .withTolerations(inputTolerationsBase)
+        .build();
+    final List<V1Toleration> expectedTolerationsOverriding =
+        new LinkedList<>(expectedTolerationBase);
+    expectedTolerationsOverriding.add(keptToleration);
+
+    STATEFUL_SET.configureTolerations(podSpecWithTolerations);
+    Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with Heron's defaults",
+        CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
+            expectedTolerationsOverriding));
+  }
+
+  /**
+   * Exercises {@code createPersistentVolumeClaims} with three volume configurations.
+   * The expected result holds a claim only for the two volumes whose claim name is a case
+   * variation of "OnDemand"; the volume with claim name "claim-name-two" has no expected claim.
+   * The volume configured without a storage class is expected to get an empty storage class
+   * name.
+   */
+  @Test
+  public void testCreatePersistentVolumeClaims() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    final String topologyName = "topology-name";
+    final String volumeNameOne = "volume-name-one";
+    final String volumeNameTwo = "volume-name-two";
+    final String volumeNameStatic = "volume-name-static";
+    final String claimNameOne = "OnDemand";
+    final String claimNameTwo = "claim-name-two";
+    final String claimNameStatic = "OnDEmaND";
+    final String storageClassName = "storage-class-name";
+    final String sizeLimit = "555Gi";
+    final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany";
+    final String accessModes = "ReadOnlyMany";
+    final String volumeMode = "VolumeMode";
+    final String path = "/path/to/mount/";
+    final String subPath = "/sub/path/to/mount/";
+    // Volume name -> options. The third entry omits the storage class name.
+    final Map<String, Map<VolumeConfigKeys, String>> mapPVCOpts =
+        ImmutableMap.of(
+            volumeNameOne, new HashMap<VolumeConfigKeys, String>() {
+              {
+                put(VolumeConfigKeys.claimName, claimNameOne);
+                put(VolumeConfigKeys.storageClassName, storageClassName);
+                put(VolumeConfigKeys.sizeLimit, sizeLimit);
+                put(VolumeConfigKeys.accessModes, accessModesList);
+                put(VolumeConfigKeys.volumeMode, volumeMode);
+                put(VolumeConfigKeys.path, path);
+              }
+            },
+            volumeNameTwo, new HashMap<VolumeConfigKeys, String>() {
+              {
+                put(VolumeConfigKeys.claimName, claimNameTwo);
+                put(VolumeConfigKeys.storageClassName, storageClassName);
+                put(VolumeConfigKeys.sizeLimit, sizeLimit);
+                put(VolumeConfigKeys.accessModes, accessModes);
+                put(VolumeConfigKeys.volumeMode, volumeMode);
+                put(VolumeConfigKeys.path, path);
+                put(VolumeConfigKeys.subPath, subPath);
+              }
+            },
+            volumeNameStatic, new HashMap<VolumeConfigKeys, String>() {
+              {
+                put(VolumeConfigKeys.claimName, claimNameStatic);
+                put(VolumeConfigKeys.sizeLimit, sizeLimit);
+                put(VolumeConfigKeys.accessModes, accessModes);
+                put(VolumeConfigKeys.volumeMode, volumeMode);
+                put(VolumeConfigKeys.path, path);
+                put(VolumeConfigKeys.subPath, subPath);
+              }
+            }
+        );
+
+    // Expected claim for the first volume; the comma separated access modes become a list.
+    final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameOne)
+          .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName(storageClassName)
+          .withAccessModes(Arrays.asList(accessModesList.split(",")))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
+        .build();
+
+    // Expected claim for the volume configured without a storage class name.
+    final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder()
+        .withNewMetadata()
+          .withName(volumeNameStatic)
+          .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
+        .endMetadata()
+        .withNewSpec()
+          .withStorageClassName("")
+          .withAccessModes(Collections.singletonList(accessModes))
+          .withVolumeMode(volumeMode)
+          .withNewResources()
+            .addToRequests("storage", new Quantity(sizeLimit))
+          .endResources()
+        .endSpec()
+        .build();
+
+    final List<V1PersistentVolumeClaim> expectedClaims =
+        new LinkedList<>(Arrays.asList(claimOne, claimStatic));
+
+    final List<V1PersistentVolumeClaim> actualClaims =
+        STATEFUL_SET.createPersistentVolumeClaims(mapPVCOpts);
+
+    // Equal sizes plus containment of every actual claim in the expected list.
+    Assert.assertEquals("Generated claim sizes match", expectedClaims.size(), actualClaims.size());
+    Assert.assertTrue(expectedClaims.containsAll(actualClaims));
+  }
+
+  /**
+   * Exercises {@code createVolumeAndMountsPersistentVolumeClaimCLI} with an empty map and with
+   * two volume configurations, comparing the Volumes and Volume Mounts it adds to the supplied
+   * lists against the expected ones.
+   * NOTE(review): the assertions check that every generated item is an expected one without
+   * comparing sizes, so an empty result would also pass - confirm intent.
+   */
+  @Test
+  public void testCreatePersistentVolumeClaimVolumesAndMounts() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    final String volumeNameOne = "VolumeNameONE";
+    final String volumeNameTwo = "VolumeNameTWO";
+    final String claimNameOne = "claim-name-one";
+    final String claimNameTwo = "OnDemand";
+    final String mountPathOne = "/mount/path/ONE";
+    final String mountPathTwo = "/mount/path/TWO";
+    final String mountSubPathTwo = "/mount/sub/path/TWO";
+    // Volume name -> options. Only the second volume has a sub path.
+    Map<String, Map<VolumeConfigKeys, String>> mapOfOpts =
+        ImmutableMap.of(
+            volumeNameOne, ImmutableMap.of(
+                VolumeConfigKeys.claimName, claimNameOne,
+                VolumeConfigKeys.path, mountPathOne),
+            volumeNameTwo, ImmutableMap.of(
+                VolumeConfigKeys.claimName, claimNameTwo,
+                VolumeConfigKeys.path, mountPathTwo,
+                VolumeConfigKeys.subPath, mountSubPathTwo)
+        );
+    final V1Volume volumeOne = new V1VolumeBuilder()
+        .withName(volumeNameOne)
+        .withNewPersistentVolumeClaim()
+          .withClaimName(claimNameOne)
+        .endPersistentVolumeClaim()
+        .build();
+    final V1Volume volumeTwo = new V1VolumeBuilder()
+        .withName(volumeNameTwo)
+        .withNewPersistentVolumeClaim()
+          .withClaimName(claimNameTwo)
+        .endPersistentVolumeClaim()
+        .build();
+    final V1VolumeMount volumeMountOne = new V1VolumeMountBuilder()
+        .withName(volumeNameOne)
+        .withMountPath(mountPathOne)
+        .build();
+    final V1VolumeMount volumeMountTwo = new V1VolumeMountBuilder()
+        .withName(volumeNameTwo)
+        .withMountPath(mountPathTwo)
+        .withSubPath(mountSubPathTwo)
+        .build();
+
+    // Test case container.
+    // Input: Map of Volume configurations.
+    // Output: The expected lists of Volumes and Volume Mounts.
+    final List<KubernetesUtils.TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
+          Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new LinkedList<>();
+
+    // Default case: No PVC provided.
+    testCases.add(new TestTuple<>("Generated an empty list of Volumes", new HashMap<>(),
+        new Pair<>(new LinkedList<>(), new LinkedList<>())));
+
+    // PVC Provided.
+    final Pair<List<V1Volume>, List<V1VolumeMount>> expectedFull =
+        new Pair<>(
+            new LinkedList<>(Arrays.asList(volumeOne, volumeTwo)),
+            new LinkedList<>(Arrays.asList(volumeMountOne, volumeMountTwo)));
+    testCases.add(new TestTuple<>("Generated a list of Volumes", mapOfOpts,
+        new Pair<>(expectedFull.first, expectedFull.second)));
+
+    // Testing loop.
+    // Fresh output lists are handed to the method under test for every case.
+    for (TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
+             Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) {
+      List<V1Volume> actualVolume = new LinkedList<>();
+      List<V1VolumeMount> actualVolumeMount = new LinkedList<>();
+      STATEFUL_SET.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
+          actualVolume, actualVolumeMount);
+
+      Assert.assertTrue(testCase.description,
+          (testCase.expected.first).containsAll(actualVolume));
+      Assert.assertTrue(testCase.description + " Mounts",
+          (testCase.expected.second).containsAll(actualVolumeMount));
+    }
+  }
+
+  /**
+   * Verifies that <code>configurePodWithVolumesAndMountsFromCLI</code> adds CLI supplied
+   * Volumes and Volume Mounts to an existing Pod Spec and executor container, modifying both
+   * in place. Three cases are covered: nothing to add, additions whose names do not clash with
+   * the existing entries, and additions that share a name with an existing entry, where the
+   * CLI supplied entry is expected to replace the original.
+   */
+  @Test
+  public void testConfigurePodWithVolumesAndMountsFromCLI() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    final String volumeNameClashing = "clashing-volume";
+    final String volumeMountNameClashing = "original-volume-mount";
+
+    // Entries present on the Pod Spec and container before the call.
+    V1Volume baseVolume = new V1VolumeBuilder()
+        .withName(volumeNameClashing)
+        .withNewPersistentVolumeClaim()
+        .withClaimName("Original Base Claim Name")
+        .endPersistentVolumeClaim()
+        .build();
+    V1VolumeMount baseVolumeMount = new V1VolumeMountBuilder()
+        .withName(volumeMountNameClashing)
+        .withMountPath("/original/mount/path")
+        .build();
+
+    // Entries which reuse the names of the base entries with different contents.
+    V1Volume clashingVolume = new V1VolumeBuilder()
+        .withName(volumeNameClashing)
+        .withNewPersistentVolumeClaim()
+        .withClaimName("Clashing Claim Replaced")
+        .endPersistentVolumeClaim()
+        .build();
+    V1VolumeMount clashingVolumeMount = new V1VolumeMountBuilder()
+        .withName(volumeMountNameClashing)
+        .withMountPath("/clashing/mount/path")
+        .build();
+
+    // Entries with names that are not used by the base entries.
+    V1Volume secondaryVolume = new V1VolumeBuilder()
+        .withName("secondary-volume")
+        .withNewPersistentVolumeClaim()
+        .withClaimName("Original Secondary Claim Name")
+        .endPersistentVolumeClaim()
+        .build();
+    V1VolumeMount secondaryVolumeMount = new V1VolumeMountBuilder()
+        .withName("secondary-volume-mount")
+        .withMountPath("/secondary/mount/path")
+        .build();
+
+    // Test case container.
+    // Input: [0] Pod Spec to modify, [1] Heron container to modify, [2] List of Volumes
+    // [3] List of Volume Mounts.
+    // Output: The expected <V1PodSpec> and <V1Container>.
+    final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases = new LinkedList<>();
+
+    // No Persistent Volume Claim.
+    final V1PodSpec podSpecEmptyCase = new V1PodSpecBuilder().withVolumes(baseVolume).build();
+    final V1Container executorEmptyCase =
+        new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
+    final V1PodSpec expectedEmptyPodSpec = new V1PodSpecBuilder().withVolumes(baseVolume).build();
+    final V1Container expectedEmptyExecutor =
+        new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
+
+    testCases.add(new TestTuple<>("Empty",
+        new Object[]{podSpecEmptyCase, executorEmptyCase, new LinkedList<>(), new LinkedList<>()},
+        new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor)));
+
+    // Non-clashing Persistent Volume Claim.
+    final V1PodSpec podSpecNoClashCase = new V1PodSpecBuilder()
+        .withVolumes(baseVolume)
+        .build();
+    final V1Container executorNoClashCase = new V1ContainerBuilder()
+        .withVolumeMounts(baseVolumeMount)
+        .build();
+    final V1PodSpec expectedNoClashPodSpec = new V1PodSpecBuilder()
+        .addToVolumes(baseVolume)
+        .addToVolumes(secondaryVolume)
+        .build();
+    final V1Container expectedNoClashExecutor = new V1ContainerBuilder()
+        .addToVolumeMounts(baseVolumeMount)
+        .addToVolumeMounts(secondaryVolumeMount)
+        .build();
+
+    testCases.add(new TestTuple<>("No Clash",
+        new Object[]{podSpecNoClashCase, executorNoClashCase,
+            Collections.singletonList(secondaryVolume),
+            Collections.singletonList(secondaryVolumeMount)},
+        new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor)));
+
+    // Clashing Persistent Volume Claim.
+    final V1PodSpec podSpecClashCase = new V1PodSpecBuilder()
+        .withVolumes(baseVolume)
+        .build();
+    final V1Container executorClashCase = new V1ContainerBuilder()
+        .withVolumeMounts(baseVolumeMount)
+        .build();
+    final V1PodSpec expectedClashPodSpec = new V1PodSpecBuilder()
+        .addToVolumes(clashingVolume)
+        .addToVolumes(secondaryVolume)
+        .build();
+    final V1Container expectedClashExecutor = new V1ContainerBuilder()
+        .addToVolumeMounts(clashingVolumeMount)
+        .addToVolumeMounts(secondaryVolumeMount)
+        .build();
+
+    testCases.add(new TestTuple<>("Clashing",
+        new Object[]{podSpecClashCase, executorClashCase,
+            Arrays.asList(clashingVolume, secondaryVolume),
+            Arrays.asList(clashingVolumeMount, secondaryVolumeMount)},
+        new Pair<>(expectedClashPodSpec, expectedClashExecutor)));
+
+    // Testing loop.
+    for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : testCases) {
+      final V1PodSpec actualPodSpec = (V1PodSpec) testCase.input[0];
+      final V1Container actualExecutor = (V1Container) testCase.input[1];
+
+      // The <Object[]> carrier erases the element types. Every case above stores either an
+      // empty list or a list of exactly these element types at indices 2 and 3.
+      @SuppressWarnings("unchecked")
+      final List<V1Volume> volumes = (List<V1Volume>) testCase.input[2];
+      @SuppressWarnings("unchecked")
+      final List<V1VolumeMount> volumeMounts = (List<V1VolumeMount>) testCase.input[3];
+
+      STATEFUL_SET.configurePodWithVolumesAndMountsFromCLI(
+          actualPodSpec, actualExecutor, volumes, volumeMounts);
+
+      // JUnit's argument order is (message, expected, actual); keeping to it makes a failure
+      // report name the right side as the expectation.
+      Assert.assertEquals("Pod Specs match " + testCase.description,
+          testCase.expected.first, actualPodSpec);
+      Assert.assertEquals("Executors match " + testCase.description,
+          testCase.expected.second, actualExecutor);
+    }
+  }
+
   @Test
-  public void testEmpty() {
-    Assert.assertTrue(true);
+  public void testSetShardIdEnvironmentVariableCommand() {
+    // Install the shared cluster configuration on the class under test.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    // Going by the case descriptions, <true> selects the Executor command, which adds one to
+    // the Pod name suffix found after the last dash, and <false> selects the Manager command,
+    // which uses that suffix unchanged.
+    final String executorCommand =
+        "SHARD_ID=$((${POD_NAME##*-} + 1)) && echo shardId=${SHARD_ID}";
+    final String managerCommand = "SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}";
+
+    final List<TestTuple<Boolean, String>> scenarios = new LinkedList<>();
+    scenarios.add(new TestTuple<>("Executor command is set correctly", true, executorCommand));
+    scenarios.add(new TestTuple<>("Manager command is set correctly", false, managerCommand));
+
+    for (TestTuple<Boolean, String> scenario : scenarios) {
+      final String actualCommand =
+          STATEFUL_SET.setShardIdEnvironmentVariableCommand(scenario.input);
+      Assert.assertEquals(scenario.description, scenario.expected, actualCommand);
+    }
+  }
+
+  /**
+   * Checks that <code>createResourcesRequirement</code> turns String limits keyed by
+   * <code>KubernetesConstants.MEMORY</code> and <code>KubernetesConstants.CPU</code> into
+   * <code>Quantity</code> values. An entry under any other key is expected to be left out of
+   * the result.
+   */
+  @Test
+  public void testCreateResourcesRequirement() {
+    // Install the shared cluster configuration on the class under test.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    final String managerCpuLimit = "3000m";
+    final String managerMemLimit = "256Gi";
+    final Quantity cpu = Quantity.fromString(managerCpuLimit);
+    final Quantity memory = Quantity.fromString(managerMemLimit);
+
+    // Each scenario pairs the raw String limits with the Quantities expected back.
+    final List<TestTuple<Map<String, String>, Map<String, Quantity>>> scenarios =
+        new LinkedList<>();
+
+    // Nothing in, nothing out.
+    final Map<String, String> noLimits = new HashMap<>();
+    final Map<String, Quantity> noQuantities = new HashMap<>();
+    scenarios.add(new TestTuple<>("Empty input.", noLimits, noQuantities));
+
+    // Memory limit on its own.
+    final Map<String, String> memoryLimit = new HashMap<>();
+    memoryLimit.put(KubernetesConstants.MEMORY, managerMemLimit);
+    final Map<String, Quantity> memoryQuantity = new HashMap<>();
+    memoryQuantity.put(KubernetesConstants.MEMORY, memory);
+    scenarios.add(new TestTuple<>("Only memory input.", memoryLimit, memoryQuantity));
+
+    // CPU limit on its own.
+    final Map<String, String> cpuLimit = new HashMap<>();
+    cpuLimit.put(KubernetesConstants.CPU, managerCpuLimit);
+    final Map<String, Quantity> cpuQuantity = new HashMap<>();
+    cpuQuantity.put(KubernetesConstants.CPU, cpu);
+    scenarios.add(new TestTuple<>("Only CPU input.", cpuLimit, cpuQuantity));
+
+    // Both limits together.
+    final Map<String, String> bothLimits = new HashMap<>();
+    bothLimits.put(KubernetesConstants.MEMORY, managerMemLimit);
+    bothLimits.put(KubernetesConstants.CPU, managerCpuLimit);
+    final Map<String, Quantity> bothQuantities = new HashMap<>();
+    bothQuantities.put(KubernetesConstants.MEMORY, memory);
+    bothQuantities.put(KubernetesConstants.CPU, cpu);
+    scenarios.add(new TestTuple<>("Memory and CPU input.", bothLimits, bothQuantities));
+
+    // An unrecognised key next to a valid CPU limit: only the CPU entry is expected back.
+    final Map<String, String> limitsWithUnknownKey = new HashMap<>();
+    limitsWithUnknownKey.put("invalid input", "will not be ignored");
+    limitsWithUnknownKey.put(KubernetesConstants.CPU, managerCpuLimit);
+    final Map<String, Quantity> quantitiesWithoutUnknownKey = new HashMap<>();
+    quantitiesWithoutUnknownKey.put(KubernetesConstants.CPU, cpu);
+    scenarios.add(new TestTuple<>("Invalid input.",
+        limitsWithUnknownKey, quantitiesWithoutUnknownKey));
+
+    // Run every scenario through the method under test.
+    for (TestTuple<Map<String, String>, Map<String, Quantity>> scenario : scenarios) {
+      Assert.assertEquals(scenario.description, scenario.expected,
+          STATEFUL_SET.createResourcesRequirement(scenario.input));
+    }
+  }
+
+  /**
+   * Verifies that <code>createVolumeAndMountsEmptyDirCLI</code> converts the CLI options of a
+   * single Empty Directory volume into the matching <code>V1Volume</code> and
+   * <code>V1VolumeMount</code>, appending them to the lists passed in.
+   */
+  @Test
+  public void testCreateVolumeAndMountsEmptyDirCLI() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    final String volumeName = "volume-name-empty-dir";
+    final String medium = "Memory";
+    final String sizeLimit = "1Gi";
+    final String path = "/path/to/mount";
+    final String subPath = "/sub/path/to/mount";
+
+    // Empty Dir. The options reuse the constants above so that the input and the expected
+    // objects cannot drift apart.
+    final Map<VolumeConfigKeys, String> volumeOptions = new HashMap<>();
+    volumeOptions.put(VolumeConfigKeys.sizeLimit, sizeLimit);
+    volumeOptions.put(VolumeConfigKeys.medium, medium);
+    volumeOptions.put(VolumeConfigKeys.path, path);
+    volumeOptions.put(VolumeConfigKeys.subPath, subPath);
+    final Map<String, Map<VolumeConfigKeys, String>> config =
+        ImmutableMap.of(volumeName, volumeOptions);
+
+    final List<V1Volume> expectedVolumes = Collections.singletonList(
+        new V1VolumeBuilder()
+            .withName(volumeName)
+            .withNewEmptyDir()
+              .withMedium(medium)
+              .withNewSizeLimit(sizeLimit)
+            .endEmptyDir()
+            .build()
+    );
+    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+        new V1VolumeMountBuilder()
+            .withName(volumeName)
+              .withMountPath(path)
+              .withSubPath(subPath)
+            .build()
+    );
+
+    // The method under test reports its results through these two output lists.
+    List<V1Volume> actualVolumes = new LinkedList<>();
+    List<V1VolumeMount> actualMounts = new LinkedList<>();
+    STATEFUL_SET.createVolumeAndMountsEmptyDirCLI(config, actualVolumes, actualMounts);
+    Assert.assertEquals("Empty Dir Volume populated", expectedVolumes, actualVolumes);
+    Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts, actualMounts);
+  }
+
+  /**
+   * Checks that the CLI options of a single Host Path volume are converted by
+   * <code>createVolumeAndMountsHostPathCLI</code> into the matching <code>V1Volume</code> and
+   * <code>V1VolumeMount</code>.
+   */
+  @Test
+  public void testCreateVolumeAndMountsHostPathCLI() {
+    // Install the shared cluster configuration on the class under test.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    final String volumeName = "volume-name-host-path";
+    final String type = "DirectoryOrCreate";
+    final String pathOnHost = "path.on.host";
+    final String path = "/path/to/mount";
+    final String subPath = "/sub/path/to/mount";
+
+    // CLI options describing the one Host Path volume, keyed by the volume name.
+    final Map<VolumeConfigKeys, String> hostPathOptions = new HashMap<>();
+    hostPathOptions.put(VolumeConfigKeys.type, type);
+    hostPathOptions.put(VolumeConfigKeys.pathOnHost, pathOnHost);
+    hostPathOptions.put(VolumeConfigKeys.path, path);
+    hostPathOptions.put(VolumeConfigKeys.subPath, subPath);
+    final Map<String, Map<VolumeConfigKeys, String>> cliVolumes =
+        ImmutableMap.of(volumeName, hostPathOptions);
+
+    // The Kubernetes objects those options should produce.
+    final V1Volume wantedVolume = new V1VolumeBuilder()
+        .withName(volumeName)
+        .withNewHostPath()
+          .withNewType(type)
+          .withNewPath(pathOnHost)
+        .endHostPath()
+        .build();
+    final V1VolumeMount wantedMount = new V1VolumeMountBuilder()
+        .withName(volumeName)
+        .withMountPath(path)
+        .withSubPath(subPath)
+        .build();
+
+    // Results are collected in the two lists handed to the method under test.
+    final List<V1Volume> producedVolumes = new LinkedList<>();
+    final List<V1VolumeMount> producedMounts = new LinkedList<>();
+    STATEFUL_SET.createVolumeAndMountsHostPathCLI(cliVolumes, producedVolumes, producedMounts);
+
+    Assert.assertEquals("Host Path Volume populated",
+        Collections.singletonList(wantedVolume), producedVolumes);
+    Assert.assertEquals("Host Path Volume Mount populated",
+        Collections.singletonList(wantedMount), producedMounts);
+  }
+
+  /**
+   * Verifies that <code>createVolumeAndMountsNFSCLI</code> converts the CLI options of a single
+   * NFS volume into the matching <code>V1Volume</code> and <code>V1VolumeMount</code>. The
+   * read-only option is expected to appear on both the Volume and the Volume Mount.
+   */
+  @Test
+  public void testCreateVolumeAndMountsNFSCLI() {
+    // Load configurations into test class.
+    STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+    final String volumeName = "volume-name-nfs";
+    final String server = "nfs.server.address";
+    final String pathOnNFS = "path.on.host";
+    final String readOnly = "true";
+    final String path = "/path/to/mount";
+    final String subPath = "/sub/path/to/mount";
+
+    // Parsed once so that the expected Volume and the expected Volume Mount always agree with
+    // the <readOnly> option fed to the method under test.
+    final boolean isReadOnly = Boolean.parseBoolean(readOnly);
+
+    // NFS.
+    final Map<String, Map<VolumeConfigKeys, String>> config =
+        ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+          {
+            put(VolumeConfigKeys.server, server);
+            put(VolumeConfigKeys.readOnly, readOnly);
+            put(VolumeConfigKeys.pathOnNFS, pathOnNFS);
+            put(VolumeConfigKeys.path, path);
+            put(VolumeConfigKeys.subPath, subPath);
+          }
+        });
+    final List<V1Volume> expectedVolumes = Collections.singletonList(
+        new V1VolumeBuilder()
+            .withName(volumeName)
+            .withNewNfs()
+              .withServer(server)
+              .withPath(pathOnNFS)
+              .withReadOnly(isReadOnly)
+            .endNfs()
+            .build()
+    );
+    final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+        new V1VolumeMountBuilder()
+            .withName(volumeName)
+            .withMountPath(path)
+            .withSubPath(subPath)
+            .withReadOnly(isReadOnly)
+            .build()
+    );
+
+    // The method under test reports its results through these two output lists.
+    List<V1Volume> actualVolumes = new LinkedList<>();
+    List<V1VolumeMount> actualMounts = new LinkedList<>();
+    STATEFUL_SET.createVolumeAndMountsNFSCLI(config, actualVolumes, actualMounts);
+    Assert.assertEquals("NFS Volume populated", expectedVolumes, actualVolumes);
+    Assert.assertEquals("NFS Volume Mount populated", expectedMounts, actualMounts);
   }
 }