You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by ni...@apache.org on 2022/07/24 03:21:08 UTC
[incubator-heron] branch master updated: [3846] Refactoring of the Kubernetes shim (#3847)
This is an automated email from the ASF dual-hosted git repository.
nicknezis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/master by this push:
new 41e098862f5 [3846] Refactoring of the Kubernetes shim (#3847)
41e098862f5 is described below
commit 41e098862f5d10a39738fe5e1139649d1c47c151
Author: Saad Ur Rahman <su...@users.noreply.github.com>
AuthorDate: Sat Jul 23 23:21:01 2022 -0400
[3846] Refactoring of the Kubernetes shim (#3847)
---
.../scheduler/kubernetes/KubernetesContext.java | 2 +-
.../scheduler/kubernetes/KubernetesScheduler.java | 2 +-
.../heron/scheduler/kubernetes/KubernetesShim.java | 609 ++++++++++++++++++++
.../scheduler/kubernetes/KubernetesUtils.java | 4 +-
.../{V1Controller.java => StatefulSet.java} | 628 ++++-----------------
heron/schedulers/tests/java/BUILD | 3 +-
.../scheduler/kubernetes/KubernetesShimTest.java | 419 ++++++++++++++
.../scheduler/kubernetes/KubernetesUtilsTest.java | 22 +-
...{V1ControllerTest.java => StatefulSetTest.java} | 515 ++++-------------
.../heron/scheduler/kubernetes/VolumesTests.java | 2 +-
10 files changed, 1272 insertions(+), 934 deletions(-)
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
index 8d9a8795070..3d424ce5a89 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesContext.java
@@ -219,7 +219,7 @@ public final class KubernetesContext extends Context {
@VisibleForTesting
protected static Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>>
getVolumeConfigs(final Config config, final String prefix, final boolean isExecutor) {
- final Logger LOG = Logger.getLogger(V1Controller.class.getName());
+ final Logger LOG = Logger.getLogger(KubernetesShim.class.getName());
final String prefixKey = String.format(prefix,
isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME);
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java
index c35c87df7d5..abc35c77397 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesScheduler.java
@@ -50,7 +50,7 @@ public class KubernetesScheduler implements IScheduler, IScalable {
private UpdateTopologyManager updateTopologyManager;
protected KubernetesController getController() {
- return new V1Controller(configuration, runtimeConfiguration);
+ return new KubernetesShim(configuration, runtimeConfiguration);
}
@Override
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
new file mode 100644
index 00000000000..f3541569986
--- /dev/null
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
@@ -0,0 +1,609 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.heron.common.basics.Pair;
+import org.apache.heron.scheduler.TopologyRuntimeManagementException;
+import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.packing.PackingPlan;
+import org.apache.heron.spi.packing.Resource;
+
+import io.kubernetes.client.custom.V1Patch;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.Configuration;
+import io.kubernetes.client.openapi.apis.AppsV1Api;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PodTemplate;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1Service;
+import io.kubernetes.client.openapi.models.V1ServiceSpec;
+import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.models.V1Status;
+import io.kubernetes.client.util.PatchUtils;
+import io.kubernetes.client.util.Yaml;
+import okhttp3.Response;
+
+import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+
+public class KubernetesShim extends KubernetesController {
+
+ private static final Logger LOG =
+ Logger.getLogger(KubernetesShim.class.getName());
+
+ private final boolean isPodTemplateDisabled;
+
+ private final AppsV1Api appsClient;
+ private final CoreV1Api coreClient;
+
+  /**
+   * Configures the Kubernetes API Application and Core communications clients.
+   * The client returned by the Kubernetes library's <code>defaultClient()</code> is also registered
+   * as the library-wide default <code>ApiClient</code>.
+   * @param configuration <code>topology</code> configurations.
+   * @param runtimeConfiguration Kubernetes runtime configurations.
+   * @throws RuntimeException wrapping the <code>IOException</code> raised when the Kubernetes
+   * client cannot be set up.
+   */
+  KubernetesShim(Config configuration, Config runtimeConfiguration) {
+    super(configuration, runtimeConfiguration);
+
+    isPodTemplateDisabled = KubernetesContext.getPodTemplateDisabled(configuration);
+    LOG.log(Level.WARNING, String.format("Pod Template configuration is %s",
+        isPodTemplateDisabled ? "DISABLED" : "ENABLED"));
+    LOG.log(Level.WARNING, String.format("Volume configuration from CLI is %s",
+        KubernetesContext.getVolumesFromCLIDisabled(configuration) ? "DISABLED" : "ENABLED"));
+
+    try {
+      final ApiClient apiClient = io.kubernetes.client.util.Config.defaultClient();
+      Configuration.setDefaultApiClient(apiClient);
+      appsClient = new AppsV1Api(apiClient);
+      coreClient = new CoreV1Api(apiClient);
+    } catch (IOException e) {
+      // Hand the exception to the logger as the record's throwable so the stack trace is kept,
+      // instead of concatenating its toString() directly onto the message.
+      LOG.log(Level.SEVERE, "Failed to setup Kubernetes client", e);
+      throw new RuntimeException(e);
+    }
+  }
+
+ /**
+ * Configures all components required by a <code>topology</code> and submits it to the Kubernetes scheduler.
+ * @param packingPlan Used to configure the StatefulSets <code>Resource</code>s and replica count.
+ * @return Success indicator.
+ */
+ @Override
+ boolean submit(PackingPlan packingPlan) {
+ final String topologyName = getTopologyName();
+ if (!topologyName.equals(topologyName.toLowerCase())) {
+ throw new TopologySubmissionException("K8S scheduler does not allow upper case topology's.");
+ }
+
+ final Resource containerResource = getContainerResource(packingPlan);
+
+ final V1Service topologyService = createTopologyService();
+ try {
+ coreClient.createNamespacedService(getNamespace(), topologyService, null,
+ null, null);
+ } catch (ApiException e) {
+ KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology service", e);
+ throw new TopologySubmissionException(e.getMessage());
+ }
+
+ // Find the max number of instances in a container so that we can open
+ // enough ports if remote debugging is enabled.
+ int numberOfInstances = 0;
+ for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
+ numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size());
+ }
+
+ final StatefulSet.Configs clusterConfigs = new StatefulSet.Configs(
+ getConfiguration(),
+ getRuntimeConfiguration(),
+ loadPodFromTemplate(false),
+ loadPodFromTemplate(true)
+ );
+
+ final V1StatefulSet executors = StatefulSet.get()
+ .create(StatefulSet.Type.Executor, clusterConfigs, containerResource, numberOfInstances);
+ final V1StatefulSet manager = StatefulSet.get()
+ .create(StatefulSet.Type.Manager, clusterConfigs, containerResource, numberOfInstances);
+
+ try {
+ appsClient.createNamespacedStatefulSet(getNamespace(), executors, null,
+ null, null);
+ appsClient.createNamespacedStatefulSet(getNamespace(), manager, null,
+ null, null);
+ } catch (ApiException e) {
+ final String message = String.format("Error creating topology: %s%n", e.getResponseBody());
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(message);
+ }
+
+ return true;
+ }
+
+  /**
+   * Shuts down a <code>topology</code> by deleting all associated resources, in this order:
+   * <ul>
+   *   <li><code>Persistent Volume Claims</code> added by the <code>topology</code>.</li>
+   *   <li><code>StatefulSet</code> for both the <code>Executors</code> and <code>Manager</code>.</li>
+   *   <li>Headless <code>Service</code> which facilitates communication between all Pods.</li>
+   * </ul>
+   * Each step reports failure by throwing, so a later step is not attempted when an earlier one
+   * throws.
+   * @return Always <code>true</code> when the method returns normally.
+   */
+  @Override
+  boolean killTopology() {
+    removePersistentVolumeClaims();
+    deleteStatefulSets();
+    deleteService();
+    return true;
+  }
+
+  /**
+   * Restarts a topology by deleting the Pods associated with it using <code>Selector Labels</code>.
+   * @param shardId Not used but required because of interface.
+   * @return <code>true</code> if the restart request was submitted to the Kubernetes cluster,
+   * <code>false</code> if the request failed with an <code>ApiException</code>.
+   */
+  @Override
+  boolean restart(int shardId) {
+    try {
+      coreClient.deleteCollectionNamespacedPod(getNamespace(), null, null, null, null, 0,
+          createTopologySelectorLabels(), null, null, null, null, null, null, null);
+      LOG.log(Level.WARNING, String.format("Restarting topology '%s'...", getTopologyName()));
+    } catch (ApiException e) {
+      // Attach the exception to the log record; the caller only receives a boolean, so the
+      // log is the only place the failure reason is preserved.
+      LOG.log(Level.SEVERE,
+          String.format("Failed to restart topology '%s'...", getTopologyName()), e);
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Adds a specified number of Pods to a <code>topology</code>'s <code>Executors</code> by raising
+   * the replica count of the <code>Executors</code> StatefulSet.
+   * @param containersToAdd Set of containers to be added; only its size is used here.
+   * @return The passed in set of containers, unchanged.
+   * @throws TopologyRuntimeManagementException if the StatefulSet cannot be read or patched.
+   */
+  @Override
+  public Set<PackingPlan.ContainerPlan>
+      addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
+    final V1StatefulSet statefulSet;
+    try {
+      statefulSet = getStatefulSet();
+    } catch (ApiException ae) {
+      final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
+      throw new TopologyRuntimeManagementException(message, ae);
+    }
+    final V1StatefulSetSpec statefulSetSpec = Objects.requireNonNull(statefulSet.getSpec());
+    final int currentContainerCount = Objects.requireNonNull(statefulSetSpec.getReplicas());
+    final int newContainerCount = currentContainerCount + containersToAdd.size();
+
+    try {
+      patchStatefulSetReplicas(newContainerCount);
+    } catch (ApiException ae) {
+      // Keep the ApiException as the cause, matching the read failure path above.
+      throw new TopologyRuntimeManagementException(
+          ae.getMessage() + "\ndetails\n" + ae.getResponseBody(), ae);
+    }
+
+    return containersToAdd;
+  }
+
+  /**
+   * Removes a specified number of Pods from a <code>topology</code>'s <code>Executors</code> by
+   * lowering the replica count of the <code>Executors</code> StatefulSet.
+   * @param containersToRemove Set of containers to be removed; only its size is used here.
+   * @throws TopologyRuntimeManagementException if the StatefulSet cannot be read or patched.
+   */
+  @Override
+  public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
+    final V1StatefulSet statefulSet;
+    try {
+      statefulSet = getStatefulSet();
+    } catch (ApiException ae) {
+      final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
+      throw new TopologyRuntimeManagementException(message, ae);
+    }
+
+    final V1StatefulSetSpec statefulSetSpec = Objects.requireNonNull(statefulSet.getSpec());
+    final int currentContainerCount = Objects.requireNonNull(statefulSetSpec.getReplicas());
+    final int newContainerCount = currentContainerCount - containersToRemove.size();
+
+    try {
+      patchStatefulSetReplicas(newContainerCount);
+    } catch (ApiException e) {
+      // Keep the ApiException as the cause, matching the read failure path above.
+      throw new TopologyRuntimeManagementException(
+          e.getMessage() + "\ndetails\n" + e.getResponseBody(), e);
+    }
+  }
+
+  /**
+   * Applies a JSON patch to the <code>Executors</code> StatefulSet that sets its replica count,
+   * which scales the <code>topology</code>'s Pod count up or down in place.
+   * @param replicas The new number of Pod replicas required.
+   * @throws ApiException in the event there is a failure patching the StatefulSet.
+   */
+  private void patchStatefulSetReplicas(int replicas) throws ApiException {
+    final V1Patch replicaPatch = new V1Patch(
+        String.format(KubernetesConstants.JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT, replicas));
+
+    PatchUtils.patch(
+        V1StatefulSet.class,
+        () -> appsClient.patchNamespacedStatefulSetCall(
+            getStatefulSetName(true), getNamespace(), replicaPatch, null, null, null, null, null),
+        V1Patch.PATCH_FORMAT_JSON_PATCH,
+        appsClient.getApiClient());
+  }
+
+ /**
+ * Retrieves the <code>Executors</code> StatefulSet configurations for the Kubernetes cluster.
+ * @return <code>Executors</code> StatefulSet configurations.
+ * @throws ApiException in the event there is a failure retrieving the StatefulSet.
+ */
+ V1StatefulSet getStatefulSet() throws ApiException {
+ return appsClient.readNamespacedStatefulSet(getStatefulSetName(true), getNamespace(),
+ null);
+ }
+
+  /**
+   * Deletes the headless <code>Service</code> for a <code>topology</code>'s <code>Executors</code>
+   * and <code>Manager</code> using the <code>topology</code>'s name.
+   * <p>
+   * A <code>404 Not Found</code>, whether reported through the response code or through an
+   * <code>ApiException</code>, is logged as a warning and the method returns normally. Any other
+   * unsuccessful response, <code>ApiException</code> or <code>IOException</code> is reported as a
+   * <code>TopologyRuntimeManagementException</code>.
+   * @throws TopologyRuntimeManagementException if the deletion fails for a reason other than the
+   * <code>Service</code> not existing.
+   */
+  void deleteService() {
+    try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
+          getNamespace(), null, null, 0, null,
+          KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
+
+      if (!response.isSuccessful()) {
+        if (response.code() == HTTP_NOT_FOUND) {
+          LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless service for Topology: "
+              + getTopologyName());
+          return;
+        }
+        LOG.log(Level.SEVERE,
+            String.format("Error when deleting the Service of the job [%s] in namespace [%s]",
+            getTopologyName(), getNamespace()));
+        LOG.log(Level.SEVERE, "Error killing topology message:" + response.message());
+        KubernetesUtils.logResponseBodyIfPresent(LOG, response);
+
+        throw new TopologyRuntimeManagementException(
+            KubernetesUtils.errorMessageFromResponse(response));
+      }
+    } catch (ApiException e) {
+      if (e.getCode() == HTTP_NOT_FOUND) {
+        LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes service for Topology: "
+            + getTopologyName());
+        return;
+      }
+      throw new TopologyRuntimeManagementException(
+          String.format("Error deleting topology [%s] Kubernetes service", getTopologyName()), e);
+    } catch (IOException e) {
+      throw new TopologyRuntimeManagementException(
+          String.format("Error deleting topology [%s] Kubernetes service", getTopologyName()), e);
+    }
+    LOG.log(Level.INFO,
+        String.format("Headless Service for the Job [%s] in namespace [%s] is deleted.",
+        getTopologyName(), getNamespace()));
+  }
+
+  /**
+   * Deletes the StatefulSets for a <code>topology</code>'s <code>Executors</code> and <code>Manager</code>
+   * using the <code>Label</code> selector built by <code>createTopologySelectorLabels()</code>.
+   * <p>
+   * A <code>404 Not Found</code>, whether reported through the response code or through an
+   * <code>ApiException</code>, is logged as a warning and the method returns normally. Any other
+   * unsuccessful response, <code>ApiException</code> or <code>IOException</code> is reported as a
+   * <code>TopologyRuntimeManagementException</code>.
+   * @throws TopologyRuntimeManagementException if the deletion fails for a reason other than the
+   * StatefulSets not existing.
+   */
+  void deleteStatefulSets() {
+    try (Response response = appsClient.deleteCollectionNamespacedStatefulSetCall(getNamespace(),
+        null, null, null, null, null, createTopologySelectorLabels(), null, null, null, null, null,
+            null, null, null)
+        .execute()) {
+
+      if (!response.isSuccessful()) {
+        if (response.code() == HTTP_NOT_FOUND) {
+          LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSets for Topology: "
+              + getTopologyName());
+          return;
+        }
+        LOG.log(Level.SEVERE,
+            String.format("Error when deleting the StatefulSets of the job [%s] in namespace [%s]",
+            getTopologyName(), getNamespace()));
+        LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
+        KubernetesUtils.logResponseBodyIfPresent(LOG, response);
+
+        throw new TopologyRuntimeManagementException(
+            KubernetesUtils.errorMessageFromResponse(response));
+      }
+    } catch (ApiException e) {
+      if (e.getCode() == HTTP_NOT_FOUND) {
+        LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
+            + getTopologyName());
+        return;
+      }
+      throw new TopologyRuntimeManagementException(
+          String.format("Error deleting topology [%s] Kubernetes StatefulSets", getTopologyName()),
+          e);
+    } catch (IOException e) {
+      throw new TopologyRuntimeManagementException(
+          String.format("Error deleting topology [%s] Kubernetes StatefulSets", getTopologyName()),
+          e);
+    }
+    LOG.log(Level.INFO,
+        String.format("StatefulSet for the Job [%s] in namespace [%s] is deleted.",
+        getTopologyName(), getNamespace()));
+  }
+
+  /**
+   * Assembles the headless <code>Service</code> through which the Pods of a <code>topology</code>
+   * reach each other. The <code>Service</code> is named after the <code>topology</code>, carries
+   * the configured service annotations and labels, and selects Pods by the match labels.
+   * @return A fully configured <code>Service</code> to be used by a <code>topology</code>.
+   */
+  private V1Service createTopologyService() {
+    final String name = getTopologyName();
+
+    // Metadata: name, annotations and labels of the Service itself.
+    final V1ObjectMeta metadata = new V1ObjectMeta()
+        .name(name)
+        .annotations(getServiceAnnotations())
+        .labels(getServiceLabels());
+
+    // A cluster IP of "None" is what makes the Service headless.
+    final V1ServiceSpec headlessSpec = new V1ServiceSpec()
+        .clusterIP("None")
+        .selector(getPodMatchLabels(name));
+
+    final V1Service topologyService = new V1Service();
+    topologyService.setMetadata(metadata);
+    topologyService.setSpec(headlessSpec);
+    return topologyService;
+  }
+
+ /**
+ * Extracts <code>Service Annotations</code> for configurations.
+ * @return Key-value pairs of service <code>Annotation</code>s to be added to the Pod.
+ */
+ private Map<String, String> getServiceAnnotations() {
+ return KubernetesContext.getServiceAnnotations(getConfiguration());
+ }
+
+  /**
+   * Builds the <code>Match Label</code>s identifying the Pods of a <code>topology</code>: the
+   * <code>heron</code> application label and the <code>topology</code> name label.
+   * @param topologyName Name of the <code>topology</code>.
+   * @return A new mutable map holding the two <code>Match Label</code>s.
+   */
+  private Map<String, String> getPodMatchLabels(String topologyName) {
+    final Map<String, String> matchLabels = new HashMap<>();
+    matchLabels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE);
+    matchLabels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
+    return matchLabels;
+  }
+
+ /**
+ * Extracts <code>Selector Labels</code> for<code>Service</code>s from configurations.
+ * @return Key-value pairs of <code>Service Labels</code> to be added to the Pod.
+ */
+ private Map<String, String> getServiceLabels() {
+ return KubernetesContext.getServiceLabels(getConfiguration());
+ }
+
+  /**
+   * Initiates the process of locating and loading <code>Pod Template</code> from a <code>ConfigMap</code>.
+   * The loaded text is then parsed into a usable <code>Pod Template</code>.
+   * <p>
+   * When no <code>Pod Template</code> location is configured an empty <code>V1PodTemplateSpec</code>
+   * is returned. This check runs before the disabled check, so the default path is still available
+   * when custom <code>Pod Template</code>s are disabled.
+   * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
+   *                   or <code>Manager</code>.
+   * @return A <code>Pod Template</code> which is loaded and parsed from a <code>ConfigMap</code>, or
+   * an empty one when no custom template is configured.
+   * @throws TopologySubmissionException if custom <code>Pod Template</code>s are disabled, the
+   * <code>ConfigMap</code> or the named template cannot be located, or the template cannot be parsed.
+   */
+  @VisibleForTesting
+  protected V1PodTemplateSpec loadPodFromTemplate(boolean isExecutor) {
+    final Pair<String, String> podTemplateConfigMapName = getPodTemplateLocation(isExecutor);
+
+    // Default Pod Template.
+    if (podTemplateConfigMapName == null) {
+      LOG.log(Level.INFO, "Configuring cluster with the Default Pod Template");
+      return new V1PodTemplateSpec();
+    }
+
+    if (isPodTemplateDisabled) {
+      throw new TopologySubmissionException("Custom Pod Templates are disabled");
+    }
+
+    final String configMapName = podTemplateConfigMapName.first;
+    final String podTemplateName = podTemplateConfigMapName.second;
+
+    // Attempt to locate ConfigMap with provided Pod Template name.
+    try {
+      V1ConfigMap configMap = getConfigMap(configMapName);
+      if (configMap == null) {
+        throw new ApiException(
+            String.format("K8s client unable to locate ConfigMap '%s'", configMapName));
+      }
+
+      final Map<String, String> configMapData = configMap.getData();
+      if (configMapData != null && configMapData.containsKey(podTemplateName)) {
+        // NullPointerException when Pod Template is empty; it is caught below and reported as a
+        // parsing error.
+        V1PodTemplateSpec podTemplate = ((V1PodTemplate)
+            Yaml.load(configMapData.get(podTemplateName))).getTemplate();
+        LOG.log(Level.INFO, String.format("Configuring cluster with the %s.%s Pod Template",
+            configMapName, podTemplateName));
+        return podTemplate;
+      }
+
+      // Failure to locate Pod Template with provided name.
+      throw new ApiException(String.format("Failed to locate Pod Template '%s' in ConfigMap '%s'",
+          podTemplateName, configMapName));
+    } catch (ApiException e) {
+      KubernetesUtils.logExceptionWithDetails(LOG, e.getMessage(), e);
+      throw new TopologySubmissionException(e.getMessage());
+    } catch (IOException | ClassCastException | NullPointerException e) {
+      final String message = String.format("Error parsing Pod Template '%s' in ConfigMap '%s'",
+          podTemplateName, configMapName);
+      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+      throw new TopologySubmissionException(message);
+    }
+  }
+
+ /**
+ * Extracts the <code>ConfigMap</code> and <code>Pod Template</code> names from the CLI parameter.
+ * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
+ * or <code>Manager</code>.
+ * @return A pair of the form <code>(ConfigMap, Pod Template)</code>.
+ */
+ @VisibleForTesting
+ protected Pair<String, String> getPodTemplateLocation(boolean isExecutor) {
+ final String podTemplateConfigMapName = KubernetesContext
+ .getPodTemplateConfigMapName(getConfiguration(), isExecutor);
+
+ if (podTemplateConfigMapName == null) {
+ return null;
+ }
+
+ try {
+ final int splitPoint = podTemplateConfigMapName.indexOf(".");
+ final String configMapName = podTemplateConfigMapName.substring(0, splitPoint);
+ final String podTemplateName = podTemplateConfigMapName.substring(splitPoint + 1);
+
+ if (configMapName.isEmpty() || podTemplateName.isEmpty()) {
+ throw new IllegalArgumentException("Empty ConfigMap or Pod Template name");
+ }
+
+ return new Pair<>(configMapName, podTemplateName);
+ } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
+ final String message = "Invalid ConfigMap and/or Pod Template name";
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(message);
+ }
+ }
+
+ /**
+ * Retrieves a <code>ConfigMap</code> from the K8s cluster in the API Server's namespace.
+ * @param configMapName Name of the <code>ConfigMap</code> to retrieve.
+ * @return The retrieved <code>ConfigMap</code>.
+ */
+ @VisibleForTesting
+ protected V1ConfigMap getConfigMap(String configMapName) {
+ try {
+ return coreClient.readNamespacedConfigMap(
+ configMapName,
+ getNamespace(),
+ null);
+ } catch (ApiException e) {
+ final String message = "Error retrieving ConfigMaps";
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(String.format("%s: %s", message, e.getMessage()));
+ }
+ }
+
+  /**
+   * Removes all Persistent Volume Claims associated with a specific topology, if they exist.
+   * It looks for the following:
+   * metadata:
+   *   labels:
+   *     topology: <code>topology-name</code>
+   *     onDemand: <code>true</code>
+   * The label selector is built from <code>getPersistentVolumeClaimLabels</code> as a
+   * comma-separated list of <code>key=value</code> pairs.
+   * @throws TopologyRuntimeManagementException if the Kubernetes API call fails; the original
+   * <code>ApiException</code> is attached as the cause.
+   */
+  private void removePersistentVolumeClaims() {
+    final String name = getTopologyName();
+    final StringBuilder selectorLabel = new StringBuilder();
+
+    // Generate selector label.
+    for (Map.Entry<String, String> label : getPersistentVolumeClaimLabels(name).entrySet()) {
+      if (selectorLabel.length() != 0) {
+        selectorLabel.append(",");
+      }
+      selectorLabel.append(label.getKey()).append("=").append(label.getValue());
+    }
+
+    // Remove all dynamically backed Persistent Volume Claims.
+    try {
+      V1Status status = coreClient.deleteCollectionNamespacedPersistentVolumeClaim(
+          getNamespace(),
+          null,
+          null,
+          null,
+          null,
+          null,
+          selectorLabel.toString(),
+          null,
+          null,
+          null,
+          null,
+          null,
+          null,
+          null);
+
+      LOG.log(Level.INFO,
+          String.format("Removing automatically generated Persistent Volume Claims for `%s`:%n%s",
+          name, status.getMessage()));
+    } catch (ApiException e) {
+      final String message = String.format("Failed to connect to K8s cluster to delete Persistent "
+              + "Volume Claims for topology `%s`. A manual clean-up is required.%n%s",
+          name, e.getMessage());
+      LOG.log(Level.WARNING, message);
+      // Chain the ApiException so callers can still inspect the response code and body.
+      throw new TopologyRuntimeManagementException(message, e);
+    }
+  }
+
+ /**
+ * Generates the <code>Label</code> which are attached to a Topology's Persistent Volume Claims.
+ * @param topologyName Attached to the topology match label.
+ * @return A map consisting of the <code>label-value</code> pairs to be used in <code>Label</code>s.
+ */
+ @VisibleForTesting
+ protected static Map<String, String> getPersistentVolumeClaimLabels(String topologyName) {
+ return new HashMap<String, String>() {
+ {
+ put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
+ put(KubernetesConstants.LABEL_ON_DEMAND, "true");
+ }
+ };
+ }
+
+ /**
+ * Generates the <code>StatefulSet</code> name depending on if it is a <code>Executor</code> or
+ * <code>Manager</code>.
+ * @param isExecutor Flag used to generate name for <code>Executor</code> or <code>Manager</code>.
+ * @return String <code>"topology-name"-executors</code>.
+ */
+ private String getStatefulSetName(boolean isExecutor) {
+ return String.format("%s-%s", getTopologyName(),
+ isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME);
+ }
+
+ /**
+ * Generates the <code>Selector</code> match labels with which resources in this topology can be found.
+ * @return A label of the form <code>app=heron,topology=topology-name</code>.
+ */
+ private String createTopologySelectorLabels() {
+ return String.format("%s=%s,%s=%s",
+ KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE,
+ KubernetesConstants.LABEL_TOPOLOGY, getTopologyName());
+ }
+}
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
index 709662a9f30..83891727110 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
@@ -95,8 +95,8 @@ final class KubernetesUtils {
return Math.round(value * scale) / scale;
}
- static class V1ControllerUtils<T> {
- private static final Logger LOG = Logger.getLogger(V1Controller.class.getName());
+ static class CommonUtils<T> {
+ private static final Logger LOG = Logger.getLogger(KubernetesShim.class.getName());
/**
* Merge two lists by keeping all values in the <code>primaryList</code> and de-duplicating values in
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
similarity index 60%
rename from heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
rename to heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
index 1cd85e5ad6f..b1f2fc990e2 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/V1Controller.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
@@ -19,7 +19,6 @@
package org.apache.heron.scheduler.kubernetes;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -28,8 +27,6 @@ import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
@@ -38,24 +35,15 @@ import java.util.stream.IntStream;
import com.google.common.annotations.VisibleForTesting;
import org.apache.heron.api.utils.TopologyUtils;
-import org.apache.heron.common.basics.Pair;
import org.apache.heron.scheduler.TopologyRuntimeManagementException;
import org.apache.heron.scheduler.TopologySubmissionException;
import org.apache.heron.scheduler.utils.Runtime;
import org.apache.heron.scheduler.utils.SchedulerUtils;
import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import org.apache.heron.spi.common.Config;
-import org.apache.heron.spi.packing.PackingPlan;
import org.apache.heron.spi.packing.Resource;
import io.kubernetes.client.custom.Quantity;
-import io.kubernetes.client.custom.V1Patch;
-import io.kubernetes.client.openapi.ApiClient;
-import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.Configuration;
-import io.kubernetes.client.openapi.apis.AppsV1Api;
-import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.models.V1ConfigMap;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1ContainerPort;
import io.kubernetes.client.openapi.models.V1EnvVar;
@@ -65,314 +53,135 @@ import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
import io.kubernetes.client.openapi.models.V1ObjectMeta;
import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
import io.kubernetes.client.openapi.models.V1PodSpec;
-import io.kubernetes.client.openapi.models.V1PodTemplate;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1SecretKeySelector;
import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder;
-import io.kubernetes.client.openapi.models.V1Service;
-import io.kubernetes.client.openapi.models.V1ServiceSpec;
import io.kubernetes.client.openapi.models.V1StatefulSet;
import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
-import io.kubernetes.client.openapi.models.V1Status;
import io.kubernetes.client.openapi.models.V1Toleration;
import io.kubernetes.client.openapi.models.V1Volume;
import io.kubernetes.client.openapi.models.V1VolumeMount;
-import io.kubernetes.client.util.PatchUtils;
-import io.kubernetes.client.util.Yaml;
-import okhttp3.Response;
-import static java.net.HttpURLConnection.HTTP_NOT_FOUND;
+final class StatefulSet {
+ private final Map<Type, IStatefulSetFactory> statefulsets = new HashMap<>();
-public class V1Controller extends KubernetesController {
-
- private static final Logger LOG =
- Logger.getLogger(V1Controller.class.getName());
+ public enum Type {
+ Executor,
+ Manager
+ }
- private static final String ENV_SHARD_ID = "SHARD_ID";
+ private Configs clusterConfigs;
- private final boolean isPodTemplateDisabled;
+ private static final Logger LOG = Logger.getLogger(StatefulSet.class.getName());
- private final AppsV1Api appsClient;
- private final CoreV1Api coreClient;
+ private static final String ENV_SHARD_ID = "SHARD_ID";
/**
- * Configures the Kubernetes API Application and Core communications clients.
- * @param configuration <code>topology</code> configurations.
- * @param runtimeConfiguration Kubernetes runtime configurations.
+ * Container class of all the Kubernetes cluster configurations. The methods contained within
+ * <code>KubernetesController</code> cannot be accessed externally since it is an abstract class.
*/
- V1Controller(Config configuration, Config runtimeConfiguration) {
- super(configuration, runtimeConfiguration);
-
- isPodTemplateDisabled = KubernetesContext.getPodTemplateDisabled(configuration);
- LOG.log(Level.WARNING, String.format("Pod Template configuration is %s",
- isPodTemplateDisabled ? "DISABLED" : "ENABLED"));
- LOG.log(Level.WARNING, String.format("Volume configuration from CLI is %s",
- KubernetesContext.getVolumesFromCLIDisabled(configuration) ? "DISABLED" : "ENABLED"));
-
- try {
- final ApiClient apiClient = io.kubernetes.client.util.Config.defaultClient();
- Configuration.setDefaultApiClient(apiClient);
- appsClient = new AppsV1Api(apiClient);
- coreClient = new CoreV1Api(apiClient);
- } catch (IOException e) {
- LOG.log(Level.SEVERE, "Failed to setup Kubernetes client" + e);
- throw new RuntimeException(e);
- }
- }
+ static final class Configs {
+ private final String topologyName;
+ private final Config configuration;
+ private final Config runtimeConfiguration;
+ private final V1PodTemplateSpec managerPodTemplateSpec;
+ private final V1PodTemplateSpec executorPodTemplateSpec;
- /**
- * Configures all components required by a <code>topology</code> and submits it to the Kubernetes scheduler.
- * @param packingPlan Used to configure the StatefulSets <code>Resource</code>s and replica count.
- * @return Success indicator.
- */
- @Override
- boolean submit(PackingPlan packingPlan) {
- final String topologyName = getTopologyName();
- if (!topologyName.equals(topologyName.toLowerCase())) {
- throw new TopologySubmissionException("K8S scheduler does not allow upper case topology's.");
+ /**
+ * <code>Configs</code> contains the Kubernetes cluster configurations as well as the
+ * <code>Pod Templates</code> for the <code>Executor</code>s and <code>Manager</code>.
+ * @param configuration The cluster configurations contain items relating to, or stored in, the Kubernetes cluster.
+ * @param runtimeConfiguration The runtime configurations contain items such as the topology name.
+ * @param managerPodTemplateSpec The <code>Pod Template Spec</code> configurations for a <code>Manager</code>.
+ * @param executorPodTemplateSpec The <code>Pod Template Spec</code> configurations for the <code>Executor</code>s.
+ */
+ Configs(Config configuration, Config runtimeConfiguration,
+ V1PodTemplateSpec managerPodTemplateSpec, V1PodTemplateSpec executorPodTemplateSpec) {
+ this.topologyName = Runtime.topologyName(runtimeConfiguration);
+ this.configuration = configuration;
+ this.runtimeConfiguration = runtimeConfiguration;
+ this.managerPodTemplateSpec = managerPodTemplateSpec;
+ this.executorPodTemplateSpec = executorPodTemplateSpec;
}
- final Resource containerResource = getContainerResource(packingPlan);
-
- final V1Service topologyService = createTopologyService();
- try {
- coreClient.createNamespacedService(getNamespace(), topologyService, null,
- null, null);
- } catch (ApiException e) {
- KubernetesUtils.logExceptionWithDetails(LOG, "Error creating topology service", e);
- throw new TopologySubmissionException(e.getMessage());
+ Config getConfiguration() {
+ return configuration;
}
- // Find the max number of instances in a container so that we can open
- // enough ports if remote debugging is enabled.
- int numberOfInstances = 0;
- for (PackingPlan.ContainerPlan containerPlan : packingPlan.getContainers()) {
- numberOfInstances = Math.max(numberOfInstances, containerPlan.getInstances().size());
+ Config getRuntimeConfiguration() {
+ return runtimeConfiguration;
}
- final V1StatefulSet executors = createStatefulSet(containerResource, numberOfInstances, true);
- final V1StatefulSet manager = createStatefulSet(containerResource, numberOfInstances, false);
-
- try {
- appsClient.createNamespacedStatefulSet(getNamespace(), executors, null,
- null, null);
- appsClient.createNamespacedStatefulSet(getNamespace(), manager, null,
- null, null);
- } catch (ApiException e) {
- final String message = String.format("Error creating topology: %s%n", e.getResponseBody());
- KubernetesUtils.logExceptionWithDetails(LOG, message, e);
- throw new TopologySubmissionException(message);
- }
-
- return true;
- }
-
- /**
- * Shuts down a <code>topology</code> by deleting all associated resources.
- * <ul>
- * <li><code>Persistent Volume Claims</code> added by the <code>topology</code>.</li>
- * <li><code>StatefulSet</code> for both the <code>Executors</code> and <code>Manager</code>.</li>
- * <li>Headless <code>Service</code> which facilitates communication between all Pods.</li>
- * </ul>
- * @return Success indicator.
- */
- @Override
- boolean killTopology() {
- removePersistentVolumeClaims();
- deleteStatefulSets();
- deleteService();
- return true;
- }
- /**
- * Restarts a topology by deleting the Pods associated with it using <code>Selector Labels</code>.
- * @param shardId Not used but required because of interface.
- * @return Indicator of successful submission of restart request to Kubernetes cluster.
- */
- @Override
- boolean restart(int shardId) {
- try {
- coreClient.deleteCollectionNamespacedPod(getNamespace(), null, null, null, null, 0,
- createTopologySelectorLabels(), null, null, null, null, null, null, null);
- LOG.log(Level.WARNING, String.format("Restarting topology '%s'...", getTopologyName()));
- } catch (ApiException e) {
- LOG.log(Level.SEVERE, String.format("Failed to restart topology '%s'...", getTopologyName()));
- return false;
+ String getTopologyName() {
+ return topologyName;
}
- return true;
- }
- /**
- * Adds a specified number of Pods to a <code>topology</code>'s <code>Executors</code>.
- * @param containersToAdd Set of containers to be added.
- * @return The passed in <code>Packing Plan</code>.
- */
- @Override
- public Set<PackingPlan.ContainerPlan>
- addContainers(Set<PackingPlan.ContainerPlan> containersToAdd) {
- final V1StatefulSet statefulSet;
- try {
- statefulSet = getStatefulSet();
- } catch (ApiException ae) {
- final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
- throw new TopologyRuntimeManagementException(message, ae);
- }
- final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
- final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
- final int newContainerCount = currentContainerCount + containersToAdd.size();
-
- try {
- patchStatefulSetReplicas(newContainerCount);
- } catch (ApiException ae) {
- throw new TopologyRuntimeManagementException(
- ae.getMessage() + "\ndetails\n" + ae.getResponseBody());
+ V1PodTemplateSpec getManagerPodTemplateSpec() {
+ return managerPodTemplateSpec;
}
- return containersToAdd;
+ V1PodTemplateSpec getExecutorPodTemplateSpec() {
+ return executorPodTemplateSpec;
+ }
}
- /**
- * Removes a specified number of Pods from a <code>topology</code>'s <code>Executors</code>.
- * @param containersToRemove Set of containers to be removed.
- */
- @Override
- public void removeContainers(Set<PackingPlan.ContainerPlan> containersToRemove) {
- final V1StatefulSet statefulSet;
- try {
- statefulSet = getStatefulSet();
- } catch (ApiException ae) {
- final String message = ae.getMessage() + "\ndetails:" + ae.getResponseBody();
- throw new TopologyRuntimeManagementException(message, ae);
- }
+ @VisibleForTesting
+ protected void setClusterConfigs(Configs configs) {
+ this.clusterConfigs = configs;
+ }
- final V1StatefulSetSpec v1StatefulSet = Objects.requireNonNull(statefulSet.getSpec());
- final int currentContainerCount = Objects.requireNonNull(v1StatefulSet.getReplicas());
- final int newContainerCount = currentContainerCount - containersToRemove.size();
+ @VisibleForTesting
+ protected StatefulSet() {
+ statefulsets.put(Type.Executor, new ExecutorFactory());
+ statefulsets.put(Type.Manager, new ManagerFactory());
+ }
- try {
- patchStatefulSetReplicas(newContainerCount);
- } catch (ApiException e) {
- throw new TopologyRuntimeManagementException(
- e.getMessage() + "\ndetails\n" + e.getResponseBody());
- }
+ static StatefulSet get() {
+ return new StatefulSet();
}
- /**
- * Performs an in-place update of the replica count for a <code>topology</code>. This allows the
- * <code>topology</code> Pod count to be scaled up or down.
- * @param replicas The new number of Pod replicas required.
- * @throws ApiException in the event there is a failure patching the StatefulSet.
- */
- private void patchStatefulSetReplicas(int replicas) throws ApiException {
- final String body =
- String.format(KubernetesConstants.JSON_PATCH_STATEFUL_SET_REPLICAS_FORMAT,
- replicas);
- final V1Patch patch = new V1Patch(body);
-
- PatchUtils.patch(V1StatefulSet.class,
- () ->
- appsClient.patchNamespacedStatefulSetCall(
- getStatefulSetName(true),
- getNamespace(),
- patch,
- null,
- null,
- null,
- null,
- null),
- V1Patch.PATCH_FORMAT_JSON_PATCH,
- appsClient.getApiClient());
+ interface IStatefulSetFactory {
+ V1StatefulSet create(Configs configs, Resource containerResources, int numberOfInstances);
}
/**
- * Retrieves the <code>Executors</code> StatefulSet configurations for the Kubernetes cluster.
- * @return <code>Executors</code> StatefulSet configurations.
- * @throws ApiException in the event there is a failure retrieving the StatefulSet.
+ * Creates configured <code>Executor</code> or <code>Manager</code> <code>Stateful Set</code>.
+ * @param type One of <code>Executor</code> or <code>Manager</code>
+ * @param configs Cluster configuration information container.
+ * @param containerResources The container system resource configurations.
+ * @param numberOfInstances The container count.
+ * @return Fully configured <code>Stateful Set</code> or <code>null</code> on invalid <code>type</code>.
*/
- V1StatefulSet getStatefulSet() throws ApiException {
- return appsClient.readNamespacedStatefulSet(getStatefulSetName(true), getNamespace(),
- null);
+ V1StatefulSet create(Type type, Configs configs, Resource containerResources,
+ int numberOfInstances) {
+ if (statefulsets.containsKey(type)) {
+ return statefulsets.get(type).create(configs, containerResources, numberOfInstances);
+ }
+ return null;
}
- /**
- * Deletes the headless <code>Service</code> for a <code>topology</code>'s <code>Executors</code>
- * and <code>Manager</code> using the <code>topology</code>'s name.
- */
- void deleteService() {
- try (Response response = coreClient.deleteNamespacedServiceCall(getTopologyName(),
- getNamespace(), null, null, 0, null,
- KubernetesConstants.DELETE_OPTIONS_PROPAGATION_POLICY, null, null).execute()) {
-
- if (!response.isSuccessful()) {
- if (response.code() == HTTP_NOT_FOUND) {
- LOG.log(Level.WARNING, "Deleting non-existent Kubernetes headless service for Topology: "
- + getTopologyName());
- return;
- }
- LOG.log(Level.SEVERE, "Error when deleting the Service of the job ["
- + getTopologyName() + "] in namespace [" + getNamespace() + "]");
- LOG.log(Level.SEVERE, "Error killing topology message:" + response.message());
- KubernetesUtils.logResponseBodyIfPresent(LOG, response);
+ class ExecutorFactory implements IStatefulSetFactory {
- throw new TopologyRuntimeManagementException(
- KubernetesUtils.errorMessageFromResponse(response));
- }
- } catch (ApiException e) {
- if (e.getCode() == HTTP_NOT_FOUND) {
- LOG.log(Level.WARNING, "Tried to delete a non-existent Kubernetes service for Topology: "
- + getTopologyName());
- return;
- }
- throw new TopologyRuntimeManagementException("Error deleting topology ["
- + getTopologyName() + "] Kubernetes service", e);
- } catch (IOException e) {
- throw new TopologyRuntimeManagementException("Error deleting topology ["
- + getTopologyName() + "] Kubernetes service", e);
+ @Override
+ public V1StatefulSet create(Configs configs, Resource containerResources,
+ int numberOfInstances) {
+ setClusterConfigs(configs);
+ return createStatefulSet(containerResources, numberOfInstances, true);
}
- LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
- + "] in namespace [" + getNamespace() + "] is deleted.");
}
- /**
- * Deletes the StatefulSets for a <code>topology</code>'s <code>Executors</code> and <code>Manager</code>
- * using <code>Label</code>s.
- */
- void deleteStatefulSets() {
- try (Response response = appsClient.deleteCollectionNamespacedStatefulSetCall(getNamespace(),
- null, null, null, null, null, createTopologySelectorLabels(), null, null, null, null, null,
- null, null, null)
- .execute()) {
-
- if (!response.isSuccessful()) {
- if (response.code() == HTTP_NOT_FOUND) {
- LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSets for Topology: "
- + getTopologyName());
- return;
- }
- LOG.log(Level.SEVERE, "Error when deleting the StatefulSets of the job ["
- + getTopologyName() + "] in namespace [" + getNamespace() + "]");
- LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
- KubernetesUtils.logResponseBodyIfPresent(LOG, response);
+ class ManagerFactory implements IStatefulSetFactory {
- throw new TopologyRuntimeManagementException(
- KubernetesUtils.errorMessageFromResponse(response));
- }
- } catch (ApiException e) {
- if (e.getCode() == HTTP_NOT_FOUND) {
- LOG.log(Level.WARNING, "Tried to delete a non-existent StatefulSet for Topology: "
- + getTopologyName());
- return;
- }
- throw new TopologyRuntimeManagementException("Error deleting topology ["
- + getTopologyName() + "] Kubernetes StatefulSets", e);
- } catch (IOException e) {
- throw new TopologyRuntimeManagementException("Error deleting topology ["
- + getTopologyName() + "] Kubernetes StatefulSets", e);
+ @Override
+ public V1StatefulSet create(Configs configs, Resource containerResources,
+ int numberOfInstances) {
+ setClusterConfigs(configs);
+ return createStatefulSet(containerResources, numberOfInstances, false);
}
- LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
- + "] in namespace [" + getNamespace() + "] is deleted.");
}
+
/**
* Generates the command to start Heron within the <code>container</code>.
* @param containerId Passed down to <code>SchedulerUtils</code> to generate executor command.
@@ -382,8 +191,8 @@ public class V1Controller extends KubernetesController {
*/
protected List<String> getExecutorCommand(String containerId, int numOfInstances,
boolean isExecutor) {
- final Config configuration = getConfiguration();
- final Config runtimeConfiguration = getRuntimeConfiguration();
+ final Config configuration = clusterConfigs.getConfiguration();
+ final Config runtimeConfiguration = clusterConfigs.getRuntimeConfiguration();
final Map<ExecutorPort, String> ports =
KubernetesConstants.EXECUTOR_PORTS.entrySet()
.stream()
@@ -427,32 +236,6 @@ public class V1Controller extends KubernetesController {
return String.format(pattern, ENV_SHARD_ID, ENV_SHARD_ID);
}
- /**
- * Creates a headless <code>Service</code> to facilitate communication between Pods in a <code>topology</code>.
- * @return A fully configured <code>Service</code> to be used by a <code>topology</code>.
- */
- private V1Service createTopologyService() {
- final String topologyName = getTopologyName();
-
- final V1Service service = new V1Service();
-
- // Setup service metadata.
- final V1ObjectMeta objectMeta = new V1ObjectMeta()
- .name(topologyName)
- .annotations(getServiceAnnotations())
- .labels(getServiceLabels());
- service.setMetadata(objectMeta);
-
- // Create the headless service.
- final V1ServiceSpec serviceSpec = new V1ServiceSpec()
- .clusterIP("None")
- .selector(getPodMatchLabels(topologyName));
-
- service.setSpec(serviceSpec);
-
- return service;
- }
-
/**
* Creates and configures the <code>StatefulSet</code> which the topology's <code>executor</code>s will run in.
* @param containerResource Passed down to configure the <code>executor</code> resource limits.
@@ -462,24 +245,27 @@ public class V1Controller extends KubernetesController {
*/
private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances,
boolean isExecutor) {
- final String topologyName = getTopologyName();
- final Config runtimeConfiguration = getRuntimeConfiguration();
+ final String topologyName = clusterConfigs.getTopologyName();
+ final Config runtimeConfiguration = clusterConfigs.getRuntimeConfiguration();
final List<V1Volume> volumes = new LinkedList<>();
final List<V1VolumeMount> volumeMounts = new LinkedList<>();
// Collect Persistent Volume Claim configurations from the CLI.
final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configsPVC =
- KubernetesContext.getVolumeClaimTemplates(getConfiguration(), isExecutor);
+ KubernetesContext.getVolumeClaimTemplates(clusterConfigs.getConfiguration(), isExecutor);
// Collect all Volume configurations from the CLI and generate Volumes and Volume Mounts.
createVolumeAndMountsPersistentVolumeClaimCLI(configsPVC, volumes, volumeMounts);
createVolumeAndMountsHostPathCLI(
- KubernetesContext.getVolumeHostPath(getConfiguration(), isExecutor), volumes, volumeMounts);
+ KubernetesContext.getVolumeHostPath(clusterConfigs.getConfiguration(), isExecutor),
+ volumes, volumeMounts);
createVolumeAndMountsEmptyDirCLI(
- KubernetesContext.getVolumeEmptyDir(getConfiguration(), isExecutor), volumes, volumeMounts);
+ KubernetesContext.getVolumeEmptyDir(clusterConfigs.getConfiguration(), isExecutor),
+ volumes, volumeMounts);
createVolumeAndMountsNFSCLI(
- KubernetesContext.getVolumeNFS(getConfiguration(), isExecutor), volumes, volumeMounts);
+ KubernetesContext.getVolumeNFS(clusterConfigs.getConfiguration(), isExecutor),
+ volumes, volumeMounts);
final V1StatefulSet statefulSet = new V1StatefulSet();
@@ -509,7 +295,9 @@ public class V1Controller extends KubernetesController {
statefulSetSpec.setSelector(selector);
// Create a Pod Template.
- final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate(isExecutor);
+ final V1PodTemplateSpec podTemplateSpec =
+ isExecutor ? clusterConfigs.getExecutorPodTemplateSpec()
+ : clusterConfigs.getManagerPodTemplateSpec();
// Set up Pod Metadata.
final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getPodLabels(topologyName));
@@ -536,15 +324,7 @@ public class V1Controller extends KubernetesController {
* @return Key-value pairs of general <code>Annotation</code>s to be added to the Pod.
*/
private Map<String, String> getPodAnnotations() {
- return KubernetesContext.getPodAnnotations(getConfiguration());
- }
-
- /**
- * Extracts <code>Service Annotations</code> for configurations.
- * @return Key-value pairs of service <code>Annotation</code>s to be added to the Pod.
- */
- private Map<String, String> getServiceAnnotations() {
- return KubernetesContext.getServiceAnnotations(getConfiguration());
+ return KubernetesContext.getPodAnnotations(clusterConfigs.getConfiguration());
}
/**
@@ -582,18 +362,10 @@ public class V1Controller extends KubernetesController {
final Map<String, String> labels = new HashMap<>();
labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE);
labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
- labels.putAll(KubernetesContext.getPodLabels(getConfiguration()));
+ labels.putAll(KubernetesContext.getPodLabels(clusterConfigs.getConfiguration()));
return labels;
}
- /**
- * Extracts <code>Selector Labels</code> for<code>Service</code>s from configurations.
- * @return Key-value pairs of <code>Service Labels</code> to be added to the Pod.
- */
- private Map<String, String> getServiceLabels() {
- return KubernetesContext.getServiceLabels(getConfiguration());
- }
-
/**
* Configures the <code>Pod Spec</code> section of the <code>StatefulSet</code>. The <code>Heron</code> container
* will be configured to allow it to function but other supplied containers are loaded verbatim.
@@ -662,8 +434,7 @@ public class V1Controller extends KubernetesController {
*/
@VisibleForTesting
protected void configureTolerations(final V1PodSpec spec) {
- KubernetesUtils.V1ControllerUtils<V1Toleration> utils =
- new KubernetesUtils.V1ControllerUtils<>();
+ KubernetesUtils.CommonUtils<V1Toleration> utils = new KubernetesUtils.CommonUtils<>();
spec.setTolerations(
utils.mergeListsDedupe(getTolerations(), spec.getTolerations(),
Comparator.comparing(V1Toleration::getKey), "Pod Specification Tolerations")
@@ -695,7 +466,7 @@ public class V1Controller extends KubernetesController {
* @param podSpec <code>Pod Spec</code> to add secrets to.
*/
private void mountSecretsAsVolumes(V1PodSpec podSpec) {
- final Config config = getConfiguration();
+ final Config config = clusterConfigs.getConfiguration();
final Map<String, String> secrets = KubernetesContext.getPodSecretsToMount(config);
for (Map.Entry<String, String> secret : secrets.entrySet()) {
final V1VolumeMount mount = new V1VolumeMount()
@@ -722,7 +493,7 @@ public class V1Controller extends KubernetesController {
*/
private void configureHeronContainer(Resource resource, int numberOfInstances,
final V1Container container, boolean isExecutor) {
- final Config configuration = getConfiguration();
+ final Config configuration = clusterConfigs.getConfiguration();
// Set up the container images.
container.setImage(KubernetesContext.getExecutorDockerImage(configuration));
@@ -748,7 +519,7 @@ public class V1Controller extends KubernetesController {
// Set container ports.
final boolean debuggingEnabled =
TopologyUtils.getTopologyRemoteDebuggingEnabled(
- Runtime.topology(getRuntimeConfiguration()));
+ Runtime.topology(clusterConfigs.getRuntimeConfiguration()));
configureContainerPorts(debuggingEnabled, numberOfInstances, container);
// setup volume mounts
@@ -852,7 +623,7 @@ public class V1Controller extends KubernetesController {
@VisibleForTesting
protected void configureContainerEnvVars(final V1Container container) {
// Deduplicate on var name with Heron defaults taking precedence.
- KubernetesUtils.V1ControllerUtils<V1EnvVar> utils = new KubernetesUtils.V1ControllerUtils<>();
+ KubernetesUtils.CommonUtils<V1EnvVar> utils = new KubernetesUtils.CommonUtils<>();
container.setEnv(
utils.mergeListsDedupe(getExecutorEnvVars(), container.getEnv(),
Comparator.comparing(V1EnvVar::getName), "Pod Template Environment Variables")
@@ -896,8 +667,7 @@ public class V1Controller extends KubernetesController {
}
// Set container ports. Deduplicate using port number with Heron defaults taking precedence.
- KubernetesUtils.V1ControllerUtils<V1ContainerPort> utils =
- new KubernetesUtils.V1ControllerUtils<>();
+ KubernetesUtils.CommonUtils<V1ContainerPort> utils = new KubernetesUtils.CommonUtils<>();
container.setPorts(
utils.mergeListsDedupe(getExecutorPorts(), container.getPorts(),
Comparator.comparing(V1ContainerPort::getContainerPort), "Pod Template Ports")
@@ -945,7 +715,7 @@ public class V1Controller extends KubernetesController {
*/
@VisibleForTesting
protected void mountVolumeIfPresent(final V1Container container) {
- final Config config = getConfiguration();
+ final Config config = clusterConfigs.getConfiguration();
if (KubernetesContext.hasContainerVolume(config)) {
final V1VolumeMount mount =
new V1VolumeMount()
@@ -953,8 +723,7 @@ public class V1Controller extends KubernetesController {
.mountPath(KubernetesContext.getContainerVolumeMountPath(config));
// Merge volume mounts. Deduplicate using mount's name with Heron defaults taking precedence.
- KubernetesUtils.V1ControllerUtils<V1VolumeMount> utils =
- new KubernetesUtils.V1ControllerUtils<>();
+ KubernetesUtils.CommonUtils<V1VolumeMount> utils = new KubernetesUtils.CommonUtils<>();
container.setVolumeMounts(
utils.mergeListsDedupe(Collections.singletonList(mount), container.getVolumeMounts(),
Comparator.comparing(V1VolumeMount::getName), "Pod Template Volume Mounts")
@@ -967,15 +736,15 @@ public class V1Controller extends KubernetesController {
* @param container <code>container</code> to be configured.
*/
private void setSecretKeyRefs(V1Container container) {
- final Config config = getConfiguration();
+ final Config config = clusterConfigs.getConfiguration();
final Map<String, String> podSecretKeyRefs = KubernetesContext.getPodSecretKeyRefs(config);
for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) {
final String[] keyRefParts = secret.getValue().split(":");
if (keyRefParts.length != 2) {
- LOG.log(Level.SEVERE,
- "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
- throw new TopologyRuntimeManagementException(
- "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
+ final String msg =
+ String.format("SecretKeyRef must be in the form name:key. <%s>", secret.getValue());
+ LOG.log(Level.SEVERE, msg);
+ throw new TopologyRuntimeManagementException(msg);
}
String name = keyRefParts[0];
String key = keyRefParts[1];
@@ -989,113 +758,6 @@ public class V1Controller extends KubernetesController {
}
}
- /**
- * Initiates the process of locating and loading <code>Pod Template</code> from a <code>ConfigMap</code>.
- * The loaded text is then parsed into a usable <code>Pod Template</code>.
- * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
- * or <code>Manager</code>.
- * @return A <code>Pod Template</code> which is loaded and parsed from a <code>ConfigMap</code>.
- */
- @VisibleForTesting
- protected V1PodTemplateSpec loadPodFromTemplate(boolean isExecutor) {
- final Pair<String, String> podTemplateConfigMapName = getPodTemplateLocation(isExecutor);
-
- // Default Pod Template.
- if (podTemplateConfigMapName == null) {
- LOG.log(Level.INFO, "Configuring cluster with the Default Pod Template");
- return new V1PodTemplateSpec();
- }
-
- if (isPodTemplateDisabled) {
- throw new TopologySubmissionException("Custom Pod Templates are disabled");
- }
-
- final String configMapName = podTemplateConfigMapName.first;
- final String podTemplateName = podTemplateConfigMapName.second;
-
- // Attempt to locate ConfigMap with provided Pod Template name.
- try {
- V1ConfigMap configMap = getConfigMap(configMapName);
- if (configMap == null) {
- throw new ApiException(
- String.format("K8s client unable to locate ConfigMap '%s'", configMapName));
- }
-
- final Map<String, String> configMapData = configMap.getData();
- if (configMapData != null && configMapData.containsKey(podTemplateName)) {
- // NullPointerException when Pod Template is empty.
- V1PodTemplateSpec podTemplate = ((V1PodTemplate)
- Yaml.load(configMapData.get(podTemplateName))).getTemplate();
- LOG.log(Level.INFO, String.format("Configuring cluster with the %s.%s Pod Template",
- configMapName, podTemplateName));
- return podTemplate;
- }
-
- // Failure to locate Pod Template with provided name.
- throw new ApiException(String.format("Failed to locate Pod Template '%s' in ConfigMap '%s'",
- podTemplateName, configMapName));
- } catch (ApiException e) {
- KubernetesUtils.logExceptionWithDetails(LOG, e.getMessage(), e);
- throw new TopologySubmissionException(e.getMessage());
- } catch (IOException | ClassCastException | NullPointerException e) {
- final String message = String.format("Error parsing Pod Template '%s' in ConfigMap '%s'",
- podTemplateName, configMapName);
- KubernetesUtils.logExceptionWithDetails(LOG, message, e);
- throw new TopologySubmissionException(message);
- }
- }
-
- /**
- * Extracts the <code>ConfigMap</code> and <code>Pod Template</code> names from the CLI parameter.
- * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
- * or <code>Manager</code>.
- * @return A pair of the form <code>(ConfigMap, Pod Template)</code>.
- */
- @VisibleForTesting
- protected Pair<String, String> getPodTemplateLocation(boolean isExecutor) {
- final String podTemplateConfigMapName = KubernetesContext
- .getPodTemplateConfigMapName(getConfiguration(), isExecutor);
-
- if (podTemplateConfigMapName == null) {
- return null;
- }
-
- try {
- final int splitPoint = podTemplateConfigMapName.indexOf(".");
- final String configMapName = podTemplateConfigMapName.substring(0, splitPoint);
- final String podTemplateName = podTemplateConfigMapName.substring(splitPoint + 1);
-
- if (configMapName.isEmpty() || podTemplateName.isEmpty()) {
- throw new IllegalArgumentException("Empty ConfigMap or Pod Template name");
- }
-
- return new Pair<>(configMapName, podTemplateName);
- } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
- final String message = "Invalid ConfigMap and/or Pod Template name";
- KubernetesUtils.logExceptionWithDetails(LOG, message, e);
- throw new TopologySubmissionException(message);
- }
- }
-
- /**
- * Retrieves a <code>ConfigMap</code> from the K8s cluster in the API Server's namespace.
- * @param configMapName Name of the <code>ConfigMap</code> to retrieve.
- * @return The retrieved <code>ConfigMap</code>.
- */
- @VisibleForTesting
- protected V1ConfigMap getConfigMap(String configMapName) {
- try {
- return coreClient.readNamespacedConfigMap(
- configMapName,
- getNamespace(),
- null);
- } catch (ApiException e) {
- final String message = "Error retrieving ConfigMaps";
- KubernetesUtils.logExceptionWithDetails(LOG, message, e);
- throw new TopologySubmissionException(String.format("%s: %s", message, e.getMessage()));
- }
- }
-
/**
* Generates <code>Persistent Volume Claims Templates</code> from a mapping of <code>Volumes</code>
* to <code>key-value</code> pairs of configuration options and values.
@@ -1120,7 +782,7 @@ public class V1Controller extends KubernetesController {
listOfPVCs.add(Volumes.get()
.createPersistentVolumeClaim(pvc.getKey(),
- getPersistentVolumeClaimLabels(getTopologyName()), pvc.getValue()));
+ getPersistentVolumeClaimLabels(clusterConfigs.getTopologyName()), pvc.getValue()));
}
return listOfPVCs;
}
@@ -1226,71 +888,19 @@ public class V1Controller extends KubernetesController {
// Deduplicate on Names with Persistent Volume Claims taking precedence.
- KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
- new KubernetesUtils.V1ControllerUtils<>();
+ KubernetesUtils.CommonUtils<V1Volume> utilsVolumes = new KubernetesUtils.CommonUtils<>();
podSpec.setVolumes(
utilsVolumes.mergeListsDedupe(volumes, podSpec.getVolumes(),
Comparator.comparing(V1Volume::getName),
"Pod with Volumes"));
- KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
- new KubernetesUtils.V1ControllerUtils<>();
+ KubernetesUtils.CommonUtils<V1VolumeMount> utilsMounts = new KubernetesUtils.CommonUtils<>();
executor.setVolumeMounts(
utilsMounts.mergeListsDedupe(volumeMounts, executor.getVolumeMounts(),
Comparator.comparing(V1VolumeMount::getName),
"Heron container with Volume Mounts"));
}
- /**
- * Removes all Persistent Volume Claims associated with a specific topology, if they exist.
- * It looks for the following:
- * metadata:
- * labels:
- * topology: <code>topology-name</code>
- * onDemand: <code>true</code>
- */
- private void removePersistentVolumeClaims() {
- final String name = getTopologyName();
- final StringBuilder selectorLabel = new StringBuilder();
-
- // Generate selector label.
- for (Map.Entry<String, String> label : getPersistentVolumeClaimLabels(name).entrySet()) {
- if (selectorLabel.length() != 0) {
- selectorLabel.append(",");
- }
- selectorLabel.append(label.getKey()).append("=").append(label.getValue());
- }
-
- // Remove all dynamically backed Persistent Volume Claims.
- try {
- V1Status status = coreClient.deleteCollectionNamespacedPersistentVolumeClaim(
- getNamespace(),
- null,
- null,
- null,
- null,
- null,
- selectorLabel.toString(),
- null,
- null,
- null,
- null,
- null,
- null,
- null);
-
- LOG.log(Level.INFO,
- String.format("Removing automatically generated Persistent Volume Claims for `%s`:%n%s",
- name, status.getMessage()));
- } catch (ApiException e) {
- final String message = String.format("Failed to connect to K8s cluster to delete Persistent "
- + "Volume Claims for topology `%s`. A manual clean-up is required.%n%s",
- name, e.getMessage());
- LOG.log(Level.WARNING, message);
- throw new TopologyRuntimeManagementException(message);
- }
- }
-
/**
* Generates the <code>Label</code> which are attached to a Topology's Persistent Volume Claims.
* @param topologyName Attached to the topology match label.
@@ -1313,17 +923,7 @@ public class V1Controller extends KubernetesController {
* @return String <code>"topology-name"-executors</code>.
*/
private String getStatefulSetName(boolean isExecutor) {
- return String.format("%s-%s", getTopologyName(),
+ return String.format("%s-%s", clusterConfigs.getTopologyName(),
isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME);
}
-
- /**
- * Generates the <code>Selector</code> match labels with which resources in this topology can be found.
- * @return A label of the form <code>app=heron,topology=topology-name</code>.
- */
- private String createTopologySelectorLabels() {
- return String.format("%s=%s,%s=%s",
- KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE,
- KubernetesConstants.LABEL_TOPOLOGY, getTopologyName());
- }
}
diff --git a/heron/schedulers/tests/java/BUILD b/heron/schedulers/tests/java/BUILD
index 3ad9ffedf36..c8eac176c74 100644
--- a/heron/schedulers/tests/java/BUILD
+++ b/heron/schedulers/tests/java/BUILD
@@ -205,7 +205,8 @@ java_tests(
"org.apache.heron.scheduler.kubernetes.KubernetesLauncherTest",
"org.apache.heron.scheduler.kubernetes.VolumesTests",
"org.apache.heron.scheduler.kubernetes.KubernetesContextTest",
- "org.apache.heron.scheduler.kubernetes.V1ControllerTest",
+ "org.apache.heron.scheduler.kubernetes.KubernetesShimTest",
+ "org.apache.heron.scheduler.kubernetes.StatefulSetTest",
"org.apache.heron.scheduler.kubernetes.KubernetesUtilsTest",
],
runtime_deps = [":kubernetes-tests"],
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
new file mode 100644
index 00000000000..ec1be4e87a9
--- /dev/null
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesShimTest.java
@@ -0,0 +1,419 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.heron.scheduler.kubernetes;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Spy;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import org.apache.heron.common.basics.Pair;
+import org.apache.heron.scheduler.TopologySubmissionException;
+import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
+import org.apache.heron.spi.common.Config;
+import org.apache.heron.spi.common.Key;
+
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doReturn;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KubernetesShimTest {
+
+ private static final String TOPOLOGY_NAME = "topology-name";
+ private static final String CONFIGMAP_NAME = "CONFIG-MAP-NAME";
+ private static final String POD_TEMPLATE_NAME = "POD-TEMPLATE-NAME";
+ private static final String CONFIGMAP_POD_TEMPLATE_NAME =
+ String.format("%s.%s", CONFIGMAP_NAME, POD_TEMPLATE_NAME);
+ private static final String POD_TEMPLATE_VALID =
+ "apiVersion: apps/v1\n"
+ + "kind: PodTemplate\n"
+ + "metadata:\n"
+ + " name: heron-tracker\n"
+ + " namespace: default\n"
+ + "template:\n"
+ + " metadata:\n"
+ + " labels:\n"
+ + " app: heron-tracker\n"
+ + " spec:\n"
+ + " containers:\n"
+ + " - name: heron-tracker\n"
+ + " image: apache/heron:latest\n"
+ + " ports:\n"
+ + " - containerPort: 8888\n"
+ + " name: api-port\n"
+ + " resources:\n"
+ + " requests:\n"
+ + " cpu: \"100m\"\n"
+ + " memory: \"200M\"\n"
+ + " limits:\n"
+ + " cpu: \"400m\"\n"
+ + " memory: \"512M\"";
+ private static final String POD_TEMPLATE_LOCATION_EXECUTOR =
+ String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
+ KubernetesConstants.EXECUTOR_NAME);
+ private static final String POD_TEMPLATE_LOCATION_MANAGER =
+ String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
+ KubernetesConstants.MANAGER_NAME);
+
+ private static final Config CONFIG = Config.newBuilder().build();
+ private static final Config CONFIG_WITH_POD_TEMPLATE = Config.newBuilder()
+ .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
+ .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
+ .build();
+ private static final Config RUNTIME = Config.newBuilder()
+ .put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME)
+ .build();
+ private final Config configDisabledPodTemplate = Config.newBuilder()
+ .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
+ .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
+ .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_DISABLED, "true")
+ .build();
+
+ @Spy
+ private final KubernetesShim v1ControllerWithPodTemplate =
+ new KubernetesShim(CONFIG_WITH_POD_TEMPLATE, RUNTIME);
+
+ @Spy
+ private final KubernetesShim v1ControllerPodTemplate =
+ new KubernetesShim(configDisabledPodTemplate, RUNTIME);
+
+ @Rule
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ @Test
+ public void testLoadPodFromTemplateDefault() {
+ final KubernetesShim v1ControllerNoPodTemplate = new KubernetesShim(CONFIG, RUNTIME);
+ final V1PodTemplateSpec defaultPodSpec = new V1PodTemplateSpec();
+
+ final V1PodTemplateSpec podSpecExecutor = v1ControllerNoPodTemplate.loadPodFromTemplate(true);
+ Assert.assertEquals("Default Pod Spec for Executor", defaultPodSpec, podSpecExecutor);
+
+ final V1PodTemplateSpec podSpecManager = v1ControllerNoPodTemplate.loadPodFromTemplate(false);
+ Assert.assertEquals("Default Pod Spec for Manager", defaultPodSpec, podSpecManager);
+ }
+
+ @Test
+ public void testLoadPodFromTemplateNullConfigMap() {
+ final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
+ testCases.add(new TestTuple<>("Executor not found", true, "unable to locate"));
+ testCases.add(new TestTuple<>("Manager not found", false, "unable to locate"));
+
+ for (TestTuple<Boolean, String> testCase : testCases) {
+ doReturn(null)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+
+ String message = "";
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+ }
+ }
+
+ @Test
+ public void testLoadPodFromTemplateNoConfigMap() {
+ final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
+ testCases.add(new TestTuple<>("Executor no ConfigMap", true, "Failed to locate Pod Template"));
+ testCases.add(new TestTuple<>("Manager no ConfigMap", false, "Failed to locate Pod Template"));
+
+ for (TestTuple<Boolean, String> testCase : testCases) {
+ doReturn(new V1ConfigMap())
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+
+ String message = "";
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+ }
+ }
+
+ @Test
+ public void testLoadPodFromTemplateNoTargetConfigMap() {
+ final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
+ testCases.add(new TestTuple<>("Executor no target ConfigMap",
+ true, "Failed to locate Pod Template"));
+ testCases.add(new TestTuple<>("Manager no target ConfigMap",
+ false, "Failed to locate Pod Template"));
+
+ final V1ConfigMap configMapNoTargetData = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData("Dummy Key", "Dummy Value")
+ .build();
+
+ for (TestTuple<Boolean, String> testCase : testCases) {
+ doReturn(configMapNoTargetData)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+
+ String message = "";
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+ }
+ }
+
+ @Test
+ public void testLoadPodFromTemplateBadTargetConfigMap() {
+ // ConfigMap with target ConfigMap and an invalid Pod Template.
+ final V1ConfigMap configMapInvalidPod = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, "Dummy Value")
+ .build();
+
+ // ConfigMap with target ConfigMaps and an empty Pod Template.
+ final V1ConfigMap configMapEmptyPod = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, "")
+ .build();
+
+ // Test case container.
+ // Input: ConfigMap to setup mock KubernetesShim, Boolean flag for executor/manager switch.
+ // Output: The expected error message.
+ final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
+ testCases.add(new TestTuple<>("Executor invalid Pod Template",
+ new Pair<>(configMapInvalidPod, true), "Error parsing"));
+ testCases.add(new TestTuple<>("Manager invalid Pod Template",
+ new Pair<>(configMapInvalidPod, false), "Error parsing"));
+ testCases.add(new TestTuple<>("Executor empty Pod Template",
+ new Pair<>(configMapEmptyPod, true), "Error parsing"));
+ testCases.add(new TestTuple<>("Manager empty Pod Template",
+ new Pair<>(configMapEmptyPod, false), "Error parsing"));
+
+ // Test loop.
+ for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
+ doReturn(testCase.input.first)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+
+ String message = "";
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+ }
+ }
+
+ @Test
+ public void testLoadPodFromTemplateValidConfigMap() {
+ final String expected =
+ " containers: [class V1Container {\n"
+ + " args: null\n"
+ + " command: null\n"
+ + " env: null\n"
+ + " envFrom: null\n"
+ + " image: apache/heron:latest\n"
+ + " imagePullPolicy: null\n"
+ + " lifecycle: null\n"
+ + " livenessProbe: null\n"
+ + " name: heron-tracker\n"
+ + " ports: [class V1ContainerPort {\n"
+ + " containerPort: 8888\n"
+ + " hostIP: null\n"
+ + " hostPort: null\n"
+ + " name: api-port\n"
+ + " protocol: null\n"
+ + " }]\n"
+ + " readinessProbe: null\n"
+ + " resources: class V1ResourceRequirements {\n"
+ + " limits: {cpu=Quantity{number=0.400, format=DECIMAL_SI}, "
+ + "memory=Quantity{number=512000000, format=DECIMAL_SI}}\n"
+ + " requests: {cpu=Quantity{number=0.100, format=DECIMAL_SI}, "
+ + "memory=Quantity{number=200000000, format=DECIMAL_SI}}\n"
+ + " }\n"
+ + " securityContext: null\n"
+ + " startupProbe: null\n"
+ + " stdin: null\n"
+ + " stdinOnce: null\n"
+ + " terminationMessagePath: null\n"
+ + " terminationMessagePolicy: null\n"
+ + " tty: null\n"
+ + " volumeDevices: null\n"
+ + " volumeMounts: null\n"
+ + " workingDir: null\n"
+ + " }]";
+
+ // ConfigMap with valid Pod Template.
+ final V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
+ .build();
+
+ // Test case container.
+ // Input: ConfigMap to setup mock KubernetesShim, Boolean flag for executor/manager switch.
+ // Output: The expected Pod template as a string.
+ final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
+ testCases.add(new TestTuple<>("Executor valid Pod Template",
+ new Pair<>(configMapValidPod, true), expected));
+ testCases.add(new TestTuple<>("Manager valid Pod Template",
+ new Pair<>(configMapValidPod, false), expected));
+
+ // Test loop: pass each case's own executor/manager flag so that both paths are exercised.
+ for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
+ doReturn(testCase.input.first)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+
+ final V1PodTemplateSpec podTemplateSpec =
+ v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
+ Assert.assertTrue(testCase.description,
+ podTemplateSpec.toString().contains(testCase.expected));
+ }
+ }
+
+ @Test
+ public void testLoadPodFromTemplateInvalidConfigMap() {
+ // ConfigMap with an invalid Pod Template.
+ final String invalidPodTemplate =
+ "apiVersion: apps/v1\n"
+ + "kind: InvalidTemplate\n"
+ + "metadata:\n"
+ + " name: heron-tracker\n"
+ + " namespace: default\n"
+ + "template:\n"
+ + " metadata:\n"
+ + " labels:\n"
+ + " app: heron-tracker\n"
+ + " spec:\n";
+ final V1ConfigMap configMap = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, invalidPodTemplate)
+ .build();
+
+
+ // Test case container.
+ // Input: ConfigMap to setup mock KubernetesShim, Boolean flag for executor/manager switch.
+ // Output: The expected error message.
+ final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
+ testCases.add(new TestTuple<>("Executor invalid Pod Template",
+ new Pair<>(configMap, true), "Error parsing"));
+ testCases.add(new TestTuple<>("Manager invalid Pod Template",
+ new Pair<>(configMap, false), "Error parsing"));
+
+ // Test loop.
+ for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
+ doReturn(testCase.input.first)
+ .when(v1ControllerWithPodTemplate)
+ .getConfigMap(anyString());
+
+ String message = "";
+ try {
+ v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(testCase.description, message.contains(testCase.expected));
+ }
+ }
+
+ @Test
+ public void testDisablePodTemplates() {
+ // ConfigMap with valid Pod Template.
+ V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
+ .withNewMetadata()
+ .withName(CONFIGMAP_NAME)
+ .endMetadata()
+ .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
+ .build();
+ final String expected = "Pod Templates are disabled";
+ String message = "";
+ doReturn(configMapValidPod)
+ .when(v1ControllerPodTemplate)
+ .getConfigMap(anyString());
+
+ try {
+ v1ControllerPodTemplate.loadPodFromTemplate(true);
+ } catch (TopologySubmissionException e) {
+ message = e.getMessage();
+ }
+ Assert.assertTrue(message.contains(expected));
+ }
+
+ @Test
+ public void testGetPodTemplateLocationPassing() {
+ final Config testConfig = Config.newBuilder()
+ .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
+ .build();
+ final KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
+ final Pair<String, String> expected = new Pair<>(CONFIGMAP_NAME, POD_TEMPLATE_NAME);
+
+ // Correct parsing
+ final Pair<String, String> actual = kubernetesShim.getPodTemplateLocation(true);
+ Assert.assertEquals(expected, actual);
+ }
+
+ @Test
+ public void testGetPodTemplateLocationNoConfigMap() {
+ expectedException.expect(TopologySubmissionException.class);
+ final Config testConfig = Config.newBuilder()
+ .put(POD_TEMPLATE_LOCATION_EXECUTOR, ".POD-TEMPLATE-NAME").build();
+ KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
+ kubernetesShim.getPodTemplateLocation(true);
+ }
+
+ @Test
+ public void testGetPodTemplateLocationNoPodTemplate() {
+ expectedException.expect(TopologySubmissionException.class);
+ final Config testConfig = Config.newBuilder()
+ .put(POD_TEMPLATE_LOCATION_EXECUTOR, "CONFIGMAP-NAME.").build();
+ KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
+ kubernetesShim.getPodTemplateLocation(true);
+ }
+
+ @Test
+ public void testGetPodTemplateLocationNoDelimiter() {
+ expectedException.expect(TopologySubmissionException.class);
+ final Config testConfig = Config.newBuilder()
+ .put(POD_TEMPLATE_LOCATION_EXECUTOR, "CONFIGMAP-NAMEPOD-TEMPLATE-NAME").build();
+ KubernetesShim kubernetesShim = new KubernetesShim(testConfig, RUNTIME);
+ kubernetesShim.getPodTemplateLocation(true);
+ }
+}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
index ddc1f692dc4..4a80571c6d0 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
@@ -43,14 +43,14 @@ public class KubernetesUtilsTest {
public void testMergeListsDedupe() {
final String description = "Pod Template Environment Variables";
final List<V1EnvVar> heronEnvVars =
- Collections.unmodifiableList(V1Controller.getExecutorEnvVars());
+ Collections.unmodifiableList(StatefulSet.getExecutorEnvVars());
final V1EnvVar additionEnvVar = new V1EnvVar()
.name("env-variable-to-be-kept")
.valueFrom(new V1EnvVarSource()
.fieldRef(new V1ObjectFieldSelector()
.fieldPath("env-variable-was-kept")));
final List<V1EnvVar> expectedEnvVars = Collections.unmodifiableList(
- new LinkedList<V1EnvVar>(V1Controller.getExecutorEnvVars()) {{
+ new LinkedList<V1EnvVar>(StatefulSet.getExecutorEnvVars()) {{
add(additionEnvVar);
}});
final List<V1EnvVar> inputEnvVars = Arrays.asList(
@@ -67,48 +67,48 @@ public class KubernetesUtilsTest {
additionEnvVar
);
- KubernetesUtils.V1ControllerUtils<V1EnvVar> v1ControllerUtils =
- new KubernetesUtils.V1ControllerUtils<>();
+ KubernetesUtils.CommonUtils<V1EnvVar> commonUtils =
+ new KubernetesUtils.CommonUtils<>();
// Both input lists are null.
Assert.assertNull("Both input lists are <null>",
- v1ControllerUtils.mergeListsDedupe(null, null,
+ commonUtils.mergeListsDedupe(null, null,
Comparator.comparing(V1EnvVar::getName), description));
// <primaryList> is <null>.
Assert.assertEquals("<primaryList> is null and <secondaryList> should be returned",
inputEnvVars,
- v1ControllerUtils.mergeListsDedupe(null, inputEnvVars,
+ commonUtils.mergeListsDedupe(null, inputEnvVars,
Comparator.comparing(V1EnvVar::getName), description));
// <primaryList> is empty.
Assert.assertEquals("<primaryList> is empty and <secondaryList> should be returned",
inputEnvVars,
- v1ControllerUtils.mergeListsDedupe(new LinkedList<>(), inputEnvVars,
+ commonUtils.mergeListsDedupe(new LinkedList<>(), inputEnvVars,
Comparator.comparing(V1EnvVar::getName), description));
// <secondaryList> is <null>.
Assert.assertEquals("<secondaryList> is null and <primaryList> should be returned",
heronEnvVars,
- v1ControllerUtils.mergeListsDedupe(heronEnvVars, null,
+ commonUtils.mergeListsDedupe(heronEnvVars, null,
Comparator.comparing(V1EnvVar::getName), description));
// <secondaryList> is empty.
Assert.assertEquals("<secondaryList> is empty and <primaryList> should be returned",
heronEnvVars,
- v1ControllerUtils.mergeListsDedupe(heronEnvVars, new LinkedList<>(),
+ commonUtils.mergeListsDedupe(heronEnvVars, new LinkedList<>(),
Comparator.comparing(V1EnvVar::getName), description));
// Merge both lists.
Assert.assertTrue("<primaryList> and <secondaryList> merged and deduplicated",
expectedEnvVars.containsAll(
- v1ControllerUtils.mergeListsDedupe(heronEnvVars, inputEnvVars,
+ commonUtils.mergeListsDedupe(heronEnvVars, inputEnvVars,
Comparator.comparing(V1EnvVar::getName), description)));
// Expect thrown error.
String errorMessage = "";
try {
- v1ControllerUtils.mergeListsDedupe(heronEnvVars, Collections.singletonList(new V1EnvVar()),
+ commonUtils.mergeListsDedupe(heronEnvVars, Collections.singletonList(new V1EnvVar()),
Comparator.comparing(V1EnvVar::getName), description);
} catch (TopologySubmissionException e) {
errorMessage = e.getMessage();
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
similarity index 68%
rename from heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
rename to heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
index a039ca980e5..d58c42102c1 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/V1ControllerTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/StatefulSetTest.java
@@ -29,28 +29,22 @@ import java.util.Map;
import com.google.common.collect.ImmutableMap;
import org.junit.Assert;
-import org.junit.Rule;
import org.junit.Test;
-import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
-import org.mockito.Spy;
import org.mockito.runners.MockitoJUnitRunner;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.heron.common.basics.ByteAmount;
import org.apache.heron.common.basics.Pair;
-import org.apache.heron.scheduler.TopologySubmissionException;
-import org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.common.Key;
import org.apache.heron.spi.packing.Resource;
import io.kubernetes.client.custom.Quantity;
-import io.kubernetes.client.openapi.models.V1ConfigMap;
-import io.kubernetes.client.openapi.models.V1ConfigMapBuilder;
import io.kubernetes.client.openapi.models.V1Container;
import io.kubernetes.client.openapi.models.V1ContainerBuilder;
import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1ContainerPortBuilder;
import io.kubernetes.client.openapi.models.V1EnvVar;
import io.kubernetes.client.openapi.models.V1EnvVarSource;
import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
@@ -59,6 +53,7 @@ import io.kubernetes.client.openapi.models.V1PersistentVolumeClaimBuilder;
import io.kubernetes.client.openapi.models.V1PodSpec;
import io.kubernetes.client.openapi.models.V1PodSpecBuilder;
import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpecBuilder;
import io.kubernetes.client.openapi.models.V1ResourceRequirements;
import io.kubernetes.client.openapi.models.V1Toleration;
import io.kubernetes.client.openapi.models.V1Volume;
@@ -67,49 +62,22 @@ import io.kubernetes.client.openapi.models.V1VolumeMount;
import io.kubernetes.client.openapi.models.V1VolumeMountBuilder;
import static org.apache.heron.scheduler.kubernetes.KubernetesConstants.VolumeConfigKeys;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.doReturn;
+import static org.apache.heron.scheduler.kubernetes.KubernetesUtils.TestTuple;
+import static org.apache.heron.scheduler.kubernetes.StatefulSet.getTolerations;
@RunWith(MockitoJUnitRunner.class)
-public class V1ControllerTest {
-
+public class StatefulSetTest {
private static final String TOPOLOGY_NAME = "topology-name";
private static final String CONFIGMAP_NAME = "CONFIG-MAP-NAME";
private static final String POD_TEMPLATE_NAME = "POD-TEMPLATE-NAME";
private static final String CONFIGMAP_POD_TEMPLATE_NAME =
String.format("%s.%s", CONFIGMAP_NAME, POD_TEMPLATE_NAME);
- private static final String POD_TEMPLATE_VALID =
- "apiVersion: apps/v1\n"
- + "kind: PodTemplate\n"
- + "metadata:\n"
- + " name: heron-tracker\n"
- + " namespace: default\n"
- + "template:\n"
- + " metadata:\n"
- + " labels:\n"
- + " app: heron-tracker\n"
- + " spec:\n"
- + " containers:\n"
- + " - name: heron-tracker\n"
- + " image: apache/heron:latest\n"
- + " ports:\n"
- + " - containerPort: 8888\n"
- + " name: api-port\n"
- + " resources:\n"
- + " requests:\n"
- + " cpu: \"100m\"\n"
- + " memory: \"200M\"\n"
- + " limits:\n"
- + " cpu: \"400m\"\n"
- + " memory: \"512M\"";
private static final String POD_TEMPLATE_LOCATION_EXECUTOR =
String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
KubernetesConstants.EXECUTOR_NAME);
private static final String POD_TEMPLATE_LOCATION_MANAGER =
String.format(KubernetesContext.KUBERNETES_POD_TEMPLATE_LOCATION,
KubernetesConstants.MANAGER_NAME);
-
- private static final Config CONFIG = Config.newBuilder().build();
private static final Config CONFIG_WITH_POD_TEMPLATE = Config.newBuilder()
.put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
.put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
@@ -117,332 +85,30 @@ public class V1ControllerTest {
private static final Config RUNTIME = Config.newBuilder()
.put(Key.TOPOLOGY_NAME, TOPOLOGY_NAME)
.build();
- private final Config configDisabledPodTemplate = Config.newBuilder()
- .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
- .put(POD_TEMPLATE_LOCATION_MANAGER, CONFIGMAP_POD_TEMPLATE_NAME)
- .put(KubernetesContext.KUBERNETES_POD_TEMPLATE_DISABLED, "true")
+ private static final StatefulSet STATEFUL_SET = new StatefulSet();
+ // Pod Template fixture; mirrors the YAML template the old V1ControllerTest parsed.
+ private static final V1PodTemplateSpec POD_TEMPLATE_SPEC = new V1PodTemplateSpecBuilder()
+ .withNewMetadata()
+ .withLabels(Collections.singletonMap("app", "heron-tracker"))
+ .endMetadata()
+ .withNewSpec()
+ .withContainers(new V1ContainerBuilder()
+ .withName("heron-tracker").withImage("apache/heron:latest")
+ .withPorts(new V1ContainerPortBuilder()
+ .withName("api-port").withContainerPort(8888)
+ .build())
+ // Resource maps are keyed by resource name ("cpu"/"memory"); values are the quantities.
+ .withNewResources()
+ .addToRequests("cpu", new Quantity("100m")).addToRequests("memory", new Quantity("200M"))
+ .addToLimits("cpu", new Quantity("400m")).addToLimits("memory", new Quantity("512M"))
+ .endResources()
+ .build())
+ .endSpec()
.build();
+ private static final StatefulSet.Configs CLUSTER_CONFIGS =
+ new StatefulSet.Configs(CONFIG_WITH_POD_TEMPLATE, RUNTIME,
+ POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
- @Spy
- private final V1Controller v1ControllerWithPodTemplate =
- new V1Controller(CONFIG_WITH_POD_TEMPLATE, RUNTIME);
-
- @Spy
- private final V1Controller v1ControllerPodTemplate =
- new V1Controller(configDisabledPodTemplate, RUNTIME);
-
- @Rule
- public final ExpectedException expectedException = ExpectedException.none();
-
- @Test
- public void testLoadPodFromTemplateDefault() {
- final V1Controller v1ControllerNoPodTemplate = new V1Controller(CONFIG, RUNTIME);
- final V1PodTemplateSpec defaultPodSpec = new V1PodTemplateSpec();
-
- final V1PodTemplateSpec podSpecExecutor = v1ControllerNoPodTemplate.loadPodFromTemplate(true);
- Assert.assertEquals("Default Pod Spec for Executor", defaultPodSpec, podSpecExecutor);
-
- final V1PodTemplateSpec podSpecManager = v1ControllerNoPodTemplate.loadPodFromTemplate(false);
- Assert.assertEquals("Default Pod Spec for Manager", defaultPodSpec, podSpecManager);
- }
-
- @Test
- public void testLoadPodFromTemplateNullConfigMap() {
- final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
- testCases.add(new TestTuple<>("Executor not found", true, "unable to locate"));
- testCases.add(new TestTuple<>("Manager not found", false, "unable to locate"));
-
- for (TestTuple<Boolean, String> testCase : testCases) {
- doReturn(null)
- .when(v1ControllerWithPodTemplate)
- .getConfigMap(anyString());
-
- String message = "";
- try {
- v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
- } catch (TopologySubmissionException e) {
- message = e.getMessage();
- }
- Assert.assertTrue(testCase.description, message.contains(testCase.expected));
- }
- }
-
- @Test
- public void testLoadPodFromTemplateNoConfigMap() {
- final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
- testCases.add(new TestTuple<>("Executor no ConfigMap", true, "Failed to locate Pod Template"));
- testCases.add(new TestTuple<>("Manager no ConfigMap", false, "Failed to locate Pod Template"));
-
- for (TestTuple<Boolean, String> testCase : testCases) {
- doReturn(new V1ConfigMap())
- .when(v1ControllerWithPodTemplate)
- .getConfigMap(anyString());
-
- String message = "";
- try {
- v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
- } catch (TopologySubmissionException e) {
- message = e.getMessage();
- }
- Assert.assertTrue(testCase.description, message.contains(testCase.expected));
- }
- }
-
- @Test
- public void testLoadPodFromTemplateNoTargetConfigMap() {
- final List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
- testCases.add(new TestTuple<>("Executor no target ConfigMap",
- true, "Failed to locate Pod Template"));
- testCases.add(new TestTuple<>("Manager no target ConfigMap",
- false, "Failed to locate Pod Template"));
-
- final V1ConfigMap configMapNoTargetData = new V1ConfigMapBuilder()
- .withNewMetadata()
- .withName(CONFIGMAP_NAME)
- .endMetadata()
- .addToData("Dummy Key", "Dummy Value")
- .build();
-
- for (TestTuple<Boolean, String> testCase : testCases) {
- doReturn(configMapNoTargetData)
- .when(v1ControllerWithPodTemplate)
- .getConfigMap(anyString());
-
- String message = "";
- try {
- v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input);
- } catch (TopologySubmissionException e) {
- message = e.getMessage();
- }
- Assert.assertTrue(testCase.description, message.contains(testCase.expected));
- }
- }
-
- @Test
- public void testLoadPodFromTemplateBadTargetConfigMap() {
- // ConfigMap with target ConfigMap and an invalid Pod Template.
- final V1ConfigMap configMapInvalidPod = new V1ConfigMapBuilder()
- .withNewMetadata()
- .withName(CONFIGMAP_NAME)
- .endMetadata()
- .addToData(POD_TEMPLATE_NAME, "Dummy Value")
- .build();
-
- // ConfigMap with target ConfigMaps and an empty Pod Template.
- final V1ConfigMap configMapEmptyPod = new V1ConfigMapBuilder()
- .withNewMetadata()
- .withName(CONFIGMAP_NAME)
- .endMetadata()
- .addToData(POD_TEMPLATE_NAME, "")
- .build();
-
- // Test case container.
- // Input: ConfigMap to setup mock V1Controller, Boolean flag for executor/manager switch.
- // Output: The expected error message.
- final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
- testCases.add(new TestTuple<>("Executor invalid Pod Template",
- new Pair<>(configMapInvalidPod, true), "Error parsing"));
- testCases.add(new TestTuple<>("Manager invalid Pod Template",
- new Pair<>(configMapInvalidPod, false), "Error parsing"));
- testCases.add(new TestTuple<>("Executor empty Pod Template",
- new Pair<>(configMapEmptyPod, true), "Error parsing"));
- testCases.add(new TestTuple<>("Manager empty Pod Template",
- new Pair<>(configMapEmptyPod, false), "Error parsing"));
-
- // Test loop.
- for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
- doReturn(testCase.input.first)
- .when(v1ControllerWithPodTemplate)
- .getConfigMap(anyString());
-
- String message = "";
- try {
- v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
- } catch (TopologySubmissionException e) {
- message = e.getMessage();
- }
- Assert.assertTrue(testCase.description, message.contains(testCase.expected));
- }
- }
-
- @Test
- public void testLoadPodFromTemplateValidConfigMap() {
- final String expected =
- " containers: [class V1Container {\n"
- + " args: null\n"
- + " command: null\n"
- + " env: null\n"
- + " envFrom: null\n"
- + " image: apache/heron:latest\n"
- + " imagePullPolicy: null\n"
- + " lifecycle: null\n"
- + " livenessProbe: null\n"
- + " name: heron-tracker\n"
- + " ports: [class V1ContainerPort {\n"
- + " containerPort: 8888\n"
- + " hostIP: null\n"
- + " hostPort: null\n"
- + " name: api-port\n"
- + " protocol: null\n"
- + " }]\n"
- + " readinessProbe: null\n"
- + " resources: class V1ResourceRequirements {\n"
- + " limits: {cpu=Quantity{number=0.400, format=DECIMAL_SI}, "
- + "memory=Quantity{number=512000000, format=DECIMAL_SI}}\n"
- + " requests: {cpu=Quantity{number=0.100, format=DECIMAL_SI}, "
- + "memory=Quantity{number=200000000, format=DECIMAL_SI}}\n"
- + " }\n"
- + " securityContext: null\n"
- + " startupProbe: null\n"
- + " stdin: null\n"
- + " stdinOnce: null\n"
- + " terminationMessagePath: null\n"
- + " terminationMessagePolicy: null\n"
- + " tty: null\n"
- + " volumeDevices: null\n"
- + " volumeMounts: null\n"
- + " workingDir: null\n"
- + " }]";
-
-
- // ConfigMap with valid Pod Template.
- final V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
- .withNewMetadata()
- .withName(CONFIGMAP_NAME)
- .endMetadata()
- .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
- .build();
-
- // Test case container.
- // Input: ConfigMap to setup mock V1Controller, Boolean flag for executor/manager switch.
- // Output: The expected Pod template as a string.
- final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
- testCases.add(new TestTuple<>("Executor valid Pod Template",
- new Pair<>(configMapValidPod, true), expected));
- testCases.add(new TestTuple<>("Manager valid Pod Template",
- new Pair<>(configMapValidPod, false), expected));
-
- // Test loop.
- for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
- doReturn(testCase.input.first)
- .when(v1ControllerWithPodTemplate)
- .getConfigMap(anyString());
-
- V1PodTemplateSpec podTemplateSpec = v1ControllerWithPodTemplate.loadPodFromTemplate(true);
-
- Assert.assertTrue(podTemplateSpec.toString().contains(testCase.expected));
- }
- }
-
- @Test
- public void testLoadPodFromTemplateInvalidConfigMap() {
- // ConfigMap with an invalid Pod Template.
- final String invalidPodTemplate =
- "apiVersion: apps/v1\n"
- + "kind: InvalidTemplate\n"
- + "metadata:\n"
- + " name: heron-tracker\n"
- + " namespace: default\n"
- + "template:\n"
- + " metadata:\n"
- + " labels:\n"
- + " app: heron-tracker\n"
- + " spec:\n";
- final V1ConfigMap configMap = new V1ConfigMapBuilder()
- .withNewMetadata()
- .withName(CONFIGMAP_NAME)
- .endMetadata()
- .addToData(POD_TEMPLATE_NAME, invalidPodTemplate)
- .build();
-
-
- // Test case container.
- // Input: ConfigMap to setup mock V1Controller, Boolean flag for executor/manager switch.
- // Output: The expected Pod template as a string.
- final List<TestTuple<Pair<V1ConfigMap, Boolean>, String>> testCases = new LinkedList<>();
- testCases.add(new TestTuple<>("Executor invalid Pod Template",
- new Pair<>(configMap, true), "Error parsing"));
- testCases.add(new TestTuple<>("Manager invalid Pod Template",
- new Pair<>(configMap, false), "Error parsing"));
-
- // Test loop.
- for (TestTuple<Pair<V1ConfigMap, Boolean>, String> testCase : testCases) {
- doReturn(testCase.input.first)
- .when(v1ControllerWithPodTemplate)
- .getConfigMap(anyString());
-
- String message = "";
- try {
- v1ControllerWithPodTemplate.loadPodFromTemplate(testCase.input.second);
- } catch (TopologySubmissionException e) {
- message = e.getMessage();
- }
- Assert.assertTrue(message.contains(testCase.expected));
- }
- }
-
- @Test
- public void testDisablePodTemplates() {
- // ConfigMap with valid Pod Template.
- V1ConfigMap configMapValidPod = new V1ConfigMapBuilder()
- .withNewMetadata()
- .withName(CONFIGMAP_NAME)
- .endMetadata()
- .addToData(POD_TEMPLATE_NAME, POD_TEMPLATE_VALID)
- .build();
- final String expected = "Pod Templates are disabled";
- String message = "";
- doReturn(configMapValidPod)
- .when(v1ControllerPodTemplate)
- .getConfigMap(anyString());
-
- try {
- v1ControllerPodTemplate.loadPodFromTemplate(true);
- } catch (TopologySubmissionException e) {
- message = e.getMessage();
- }
- Assert.assertTrue(message.contains(expected));
- }
-
- @Test
- public void testGetPodTemplateLocationPassing() {
- final Config testConfig = Config.newBuilder()
- .put(POD_TEMPLATE_LOCATION_EXECUTOR, CONFIGMAP_POD_TEMPLATE_NAME)
- .build();
- final V1Controller v1Controller = new V1Controller(testConfig, RUNTIME);
- final Pair<String, String> expected = new Pair<>(CONFIGMAP_NAME, POD_TEMPLATE_NAME);
-
- // Correct parsing
- final Pair<String, String> actual = v1Controller.getPodTemplateLocation(true);
- Assert.assertEquals(expected, actual);
- }
-
- @Test
- public void testGetPodTemplateLocationNoConfigMap() {
- expectedException.expect(TopologySubmissionException.class);
- final Config testConfig = Config.newBuilder()
- .put(POD_TEMPLATE_LOCATION_EXECUTOR, ".POD-TEMPLATE-NAME").build();
- V1Controller v1Controller = new V1Controller(testConfig, RUNTIME);
- v1Controller.getPodTemplateLocation(true);
- }
-
- @Test
- public void testGetPodTemplateLocationNoPodTemplate() {
- expectedException.expect(TopologySubmissionException.class);
- final Config testConfig = Config.newBuilder()
- .put(POD_TEMPLATE_LOCATION_EXECUTOR, "CONFIGMAP-NAME.").build();
- V1Controller v1Controller = new V1Controller(testConfig, RUNTIME);
- v1Controller.getPodTemplateLocation(true);
- }
-
- @Test
- public void testGetPodTemplateLocationNoDelimiter() {
- expectedException.expect(TopologySubmissionException.class);
- final Config testConfig = Config.newBuilder()
- .put(POD_TEMPLATE_LOCATION_EXECUTOR, "CONFIGMAP-NAMEPOD-TEMPLATE-NAME").build();
- V1Controller v1Controller = new V1Controller(testConfig, RUNTIME);
- v1Controller.getPodTemplateLocation(true);
- }
@Test
public void testConfigureContainerPorts() {
@@ -450,9 +116,9 @@ public class V1ControllerTest {
final int portNumberkept = 1111;
final int numInstances = 3;
final List<V1ContainerPort> expectedPortsBase =
- Collections.unmodifiableList(V1Controller.getExecutorPorts());
+ Collections.unmodifiableList(StatefulSet.getExecutorPorts());
final List<V1ContainerPort> debugPorts =
- Collections.unmodifiableList(V1Controller.getDebuggingPorts(numInstances));
+ Collections.unmodifiableList(StatefulSet.getDebuggingPorts(numInstances));
final List<V1ContainerPort> inputPortsBase = Collections.unmodifiableList(
Arrays.asList(
new V1ContainerPort()
@@ -463,9 +129,12 @@ public class V1ControllerTest {
)
);
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
// Null ports. This is the default case.
final V1Container inputContainerWithNullPorts = new V1ContainerBuilder().build();
- v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithNullPorts);
+ STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithNullPorts);
Assert.assertTrue("Server and/or shell PORTS for container with null ports list",
CollectionUtils.containsAll(inputContainerWithNullPorts.getPorts(), expectedPortsBase));
@@ -473,7 +142,7 @@ public class V1ControllerTest {
final V1Container inputContainerWithEmptyPorts = new V1ContainerBuilder()
.withPorts(new LinkedList<>())
.build();
- v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithEmptyPorts);
+ STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithEmptyPorts);
Assert.assertTrue("Server and/or shell PORTS for container with empty ports list",
CollectionUtils.containsAll(inputContainerWithEmptyPorts.getPorts(), expectedPortsBase));
@@ -486,7 +155,7 @@ public class V1ControllerTest {
expectedPortsOverriding
.add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
- v1ControllerWithPodTemplate.configureContainerPorts(false, 0, inputContainerWithPorts);
+ STATEFUL_SET.configureContainerPorts(false, 0, inputContainerWithPorts);
Assert.assertTrue("Server and/or shell PORTS for container should be overwritten.",
CollectionUtils.containsAll(inputContainerWithPorts.getPorts(), expectedPortsOverriding));
@@ -500,8 +169,7 @@ public class V1ControllerTest {
expectedPortsDebug.add(new V1ContainerPort().name(portNamekept).containerPort(portNumberkept));
expectedPortsDebug.addAll(debugPorts);
- v1ControllerWithPodTemplate.configureContainerPorts(
- true, numInstances, inputContainerWithDebug);
+ STATEFUL_SET.configureContainerPorts(true, numInstances, inputContainerWithDebug);
Assert.assertTrue("Server and/or shell with debug PORTS for container should be overwritten.",
CollectionUtils.containsAll(inputContainerWithDebug.getPorts(), expectedPortsDebug));
}
@@ -509,7 +177,7 @@ public class V1ControllerTest {
@Test
public void testConfigureContainerEnvVars() {
final List<V1EnvVar> heronEnvVars =
- Collections.unmodifiableList(V1Controller.getExecutorEnvVars());
+ Collections.unmodifiableList(STATEFUL_SET.getExecutorEnvVars());
final V1EnvVar additionEnvVar = new V1EnvVar()
.name("env-variable-to-be-kept")
.valueFrom(new V1EnvVarSource()
@@ -529,9 +197,12 @@ public class V1ControllerTest {
additionEnvVar
);
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
// Null env vars. This is the default case.
V1Container containerWithNullEnvVars = new V1ContainerBuilder().build();
- v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithNullEnvVars);
+ STATEFUL_SET.configureContainerEnvVars(containerWithNullEnvVars);
Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with null Env Vars should match",
CollectionUtils.containsAll(containerWithNullEnvVars.getEnv(), heronEnvVars));
@@ -539,7 +210,7 @@ public class V1ControllerTest {
V1Container containerWithEmptyEnvVars = new V1ContainerBuilder()
.withEnv(new LinkedList<>())
.build();
- v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEmptyEnvVars);
+ STATEFUL_SET.configureContainerEnvVars(containerWithEmptyEnvVars);
Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with empty Env Vars should match",
CollectionUtils.containsAll(containerWithEmptyEnvVars.getEnv(), heronEnvVars));
@@ -549,13 +220,16 @@ public class V1ControllerTest {
V1Container containerWithEnvVars = new V1ContainerBuilder()
.withEnv(inputEnvVars)
.build();
- v1ControllerWithPodTemplate.configureContainerEnvVars(containerWithEnvVars);
+ STATEFUL_SET.configureContainerEnvVars(containerWithEnvVars);
Assert.assertTrue("ENV_HOST & ENV_POD_NAME in container with Env Vars should be overridden",
CollectionUtils.containsAll(containerWithEnvVars.getEnv(), expectedOverriding));
}
@Test
public void testConfigureContainerResources() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final boolean isExecutor = true;
final Resource resourceDefault = new Resource(
@@ -597,16 +271,16 @@ public class V1ControllerTest {
// Default. Null resources.
V1Container containerNull = new V1ContainerBuilder().build();
- v1ControllerWithPodTemplate.configureContainerResources(
- containerNull, configNoLimit, resourceDefault, isExecutor);
+ STATEFUL_SET.configureContainerResources(containerNull, configNoLimit, resourceDefault,
+ isExecutor);
Assert.assertTrue("Default LIMITS should be set in container with null LIMITS",
containerNull.getResources().getLimits().entrySet()
.containsAll(expectDefaultRequirements.getLimits().entrySet()));
// Empty resources.
V1Container containerEmpty = new V1ContainerBuilder().withNewResources().endResources().build();
- v1ControllerWithPodTemplate.configureContainerResources(
- containerEmpty, configNoLimit, resourceDefault, isExecutor);
+ STATEFUL_SET.configureContainerResources(containerEmpty, configNoLimit, resourceDefault,
+ isExecutor);
Assert.assertTrue("Default LIMITS should be set in container with empty LIMITS",
containerNull.getResources().getLimits().entrySet()
.containsAll(expectDefaultRequirements.getLimits().entrySet()));
@@ -615,8 +289,8 @@ public class V1ControllerTest {
V1Container containerCustom = new V1ContainerBuilder()
.withResources(customRequirements)
.build();
- v1ControllerWithPodTemplate.configureContainerResources(
- containerCustom, configNoLimit, resourceDefault, isExecutor);
+ STATEFUL_SET.configureContainerResources(containerCustom, configNoLimit, resourceDefault,
+ isExecutor);
Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS",
containerCustom.getResources().getLimits().entrySet()
.containsAll(expectCustomRequirements.getLimits().entrySet()));
@@ -625,8 +299,8 @@ public class V1ControllerTest {
V1Container containerRequests = new V1ContainerBuilder()
.withResources(customRequirements)
.build();
- v1ControllerWithPodTemplate.configureContainerResources(
- containerRequests, configWithLimit, resourceDefault, isExecutor);
+ STATEFUL_SET.configureContainerResources(containerRequests, configWithLimit, resourceDefault,
+ isExecutor);
Assert.assertTrue("Custom LIMITS should be set in container with custom LIMITS and REQUEST",
containerRequests.getResources().getLimits().entrySet()
.containsAll(expectCustomRequirements.getLimits().entrySet()));
@@ -637,6 +311,9 @@ public class V1ControllerTest {
@Test
public void testConfigureContainerResourcesCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final boolean isExecutor = true;
final String customLimitMEMStr = "120Gi";
final String customLimitCPUStr = "5";
@@ -674,7 +351,7 @@ public class V1ControllerTest {
.build();
final V1Container actual = new V1Container();
- v1ControllerWithPodTemplate.configureContainerResources(actual, config, resources, isExecutor);
+ STATEFUL_SET.configureContainerResources(actual, config, resources, isExecutor);
Assert.assertEquals("Container Resources are set from CLI.", expected, actual);
}
@@ -686,7 +363,10 @@ public class V1ControllerTest {
.put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_NAME, pathNameDefault)
.put(KubernetesContext.KUBERNETES_CONTAINER_VOLUME_MOUNT_PATH, pathDefault)
.build();
- final V1Controller controllerWithMounts = new V1Controller(configWithVolumes, RUNTIME);
+ final StatefulSet.Configs configsWithVolumes = new StatefulSet.Configs(configWithVolumes,
+ RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
+ final StatefulSet.Configs configsWithNoVolumes = new StatefulSet.Configs(
+ Config.newBuilder().build(), RUNTIME, POD_TEMPLATE_SPEC, POD_TEMPLATE_SPEC);
final V1VolumeMount volumeDefault = new V1VolumeMountBuilder()
.withName(pathNameDefault)
.withMountPath(pathDefault)
@@ -707,14 +387,17 @@ public class V1ControllerTest {
);
// No Volume Mounts set.
- V1Controller controllerDoNotSetMounts = new V1Controller(Config.newBuilder().build(), RUNTIME);
+ STATEFUL_SET.setClusterConfigs(configsWithNoVolumes);
V1Container containerNoSetMounts = new V1Container();
- controllerDoNotSetMounts.mountVolumeIfPresent(containerNoSetMounts);
+ STATEFUL_SET.mountVolumeIfPresent(containerNoSetMounts);
Assert.assertNull(containerNoSetMounts.getVolumeMounts());
+ // Configure factory for volume mounts.
+ STATEFUL_SET.setClusterConfigs(configsWithVolumes);
+
// Default. Null Volume Mounts.
V1Container containerNull = new V1ContainerBuilder().build();
- controllerWithMounts.mountVolumeIfPresent(containerNull);
+ STATEFUL_SET.mountVolumeIfPresent(containerNull);
Assert.assertTrue("Default VOLUME MOUNTS should be set in container with null VOLUME MOUNTS",
CollectionUtils.containsAll(expectedMountsDefault, containerNull.getVolumeMounts()));
@@ -722,7 +405,7 @@ public class V1ControllerTest {
V1Container containerEmpty = new V1ContainerBuilder()
.withVolumeMounts(new LinkedList<>())
.build();
- controllerWithMounts.mountVolumeIfPresent(containerEmpty);
+ STATEFUL_SET.mountVolumeIfPresent(containerEmpty);
Assert.assertTrue("Default VOLUME MOUNTS should be set in container with empty VOLUME MOUNTS",
CollectionUtils.containsAll(expectedMountsDefault, containerEmpty.getVolumeMounts()));
@@ -730,20 +413,23 @@ public class V1ControllerTest {
V1Container containerCustom = new V1ContainerBuilder()
.withVolumeMounts(volumeMountsCustomList)
.build();
- controllerWithMounts.mountVolumeIfPresent(containerCustom);
+ STATEFUL_SET.mountVolumeIfPresent(containerCustom);
Assert.assertTrue("Default VOLUME MOUNTS should be set in container with custom VOLUME MOUNTS",
CollectionUtils.containsAll(expectedMountsCustom, containerCustom.getVolumeMounts()));
}
@Test
public void testConfigureTolerations() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final V1Toleration keptToleration = new V1Toleration()
.key("kept toleration")
.operator("Some Operator")
.effect("Some Effect")
.tolerationSeconds(5L);
final List<V1Toleration> expectedTolerationBase =
- Collections.unmodifiableList(V1Controller.getTolerations());
+ Collections.unmodifiableList(getTolerations());
final List<V1Toleration> inputTolerationsBase = Collections.unmodifiableList(
Arrays.asList(
new V1Toleration()
@@ -756,7 +442,7 @@ public class V1ControllerTest {
// Null Tolerations. This is the default case.
final V1PodSpec podSpecNullTolerations = new V1PodSpecBuilder().build();
- v1ControllerWithPodTemplate.configureTolerations(podSpecNullTolerations);
+ STATEFUL_SET.configureTolerations(podSpecNullTolerations);
Assert.assertTrue("Pod Spec has null TOLERATIONS and should be set to Heron's defaults",
CollectionUtils.containsAll(podSpecNullTolerations.getTolerations(),
expectedTolerationBase));
@@ -765,7 +451,7 @@ public class V1ControllerTest {
final V1PodSpec podSpecWithEmptyTolerations = new V1PodSpecBuilder()
.withTolerations(new LinkedList<>())
.build();
- v1ControllerWithPodTemplate.configureTolerations(podSpecWithEmptyTolerations);
+ STATEFUL_SET.configureTolerations(podSpecWithEmptyTolerations);
Assert.assertTrue("Pod Spec has empty TOLERATIONS and should be set to Heron's defaults",
CollectionUtils.containsAll(podSpecWithEmptyTolerations.getTolerations(),
expectedTolerationBase));
@@ -778,7 +464,7 @@ public class V1ControllerTest {
new LinkedList<>(expectedTolerationBase);
expectedTolerationsOverriding.add(keptToleration);
- v1ControllerWithPodTemplate.configureTolerations(podSpecWithTolerations);
+ STATEFUL_SET.configureTolerations(podSpecWithTolerations);
Assert.assertTrue("Pod Spec has TOLERATIONS and should be overridden with Heron's defaults",
CollectionUtils.containsAll(podSpecWithTolerations.getTolerations(),
expectedTolerationsOverriding));
@@ -786,6 +472,9 @@ public class V1ControllerTest {
@Test
public void testCreatePersistentVolumeClaims() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final String topologyName = "topology-name";
final String volumeNameOne = "volume-name-one";
final String volumeNameTwo = "volume-name-two";
@@ -838,7 +527,7 @@ public class V1ControllerTest {
final V1PersistentVolumeClaim claimOne = new V1PersistentVolumeClaimBuilder()
.withNewMetadata()
.withName(volumeNameOne)
- .withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
+ .withLabels(StatefulSet.getPersistentVolumeClaimLabels(topologyName))
.endMetadata()
.withNewSpec()
.withStorageClassName(storageClassName)
@@ -853,7 +542,7 @@ public class V1ControllerTest {
final V1PersistentVolumeClaim claimStatic = new V1PersistentVolumeClaimBuilder()
.withNewMetadata()
.withName(volumeNameStatic)
- .withLabels(V1Controller.getPersistentVolumeClaimLabels(topologyName))
+ .withLabels(StatefulSet.getPersistentVolumeClaimLabels(topologyName))
.endMetadata()
.withNewSpec()
.withStorageClassName("")
@@ -869,7 +558,7 @@ public class V1ControllerTest {
new LinkedList<>(Arrays.asList(claimOne, claimStatic));
final List<V1PersistentVolumeClaim> actualClaims =
- v1ControllerWithPodTemplate.createPersistentVolumeClaims(mapPVCOpts);
+ STATEFUL_SET.createPersistentVolumeClaims(mapPVCOpts);
Assert.assertEquals("Generated claim sizes match", expectedClaims.size(), actualClaims.size());
Assert.assertTrue(expectedClaims.containsAll(actualClaims));
@@ -877,6 +566,9 @@ public class V1ControllerTest {
@Test
public void testCreatePersistentVolumeClaimVolumesAndMounts() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final String volumeNameOne = "VolumeNameONE";
final String volumeNameTwo = "VolumeNameTWO";
final String claimNameOne = "claim-name-one";
@@ -919,8 +611,8 @@ public class V1ControllerTest {
// Test case container.
// Input: Map of Volume configurations.
// Output: The expected lists of Volumes and Volume Mounts.
- final List<TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
- Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new LinkedList<>();
+ final List<KubernetesUtils.TestTuple<Map<String, Map<VolumeConfigKeys, String>>,
+ Pair<List<V1Volume>, List<V1VolumeMount>>>> testCases = new LinkedList<>();
// Default case: No PVC provided.
testCases.add(new TestTuple<>("Generated an empty list of Volumes", new HashMap<>(),
@@ -939,7 +631,7 @@ public class V1ControllerTest {
Pair<List<V1Volume>, List<V1VolumeMount>>> testCase : testCases) {
List<V1Volume> actualVolume = new LinkedList<>();
List<V1VolumeMount> actualVolumeMount = new LinkedList<>();
- v1ControllerPodTemplate.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
+ STATEFUL_SET.createVolumeAndMountsPersistentVolumeClaimCLI(testCase.input,
actualVolume, actualVolumeMount);
Assert.assertTrue(testCase.description,
@@ -951,6 +643,9 @@ public class V1ControllerTest {
@Test
public void testConfigurePodWithVolumesAndMountsFromCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final String volumeNameClashing = "clashing-volume";
final String volumeMountNameClashing = "original-volume-mount";
V1Volume baseVolume = new V1VolumeBuilder()
@@ -1048,7 +743,7 @@ public class V1ControllerTest {
// Testing loop.
for (TestTuple<Object[], Pair<V1PodSpec, V1Container>> testCase : testCases) {
- v1ControllerWithPodTemplate
+ STATEFUL_SET
.configurePodWithVolumesAndMountsFromCLI((V1PodSpec) testCase.input[0],
(V1Container) testCase.input[1], (List<V1Volume>) testCase.input[2],
(List<V1VolumeMount>) testCase.input[3]);
@@ -1062,6 +757,8 @@ public class V1ControllerTest {
@Test
public void testSetShardIdEnvironmentVariableCommand() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
List<TestTuple<Boolean, String>> testCases = new LinkedList<>();
@@ -1072,12 +769,15 @@ public class V1ControllerTest {
for (TestTuple<Boolean, String> testCase : testCases) {
Assert.assertEquals(testCase.description, testCase.expected,
- v1ControllerWithPodTemplate.setShardIdEnvironmentVariableCommand(testCase.input));
+ STATEFUL_SET.setShardIdEnvironmentVariableCommand(testCase.input));
}
}
@Test
public void testCreateResourcesRequirement() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final String managerCpuLimit = "3000m";
final String managerMemLimit = "256Gi";
final Quantity memory = Quantity.fromString(managerMemLimit);
@@ -1147,13 +847,16 @@ public class V1ControllerTest {
// Test loop.
for (TestTuple<Map<String, String>, Map<String, Quantity>> testCase : testCases) {
Map<String, Quantity> actual =
- v1ControllerPodTemplate.createResourcesRequirement(testCase.input);
+ STATEFUL_SET.createResourcesRequirement(testCase.input);
Assert.assertEquals(testCase.description, testCase.expected, actual);
}
}
@Test
public void testCreateVolumeAndMountsEmptyDirCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final String volumeName = "volume-name-empty-dir";
final String medium = "Memory";
final String sizeLimit = "1Gi";
@@ -1189,13 +892,16 @@ public class V1ControllerTest {
List<V1Volume> actualVolumes = new LinkedList<>();
List<V1VolumeMount> actualMounts = new LinkedList<>();
- v1ControllerPodTemplate.createVolumeAndMountsEmptyDirCLI(config, actualVolumes, actualMounts);
+ STATEFUL_SET.createVolumeAndMountsEmptyDirCLI(config, actualVolumes, actualMounts);
Assert.assertEquals("Empty Dir Volume populated", expectedVolumes, actualVolumes);
Assert.assertEquals("Empty Dir Volume Mount populated", expectedMounts, actualMounts);
}
@Test
public void testCreateVolumeAndMountsHostPathCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final String volumeName = "volume-name-host-path";
final String type = "DirectoryOrCreate";
final String pathOnHost = "path.on.host";
@@ -1231,13 +937,16 @@ public class V1ControllerTest {
List<V1Volume> actualVolumes = new LinkedList<>();
List<V1VolumeMount> actualMounts = new LinkedList<>();
- v1ControllerPodTemplate.createVolumeAndMountsHostPathCLI(config, actualVolumes, actualMounts);
+ STATEFUL_SET.createVolumeAndMountsHostPathCLI(config, actualVolumes, actualMounts);
Assert.assertEquals("Host Path Volume populated", expectedVolumes, actualVolumes);
Assert.assertEquals("Host Path Volume Mount populated", expectedMounts, actualMounts);
}
@Test
public void testCreateVolumeAndMountsNFSCLI() {
+ // Load configurations into test class.
+ STATEFUL_SET.setClusterConfigs(CLUSTER_CONFIGS);
+
final String volumeName = "volume-name-nfs";
final String server = "nfs.server.address";
final String pathOnNFS = "path.on.host";
@@ -1277,7 +986,7 @@ public class V1ControllerTest {
List<V1Volume> actualVolumes = new LinkedList<>();
List<V1VolumeMount> actualMounts = new LinkedList<>();
- v1ControllerPodTemplate.createVolumeAndMountsNFSCLI(config, actualVolumes, actualMounts);
+ STATEFUL_SET.createVolumeAndMountsNFSCLI(config, actualVolumes, actualMounts);
Assert.assertEquals("NFS Volume populated", expectedVolumes, actualVolumes);
Assert.assertEquals("NFS Volume Mount populated", expectedMounts, actualMounts);
}
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
index 22d3088b598..9becf8b8d7b 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/VolumesTests.java
@@ -244,7 +244,7 @@ public class VolumesTests {
final String volumeMode = "VolumeMode";
final String path = "/path/to/mount/";
final String subPath = "/sub/path/to/mount/";
- final Map<String, String> labels = V1Controller.getPersistentVolumeClaimLabels(topologyName);
+ final Map<String, String> labels = KubernetesShim.getPersistentVolumeClaimLabels(topologyName);
final Map<KubernetesConstants.VolumeConfigKeys, String> volOneConfig =
new HashMap<KubernetesConstants.VolumeConfigKeys, String>() {