You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/07/20 23:58:11 UTC
[incubator-heron] 03/03: [Tests] Shim to Stateful Set test migration.
This is an automated email from the ASF dual-hosted git repository.
saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
commit 23ded81598237d687be0b9301b307af96ef0c825
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Wed Jul 20 19:57:56 2022 -0400
[Tests] Shim to Stateful Set test migration.
---
.../scheduler/kubernetes/StatefulSetTest.java | 966 ++++++++++++++++++++-
1 file changed, 964 insertions(+), 2 deletions(-)
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
index 746823f21ea..10f48845b5f 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
@@ -19,12 +19,974 @@
package org.apache.heron.scheduler.kubernetes;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.ImmutableMap;
+
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.heron.common.basics.ByteAmount;
+import org.apache.heron.common.basics.Pair;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.common.Key;
+import org.apache.heron.spi.packing.Resource;
+
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1ContainerBuilder;
+import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1ContainerPortBuilder;
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpecBuilder;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1Toleration;
+import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeBuilder;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
+import static org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
+import static org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
+
+@RunWith(MockitoJUnitRunner.class)
public class StatefulSetTest {
+ private static final String TOPOLOGY_NAME = "topology-name";
+ private static final String CONFIGMAP_NAME = "CONFIG-MAP-NAME";
+ private static final String POD_TEMPLATE_NAME = "POD-TEMPLATE-NAME";
+ private static final String CONFIGMAP_POD_TEMPLATE_NAME =
+ String.format("%s.%s", CONFIGMAP_NAME, POD_TEMPLATE_NAME);
+ private static final String POD_TEMPLATE_LOCATION_EXECUTOR =
+ String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
+ KubernetesConstants.EXECUTOR_NAME);
+ private static final String POD_TEMPLATE_LOCATION_MANAGER =
+ String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
+ KubernetesConstants.MANAGER_NAME);
+ private static final Config CONFIG_WITH_POD_TEMPLATE = Config.newBuilder()
+ .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
+ .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
+ .build();
+ private static final Config RUNTIME = Config.newBuilder()
+ .put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME)
+ .build();
+ private static final StatefulSet STATEFUL_SET = new StatefulSet();
+ private static final V1PodTemplateSpec POD_TEMPLATE_SPEC = new V1PodTemplateSpecBuilder()
+ .withNewMetadata()
+ .withLabels(Collections.singletonMap("app", "heron-tracker"))
+ .endMetadata()
+ .withNewSpec()
+ .withContainers(new V1ContainerBuilder()
+ .withName("heron-tracker")
+ .withImage("apache/heron:latest")
+ .withPorts(new V1ContainerPortBuilder()
+ .withName("api-port")
+ .withContainerPort(8888)
+ .build())
+ .withNewResources()
+ .withRequests(Collections.singletonMap("100m", new Quantity("200M")))
+ .withLimits(Collections.singletonMap("400m", new Quantity("512M")))
+ .endResources()
+ .build())
+ .endSpec()
+ .build();
+ private static final StatefulSet.Configs CLUSTER_CONFIGS =
+ new StatefulSet.Configs(CONFIG_WITH_POD_TEMPLATE, RUNTIME,
+ POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
+
+
+ @Test
+ public void testConfigureContainerPorts() {
+ final String portNamekept = "random-port-to-be-kept";
+ final int portNumberkept = 1111;
+ final int numInstances = 3;
+ final List<V1ContainerPort> expectedPortsBase =
+ Collections.unmodifiableList(StatefulSet.getExecutorPorts());
+ final List<V1ContainerPort> debugPorts =
+ Collections.unmodifiableList(StatefulSet.getDebuggingPorts(numInstances));
+ final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList(
+ Arrays.asList(
+ new V1ContainerPort()
+ .name("server-port-to-replace").containerPort(KubernetesConstants.SERVER_PORT),
+ new V1ContainerPort()
+ .name("shell-port-to-replace").containerPort(KubernetesConstants.SHELL_PORT),
+ new V1ContainerPort().name(portNamekept).containerPort(portNumberkept)
+ )
+ );
+
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ // Null ports. This is the default case.
+ final V1Container inputContainerWithNullPorts = new V1ContainerBuilder().build();
+ STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithNullPorts);
+ Assert.assertTrue("Server and/or shell PORTS for container with null ports list",
+ CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(), expectedPortsBase));
+
+ // Empty ports.
+ final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder()
+ .withPorts(new LinkedList<>())
+ .build();
+ STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithEmptyPorts);
+ Assert.assertTrue("Server and/or shell PORTS for container with empty ports list",
+ CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(), expectedPortsBase));
+
+ // Port overriding.
+ final List<V1ContainerPort> inputPorts = new LinkedList<>(inputPortsBase);
+ final V1Container inputContainerWithPorts = new V1ContainerBuilder()
+ .withPorts(inputPorts)
+ .build();
+ final List<V1ContainerPort> expectedPortsOverriding = new LinkedList<>(expectedPortsBase);
+ expectedPortsOverriding
+ .add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
+
+ STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithPorts);
+ Assert.assertTrue("Server and/or shell PORTS for container should be overwritten.",
+ CollectionUtils.containsAll(inputContainerWithPorts.getPorts(), expectedPortsOverriding));
+
+ // Port overriding with debug ports.
+ final List<V1ContainerPort> inputPortsWithDebug = new LinkedList<>(debugPorts);
+ inputPortsWithDebug.addAll(inputPortsBase);
+ final V1Container inputContainerWithDebug = new V1ContainerBuilder()
+ .withPorts(inputPortsWithDebug)
+ .build();
+ final List<V1ContainerPort> expectedPortsDebug = new LinkedList<>(expectedPortsBase);
+ expectedPortsDebug.add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
+ expectedPortsDebug.addAll(debugPorts);
+
+ STATEFUL_SET.configureContainerPorts(true, numInstances, inputContainerWithDebug);
+ Assert.assertTrue("Server and/or shell with debug PORTS for container should be overwritten.",
+ CollectionUtils.containsAll(inputContainerWithDebug.getPorts(), expectedPortsDebug));
+ }
+
+ @Test
+ public void testConfigureContainerEnvVars() {
+ final List<V1EnvVar> heronEnvVars =
+ Collections.unmodifiableList(STATEFUL_SET.getExecutorEnvVars());
+ final V1EnvVar additionEnvVar = new V1EnvVar()
+ .name("env-variable-to-be-kept")
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("env-variable-was-kept")));
+ final List<V1EnvVar> inputEnvVars = Arrays.asList(
+ new V1EnvVar()
+ .name(KubernetesConstants.ENV_HOST)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("env-host-to-be-replaced"))),
+ new V1EnvVar()
+ .name(KubernetesConstants.ENV_POD_NAME)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath("pod-name-to-be-replaced"))),
+ additionEnvVar
+ );
+
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ // Null env vars. This is the default case.
+ V1Container containerWithNullEnvVars = new V1ContainerBuilder().build();
+ STATEFUL_SET.configureContainerEnvVars(containerWithNullEnvVars);
+ Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars should match",
+ CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(), heronEnvVars));
+
+ // Empty env vars.
+ V1Container containerWithEmptyEnvVars = new V1ContainerBuilder()
+ .withEnv(new LinkedList<>())
+ .build();
+ STATEFUL_SET.configureContainerEnvVars(containerWithEmptyEnvVars);
+ Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env Vars should match",
+ CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(), heronEnvVars));
+
+ // Env Var overriding.
+ final List<V1EnvVar> expectedOverriding = new LinkedList<>(heronEnvVars);
+ expectedOverriding.add(additionEnvVar);
+ V1Container containerWithEnvVars = new V1ContainerBuilder()
+ .withEnv(inputEnvVars)
+ .build();
+ STATEFUL_SET.configureContainerEnvVars(containerWithEnvVars);
+ Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars should be overridden",
+ CollectionUtils.containsAll(containerWithEnvVars.getEnv(), expectedOverriding));
+ }
+
+ @Test
+ public void testConfigureContainerResources() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final boolean isExecutor = true;
+
+ final Resource resourceDefault = new Resource(
+ 9, ByteAmount.fromMegabytes(19000), ByteAmount.fromMegabytes(99000));
+ final Resource resourceCustom = new Resource(
+ 4, ByteAmount.fromMegabytes(34000), ByteAmount.fromMegabytes(400000));
+
+ final Quantity defaultRAM = Quantity.fromString(
+ KubernetesUtils.Megabytes(resourceDefault.getRam()));
+ final Quantity defaultCPU = Quantity.fromString(
+ Double.toString(KubernetesUtils.roundDecimal(resourceDefault.getCpu(), 3)));
+ final Quantity customRAM = Quantity.fromString(
+ KubernetesUtils.Megabytes(resourceCustom.getRam()));
+ final Quantity customCPU = Quantity.fromString(
+ Double.toString(KubernetesUtils.roundDecimal(resourceCustom.getCpu(), 3)));
+ final Quantity customDisk = Quantity.fromString(
+ KubernetesUtils.Megabytes(resourceCustom.getDisk()));
+
+ final Config configNoLimit = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "NOT_SET")
+ .build();
+ final Config configWithLimit = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT")
+ .build();
+
+ final V1ResourceRequirements expectDefaultRequirements = new V1ResourceRequirements()
+ .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
+ .putLimitsItem(KubernetesConstants.CPU, defaultCPU);
+
+ final V1ResourceRequirements expectCustomRequirements = new V1ResourceRequirements()
+ .putLimitsItem(KubernetesConstants.MEMORY, defaultRAM)
+ .putLimitsItem(KubernetesConstants.CPU, defaultCPU)
+ .putLimitsItem("disk", customDisk);
+
+ final V1ResourceRequirements customRequirements = new V1ResourceRequirements()
+ .putLimitsItem(KubernetesConstants.MEMORY, customRAM)
+ .putLimitsItem(KubernetesConstants.CPU, customCPU)
+ .putLimitsItem("disk", customDisk);
+
+ // Default. Null resources.
+ V1Container containerNull = new V1ContainerBuilder().build();
+ STATEFUL_SET.configureContainerResources(containerNull, configNoLimit, resourceDefault,
+ isExecutor);
+ Assert.assertTrue("Default LIMITS should be set in container with null LIMITS",
+ containerNull.getResources().getLimits().entrySet()
+ .containsAll(expectDefaultRequirements.getLimits().entrySet()));
+
+ // Empty resources.
+ V1Container containerEmpty = new V1ContainerBuilder().withNewResources().endResources().build();
+ STATEFUL_SET.configureContainerResources(containerEmpty, configNoLimit, resourceDefault,
+ isExecutor);
+ Assert.assertTrue("Default LIMITS should be set in container with empty LIMITS",
+ containerNull.getResources().getLimits().entrySet()
+ .containsAll(expectDefaultRequirements.getLimits().entrySet()));
+
+ // Custom resources.
+ V1Container containerCustom = new V1ContainerBuilder()
+ .withResources(customRequirements)
+ .build();
+ STATEFUL_SET.configureContainerResources(containerCustom, configNoLimit, resourceDefault,
+ isExecutor);
+ Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS",
+ containerCustom.getResources().getLimits().entrySet()
+ .containsAll(expectCustomRequirements.getLimits().entrySet()));
+
+ // Custom resources with request.
+ V1Container containerRequests = new V1ContainerBuilder()
+ .withResources(customRequirements)
+ .build();
+ STATEFUL_SET.configureContainerResources(containerRequests, configWithLimit, resourceDefault,
+ isExecutor);
+ Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS and REQUEST",
+ containerRequests.getResources().getLimits().entrySet()
+ .containsAll(expectCustomRequirements.getLimits().entrySet()));
+ Assert.assertTrue("Custom REQUEST should be set in container with custom LIMITS and REQUEST",
+ containerRequests.getResources().getRequests().entrySet()
+ .containsAll(expectCustomRequirements.getLimits().entrySet()));
+ }
+
+ @Test
+ public void testConfigureContainerResourcesCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final boolean isExecutor = true;
+ final String customLimitMEMStr = "120Gi";
+ final String customLimitCPUStr = "5";
+ final String customRequestMEMStr = "100Mi";
+ final String customRequestCPUStr = "4";
+
+ final Resource resources = new Resource(
+ 6, ByteAmount.fromMegabytes(34000), ByteAmount.fromGigabytes(400));
+
+ final Quantity customLimitMEM = Quantity.fromString(customLimitMEMStr);
+ final Quantity customLimitCPU = Quantity.fromString(customLimitCPUStr);
+ final Quantity customRequestMEM = Quantity.fromString(customRequestMEMStr);
+ final Quantity customRequestCPU = Quantity.fromString(customRequestCPUStr);
+
+ final Config config = Config.newBuilder()
+ .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX
+ + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customLimitCPUStr)
+ .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_LIMITS_PREFIX
+ + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME), customLimitMEMStr)
+ .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX
+ + KubernetesConstants.CPU, KubernetesConstants.EXECUTOR_NAME), customRequestCPUStr)
+ .put(String.format(KubernetesContext.KUBERNETES_RESOURCE_REQUESTS_PREFIX
+ + KubernetesConstants.MEMORY, KubernetesConstants.EXECUTOR_NAME),
+ customRequestMEMStr)
+ .put(KubernetesContext.KUBERNETES_RESOURCE_REQUEST_MODE, "EQUAL_TO_LIMIT")
+ .build();
+
+ final V1Container expected = new V1ContainerBuilder()
+ .withNewResources()
+ .addToLimits(KubernetesConstants.CPU, customLimitCPU)
+ .addToLimits(KubernetesConstants.MEMORY, customLimitMEM)
+ .addToRequests(KubernetesConstants.CPU, customRequestCPU)
+ .addToRequests(KubernetesConstants.MEMORY, customRequestMEM)
+ .endResources()
+ .build();
+
+ final V1Container actual = new V1Container();
+ STATEFUL_SET.configureContainerResources(actual, config, resources, isExecutor);
+ Assert.assertEquals("Container Resources are set from CLI.", expected, actual);
+ }
+
+ @Test
+ public void testMountVolumeIfPresent() {
+ final String pathDefault = "config-host-volume-path";
+ final String pathNameDefault = "config-host-volume-name";
+ final Config configWithVolumes = Config.newBuilder()
+ .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME, pathNameDefault)
+ .put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH, pathDefault)
+ .build();
+ final StatefulSet.Configs configsWithVolumes = new StatefulSet.Configs(configWithVolumes,
+ RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
+ final StatefulSet.Configs configsWithNoVolumes = new StatefulSet.Configs(
+ Config.newBuilder().build(), RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
+ final V1VolumeMount volumeDefault = new V1VolumeMountBuilder()
+ .withName(pathNameDefault)
+ .withMountPath(pathDefault)
+ .build();
+ final V1VolumeMount volumeCustom = new V1VolumeMountBuilder()
+ .withName("custom-volume-mount")
+ .withMountPath("should-be-kept")
+ .build();
+
+ final List<V1VolumeMount> expectedMountsDefault = Collections.singletonList(volumeDefault);
+ final List<V1VolumeMount> expectedMountsCustom = Arrays.asList(volumeCustom, volumeDefault);
+ final List<V1VolumeMount> volumeMountsCustomList = Arrays.asList(
+ new V1VolumeMountBuilder()
+ .withName(pathNameDefault)
+ .withMountPath("should-be-replaced")
+ .build(),
+ volumeCustom
+ );
+
+ // No Volume Mounts set.
+ STATEFUL_SET.setClusterConfigs(configsWithNoVolumes);
+ V1Container containerNoSetMounts = new V1Container();
+ STATEFUL_SET.mountVolumeIfPresent(containerNoSetMounts);
+ Assert.assertNull(containerNoSetMounts.getVolumeMounts());
+
+ // Configure factory for volume mounts.
+ STATEFUL_SET.setClusterConfigs(configsWithVolumes);
+
+ // Default. Null Volume Mounts.
+ V1Container containerNull = new V1ContainerBuilder().build();
+ STATEFUL_SET.mountVolumeIfPresent(containerNull);
+ Assert.assertTrue("Default VOLUME MOUNTS should be set in container with null VOLUME MOUNTS",
+ CollectionUtils.containsAll(expectedMountsDefault, containerNull.getVolumeMounts()));
+
+ // Empty Volume Mounts.
+ V1Container containerEmpty = new V1ContainerBuilder()
+ .withVolumeMounts(new LinkedList<>())
+ .build();
+ STATEFUL_SET.mountVolumeIfPresent(containerEmpty);
+ Assert.assertTrue("Default VOLUME MOUNTS should be set in container with empty VOLUME MOUNTS",
+ CollectionUtils.containsAll(expectedMountsDefault, containerEmpty.getVolumeMounts()));
+
+ // Custom Volume Mounts.
+ V1Container containerCustom = new V1ContainerBuilder()
+ .withVolumeMounts(volumeMountsCustomList)
+ .build();
+ STATEFUL_SET.mountVolumeIfPresent(containerCustom);
+ Assert.assertTrue("Default VOLUME MOUNTS should be set in container with custom VOLUME MOUNTS",
+ CollectionUtils.containsAll(expectedMountsCustom, containerCustom.getVolumeMounts()));
+ }
+
+ @Test
+ public void testConfigureTolerations() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final V1Toleration keptToleration = new V1Toleration()
+ .key("kept toleration")
+ .operator("Some Operator")
+ .effect("Some Effect")
+ .tolerationSeconds(5L);
+ final List<V1Toleration> expectedTolerationBase =
+ Collections.unmodifiableList(KubernetesShim.getTolerations());
+ final List<V1Toleration> inputTolerationsBase = Collections.unmodifiableList(
+ Arrays.asList(
+ new V1Toleration()
+ .key(KubernetesConstants.TOLERATIONS.get(0)).operator("replace").effect("replace"),
+ new V1Toleration()
+ .key(KubernetesConstants.TOLERATIONS.get(1)).operator("replace").effect("replace"),
+ keptToleration
+ )
+ );
+
+ // Null Tolerations. This is the default case.
+ final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build();
+ STATEFUL_SET.configureTolerations(podSpecNullTolerations);
+ Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to Heron's defaults",
+ CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(),
+ expectedTolerationBase));
+
+ // Empty Tolerations.
+ final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder()
+ .withTolerations(new LinkedList<>())
+ .build();
+ STATEFUL_SET.configureTolerations(podSpecWithEmptyTolerations);
+ Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to Heron's defaults",
+ CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(),
+ expectedTolerationBase));
+
+ // Toleration overriding.
+ final V1PodSpec podSpecWithTolerations = new V1PodSpecBuilder()
+ .withTolerations(inputTolerationsBase)
+ .build();
+ final List<V1Toleration> expectedTolerationsOverriding =
+ new LinkedList<>(expectedTolerationBase);
+ expectedTolerationsOverriding.add(keptToleration);
+
+ STATEFUL_SET.configureTolerations(podSpecWithTolerations);
+ Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with Heron's defaults",
+ CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
+ expectedTolerationsOverriding));
+ }
+
+ @Test
+ public void testCreatePersistentVolumeClaims() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final String topologyName = "topology-name";
+ final String volumeNameOne = "volume-name-one";
+ final String volumeNameTwo = "volume-name-two";
+ final String volumeNameStatic = "volume-name-static";
+ final String claimNameOne = "OnDemand";
+ final String claimNameTwo = "claim-name-two";
+ final String claimNameStatic = "OnDEmaND";
+ final String storageClassName = "storage-class-name";
+ final String sizeLimit = "555Gi";
+ final String accessModesList = "ReadWriteOnce,ReadOnlyMany,ReadWriteMany";
+ final String accessModes = "ReadOnlyMany";
+ final String volumeMode = "VolumeMode";
+ final String path = "/path/to/mount/";
+ final String subPath = "/sub/path/to/mount/";
+ final Map<String, Map<VolumeConfigKeys, String>> mapPVCOpts =
+ ImmutableMap.of(
+ volumeNameOne, new HashMap<VolumeConfigKeys, String>() {
+ {
+ put(VolumeConfigKeys.claimName, claimNameOne);
+ put(VolumeConfigKeys.storageClassName, storageClassName);
+ put(VolumeConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeConfigKeys.accessModes, accessModesList);
+ put(VolumeConfigKeys.volumeMode, volumeMode);
+ put(VolumeConfigKeys.path, path);
+ }
+ },
+ volumeNameTwo, new HashMap<VolumeConfigKeys, String>() {
+ {
+ put(VolumeConfigKeys.claimName, claimNameTwo);
+ put(VolumeConfigKeys.storageClassName, storageClassName);
+ put(VolumeConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeConfigKeys.accessModes, accessModes);
+ put(VolumeConfigKeys.volumeMode, volumeMode);
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
+ }
+ },
+ volumeNameStatic, new HashMap<VolumeConfigKeys, String>() {
+ {
+ put(VolumeConfigKeys.claimName, claimNameStatic);
+ put(VolumeConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeConfigKeys.accessModes, accessModes);
+ put(VolumeConfigKeys.volumeMode, volumeMode);
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
+ }
+ }
+ );
+
+ final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder()
+ .withNewMetadata()
+ .withName(volumeNameOne)
+ .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
+ .endMetadata()
+ .withNewSpec()
+ .withStorageClassName(storageClassName)
+ .withAccessModes(Arrays.asList(accessModesList.split(",")))
+ .withVolumeMode(volumeMode)
+ .withNewResources()
+ .addToRequests("storage", new Quantity(sizeLimit))
+ .endResources()
+ .endSpec()
+ .build();
+
+ final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder()
+ .withNewMetadata()
+ .withName(volumeNameStatic)
+ .withLabels(KubernetesShim.getPersistentVolumeClaimLabels(topologyName))
+ .endMetadata()
+ .withNewSpec()
+ .withStorageClassName("")
+ .withAccessModes(Collections.singletonList(accessModes))
+ .withVolumeMode(volumeMode)
+ .withNewResources()
+ .addToRequests("storage", new Quantity(sizeLimit))
+ .endResources()
+ .endSpec()
+ .build();
+
+ final List<V1PersistentVolumeClaim> expectedClaims =
+ new LinkedList<>(Arrays.asList(claimOne, claimStatic));
+
+ final List<V1PersistentVolumeClaim> actualClaims =
+ STATEFUL_SET.createPersistentVolumeClaims(mapPVCOpts);
+
+ Assert.assertEquals("Generated claim sizes match", expectedClaims.size(), actualClaims.size());
+ Assert.assertTrue(expectedClaims.containsAll(actualClaims));
+ }
+
+ @Test
+ public void testCreatePersistentVolumeClaimVolumesAndMounts() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final String volumeNameOne = "VolumeNameONE";
+ final String volumeNameTwo = "VolumeNameTWO";
+ final String claimNameOne = "claim-name-one";
+ final String claimNameTwo = "OnDemand";
+ final String mountPathOne = "/mount/path/ONE";
+ final String mountPathTwo = "/mount/path/TWO";
+ final String mountSubPathTwo = "/mount/sub/path/TWO";
+ Map<String, Map<VolumeConfigKeys, String>> mapOfOpts =
+ ImmutableMap.of(
+ volumeNameOne, ImmutableMap.of(
+ VolumeConfigKeys.claimName, claimNameOne,
+ VolumeConfigKeys.path, mountPathOne),
+ volumeNameTwo, ImmutableMap.of(
+ VolumeConfigKeys.claimName, claimNameTwo,
+ VolumeConfigKeys.path, mountPathTwo,
+ VolumeConfigKeys.subPath, mountSubPathTwo)
+ );
+ final V1Volume volumeOne = new V1VolumeBuilder()
+ .withName(volumeNameOne)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimNameOne)
+ .endPersistentVolumeClaim()
+ .build();
+ final V1Volume volumeTwo = new V1VolumeBuilder()
+ .withName(volumeNameTwo)
+ .withNewPersistentVolumeClaim()
+ .withClaimName(claimNameTwo)
+ .endPersistentVolumeClaim()
+ .build();
+ final V1VolumeMount volumeMountOne = new V1VolumeMountBuilder()
+ .withName(volumeNameOne)
+ .withMountPath(mountPathOne)
+ .build();
+ final V1VolumeMount volumeMountTwo = new V1VolumeMountBuilder()
+ .withName(volumeNameTwo)
+ .withMountPath(mountPathTwo)
+ .withSubPath(mountSubPathTwo)
+ .build();
+
+ // Test case container.
+ // Input: Map of Volume configurations.
+ // Output: The expected lists of Volumes and Volume Mounts.
+ final List<KubernetesUtils.TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
+ Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new LinkedList<>();
+
+ // Default case: No PVC provided.
+ testCases.add(new TestTuple<>("Generated an empty list of Volumes", new HashMap<>(),
+ new Pair<>(new LinkedList<>(), new LinkedList<>())));
+
+ // PVC Provided.
+ final Pair<List<V1Volume>, List<V1VolumeMount>> expectedFull =
+ new Pair<>(
+ new LinkedList<>(Arrays.asList(volumeOne, volumeTwo)),
+ new LinkedList<>(Arrays.asList(volumeMountOne, volumeMountTwo)));
+ testCases.add(new TestTuple<>("Generated a list of Volumes", mapOfOpts,
+ new Pair<>(expectedFull.first, expectedFull.second)));
+
+ // Testing loop.
+ for (TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
+ Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) {
+ List<V1Volume> actualVolume = new LinkedList<>();
+ List<V1VolumeMount> actualVolumeMount = new LinkedList<>();
+ STATEFUL_SET.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
+ actualVolume, actualVolumeMount);
+
+ Assert.assertTrue(testCase.description,
+ (testCase.expected.first).containsAll(actualVolume));
+ Assert.assertTrue(testCase.description + " Mounts",
+ (testCase.expected.second).containsAll(actualVolumeMount));
+ }
+ }
+
+ @Test
+ public void testConfigurePodWithVolumesAndMountsFromCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final String volumeNameClashing = "clashing-volume";
+ final String volumeMountNameClashing = "original-volume-mount";
+ V1Volume baseVolume = new V1VolumeBuilder()
+ .withName(volumeNameClashing)
+ .withNewPersistentVolumeClaim()
+ .withClaimName("Original Base Claim Name")
+ .endPersistentVolumeClaim()
+ .build();
+ V1VolumeMount baseVolumeMount = new V1VolumeMountBuilder()
+ .withName(volumeMountNameClashing)
+ .withMountPath("/original/mount/path")
+ .build();
+ V1Volume clashingVolume = new V1VolumeBuilder()
+ .withName(volumeNameClashing)
+ .withNewPersistentVolumeClaim()
+ .withClaimName("Clashing Claim Replaced")
+ .endPersistentVolumeClaim()
+ .build();
+ V1VolumeMount clashingVolumeMount = new V1VolumeMountBuilder()
+ .withName(volumeMountNameClashing)
+ .withMountPath("/clashing/mount/path")
+ .build();
+ V1Volume secondaryVolume = new V1VolumeBuilder()
+ .withName("secondary-volume")
+ .withNewPersistentVolumeClaim()
+ .withClaimName("Original Secondary Claim Name")
+ .endPersistentVolumeClaim()
+ .build();
+ V1VolumeMount secondaryVolumeMount = new V1VolumeMountBuilder()
+ .withName("secondary-volume-mount")
+ .withMountPath("/secondary/mount/path")
+ .build();
+
+ // Test case container.
+ // Input: [0] Pod Spec to modify, [1] Heron container to modify, [2] List of Volumes
+ // [3] List of Volume Mounts.
+ // Output: The expected <V1PodSpec> and <V1Container>.
+ final List<TestTuple<Object[], Pair<V1PodSpec, V1Container>>> testCases = new LinkedList<>();
+
+ // No Persistent Volume Claim.
+ final V1PodSpec podSpecEmptyCase = new V1PodSpecBuilder().withVolumes(baseVolume).build();
+ final V1Container executorEmptyCase =
+ new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
+ final V1PodSpec expectedEmptyPodSpec = new V1PodSpecBuilder().withVolumes(baseVolume).build();
+ final V1Container expectedEmptyExecutor =
+ new V1ContainerBuilder().withVolumeMounts(baseVolumeMount).build();
+
+ testCases.add(new TestTuple<>("Empty",
+ new Object[]{podSpecEmptyCase, executorEmptyCase, new LinkedList<>(), new LinkedList<>()},
+ new Pair<>(expectedEmptyPodSpec, expectedEmptyExecutor)));
+
+ // Non-clashing Persistent Volume Claim.
+ final V1PodSpec podSpecNoClashCase = new V1PodSpecBuilder()
+ .withVolumes(baseVolume)
+ .build();
+ final V1Container executorNoClashCase = new V1ContainerBuilder()
+ .withVolumeMounts(baseVolumeMount)
+ .build();
+ final V1PodSpec expectedNoClashPodSpec = new V1PodSpecBuilder()
+ .addToVolumes(baseVolume)
+ .addToVolumes(secondaryVolume)
+ .build();
+ final V1Container expectedNoClashExecutor = new V1ContainerBuilder()
+ .addToVolumeMounts(baseVolumeMount)
+ .addToVolumeMounts(secondaryVolumeMount)
+ .build();
+
+ testCases.add(new TestTuple<>("No Clash",
+ new Object[]{podSpecNoClashCase, executorNoClashCase,
+ Collections.singletonList(secondaryVolume),
+ Collections.singletonList(secondaryVolumeMount)},
+ new Pair<>(expectedNoClashPodSpec, expectedNoClashExecutor)));
+
+ // Clashing Persistent Volume Claim.
+ final V1PodSpec podSpecClashCase = new V1PodSpecBuilder()
+ .withVolumes(baseVolume)
+ .build();
+ final V1Container executorClashCase = new V1ContainerBuilder()
+ .withVolumeMounts(baseVolumeMount)
+ .build();
+ final V1PodSpec expectedClashPodSpec = new V1PodSpecBuilder()
+ .addToVolumes(clashingVolume)
+ .addToVolumes(secondaryVolume)
+ .build();
+ final V1Container expectedClashExecutor = new V1ContainerBuilder()
+ .addToVolumeMounts(clashingVolumeMount)
+ .addToVolumeMounts(secondaryVolumeMount)
+ .build();
+
+ testCases.add(new TestTuple<>("Clashing",
+ new Object[]{podSpecClashCase, executorClashCase,
+ Arrays.asList(clashingVolume, secondaryVolume),
+ Arrays.asList(clashingVolumeMount, secondaryVolumeMount)},
+ new Pair<>(expectedClashPodSpec, expectedClashExecutor)));
+
+ // Testing loop.
+ for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : testCases) {
+ STATEFUL_SET
+ .configurePodWithVolumesAndMountsFromCLI((V1PodSpec) testCase.input[0],
+ (V1Container) testCase.input[1], (List<V1Volume>) testCase.input[2],
+ (List<V1VolumeMount>) testCase.input[3]);
+
+ Assert.assertEquals("Pod Specs match " + testCase.description,
+ testCase.input[0], testCase.expected.first);
+ Assert.assertEquals("Executors match " + testCase.description,
+ testCase.input[1], testCase.expected.second);
+ }
+ }
+
@Test
- public void testEmpty() {
- Assert.assertTrue(true);
+ public void testSetShardIdEnvironmentVariableCommand() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
+
+ testCases.add(new TestTuple<>("Executor command is set correctly",
+ true, "SHARD_ID=$((${POD_NAME##*-} + 1)) && echo shardId=${SHARD_ID}"));
+ testCases.add(new TestTuple<>("Manager command is set correctly",
+ false, "SHARD_ID=${POD_NAME##*-} && echo shardId=${SHARD_ID}"));
+
+ for (TestTuple<Boolean, String> testCase : testCases) {
+ Assert.assertEquals(testCase.description, testCase.expected,
+ STATEFUL_SET.setShardIdEnvironmentVariableCommand(testCase.input));
+ }
+ }
+
+ @Test
+ public void testCreateResourcesRequirement() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final String managerCpuLimit = "3000m";
+ final String managerMemLimit = "256Gi";
+ final Quantity memory = Quantity.fromString(managerMemLimit);
+ final Quantity cpu = Quantity.fromString(managerCpuLimit);
+ final List<TestTuple<Map<String, String>, Map<String, Quantity>>> testCases =
+ new LinkedList<>();
+
+ // No input.
+ Map<String, String> inputEmpty = new HashMap<>();
+ testCases.add(new TestTuple<>("Empty input.", inputEmpty, new HashMap<>()));
+
+ // Only memory.
+ Map<String, String> inputMemory = new HashMap<String, String>() {
+ {
+ put(KubernetesConstants.MEMORY, managerMemLimit);
+ }
+ };
+ Map<String, Quantity> expectedMemory = new HashMap<String, Quantity>() {
+ {
+ put(KubernetesConstants.MEMORY, memory);
+ }
+ };
+ testCases.add(new TestTuple<>("Only memory input.", inputMemory, expectedMemory));
+
+ // Only CPU.
+ Map<String, String> inputCPU = new HashMap<String, String>() {
+ {
+ put(KubernetesConstants.CPU, managerCpuLimit);
+ }
+ };
+ Map<String, Quantity> expectedCPU = new HashMap<String, Quantity>() {
+ {
+ put(KubernetesConstants.CPU, cpu);
+ }
+ };
+ testCases.add(new TestTuple<>("Only CPU input.", inputCPU, expectedCPU));
+
+ // CPU and memory.
+ Map<String, String> inputMemoryCPU = new HashMap<String, String>() {
+ {
+ put(KubernetesConstants.MEMORY, managerMemLimit);
+ put(KubernetesConstants.CPU, managerCpuLimit);
+ }
+ };
+ Map<String, Quantity> expectedMemoryCPU = new HashMap<String, Quantity>() {
+ {
+ put(KubernetesConstants.MEMORY, memory);
+ put(KubernetesConstants.CPU, cpu);
+ }
+ };
+ testCases.add(new TestTuple<>("Memory and CPU input.", inputMemoryCPU, expectedMemoryCPU));
+
+ // Invalid.
+ Map<String, String> inputInvalid = new HashMap<String, String>() {
+ {
+ put("invalid input", "will not be ignored");
+ put(KubernetesConstants.CPU, managerCpuLimit);
+ }
+ };
+ Map<String, Quantity> expectedInvalid = new HashMap<String, Quantity>() {
+ {
+ put(KubernetesConstants.CPU, cpu);
+ }
+ };
+ testCases.add(new TestTuple<>("Invalid input.", inputInvalid, expectedInvalid));
+
+ // Test loop.
+ for (TestTuple<Map<String, String>, Map<String, Quantity>> testCase : testCases) {
+ Map<String, Quantity> actual =
+ STATEFUL_SET.createResourcesRequirement(testCase.input);
+ Assert.assertEquals(testCase.description, testCase.expected, actual);
+ }
+ }
+
+ @Test
+ public void testCreateVolumeAndMountsEmptyDirCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final String volumeName = "volume-name-empty-dir";
+ final String medium = "Memory";
+ final String sizeLimit = "1Gi";
+ final String path = "/path/to/mount";
+ final String subPath = "/sub/path/to/mount";
+
+ // Empty Dir.
+ final Map<String, Map<VolumeConfigKeys, String>> config =
+ ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+ {
+ put(VolumeConfigKeys.sizeLimit, sizeLimit);
+ put(VolumeConfigKeys.medium, "Memory");
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
+ }
+ });
+ final List<V1Volume> expectedVolumes = Collections.singletonList(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewEmptyDir()
+ .withMedium(medium)
+ .withNewSizeLimit(sizeLimit)
+ .endEmptyDir()
+ .build()
+ );
+ final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+ new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .withMountPath(path)
+ .withSubPath(subPath)
+ .build()
+ );
+
+ List<V1Volume> actualVolumes = new LinkedList<>();
+ List<V1VolumeMount> actualMounts = new LinkedList<>();
+ STATEFUL_SET.createVolumeAndMountsEmptyDirCLI(config, actualVolumes, actualMounts);
+ Assert.assertEquals("Empty Dir Volume populated", expectedVolumes, actualVolumes);
+ Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts, actualMounts);
+ }
+
+ @Test
+ public void testCreateVolumeAndMountsHostPathCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final String volumeName = "volume-name-host-path";
+ final String type = "DirectoryOrCreate";
+ final String pathOnHost = "path.on.host";
+ final String path = "/path/to/mount";
+ final String subPath = "/sub/path/to/mount";
+
+ // Host Path.
+ final Map<String, Map<VolumeConfigKeys, String>> config =
+ ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+ {
+ put(VolumeConfigKeys.type, type);
+ put(VolumeConfigKeys.pathOnHost, pathOnHost);
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
+ }
+ });
+ final List<V1Volume> expectedVolumes = Collections.singletonList(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewHostPath()
+ .withNewType(type)
+ .withNewPath(pathOnHost)
+ .endHostPath()
+ .build()
+ );
+ final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+ new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .withMountPath(path)
+ .withSubPath(subPath)
+ .build()
+ );
+
+ List<V1Volume> actualVolumes = new LinkedList<>();
+ List<V1VolumeMount> actualMounts = new LinkedList<>();
+ STATEFUL_SET.createVolumeAndMountsHostPathCLI(config, actualVolumes, actualMounts);
+ Assert.assertEquals("Host Path Volume populated", expectedVolumes, actualVolumes);
+ Assert.assertEquals("Host Path Volume Mount populated", expectedMounts, actualMounts);
+ }
+
+ @Test
+ public void testCreateVolumeAndMountsNFSCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
+ final String volumeName = "volume-name-nfs";
+ final String server = "nfs.server.address";
+ final String pathOnNFS = "path.on.host";
+ final String readOnly = "true";
+ final String path = "/path/to/mount";
+ final String subPath = "/sub/path/to/mount";
+
+ // NFS.
+ final Map<String, Map<VolumeConfigKeys, String>> config =
+ ImmutableMap.of(volumeName, new HashMap<VolumeConfigKeys, String>() {
+ {
+ put(VolumeConfigKeys.server, server);
+ put(VolumeConfigKeys.readOnly, readOnly);
+ put(VolumeConfigKeys.pathOnNFS, pathOnNFS);
+ put(VolumeConfigKeys.path, path);
+ put(VolumeConfigKeys.subPath, subPath);
+ }
+ });
+ final List<V1Volume> expectedVolumes = Collections.singletonList(
+ new V1VolumeBuilder()
+ .withName(volumeName)
+ .withNewNfs()
+ .withServer(server)
+ .withPath(pathOnNFS)
+ .withReadOnly(Boolean.parseBoolean(readOnly))
+ .endNfs()
+ .build()
+ );
+ final List<V1VolumeMount> expectedMounts = Collections.singletonList(
+ new V1VolumeMountBuilder()
+ .withName(volumeName)
+ .withMountPath(path)
+ .withSubPath(subPath)
+ .withReadOnly(true)
+ .build()
+ );
+
+ List<V1Volume> actualVolumes = new LinkedList<>();
+ List<V1VolumeMount> actualMounts = new LinkedList<>();
+ STATEFUL_SET.createVolumeAndMountsNFSCLI(config, actualVolumes, actualMounts);
+ Assert.assertEquals("NFS Volume populated", expectedVolumes, actualVolumes);
+ Assert.assertEquals("NFS Volume Mount populated", expectedMounts, actualMounts);
}
}