You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/07/20 03:07:26 UTC
[incubator-heron] branch saadurrahman/3846-Refactoring-K8s-Shim-dev updated: [StatefulSet] copied methods to create stateful sets to factory.
This is an automated email from the ASF dual-hosted git repository.
saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git
The following commit(s) were added to refs/heads/saadurrahman/3846-Refactoring-K8s-Shim-dev by this push:
new cc14e7c070f [StatefulSet] copied methods to create stateful sets to factory.
cc14e7c070f is described below
commit cc14e7c070f3bf9e0a970cecf735e4ad7dfa9a9d
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Tue Jul 19 23:07:08 2022 -0400
[StatefulSet] copied methods to create stateful sets to factory.
---
.../heron/scheduler/kubernetes/StatefulSet.java | 981 ++++++++++++++++++++-
1 file changed, 978 insertions(+), 3 deletions(-)
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
index cea3a91fc05..3f23b4be321 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
@@ -19,14 +19,59 @@
package org.apache.heron.scheduler.kubernetes;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.heron.api.utils.TopologyUtils;
+import org.apache.heron.common.basics.Pair;
+import org.apache.heron.scheduler.TopologyRuntimeManagementException;
+import org.apache.heron.scheduler.TopologySubmissionException;
import org.apache.heron.scheduler.utils.Runtime;
+import org.apache.heron.scheduler.utils.SchedulerUtils;
+import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
import org.apache.heron.spi.common.Config;
import org.apache.heron.spi.packing.Resource;
+import io.kubernetes.client.custom.Quantity;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.CoreV1Api;
+import io.kubernetes.client.openapi.models.V1ConfigMap;
+import io.kubernetes.client.openapi.models.V1Container;
+import io.kubernetes.client.openapi.models.V1ContainerPort;
+import io.kubernetes.client.openapi.models.V1EnvVar;
+import io.kubernetes.client.openapi.models.V1EnvVarSource;
+import io.kubernetes.client.openapi.models.V1LabelSelector;
+import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
+import io.kubernetes.client.openapi.models.V1ObjectMeta;
+import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
+import io.kubernetes.client.openapi.models.V1PodSpec;
+import io.kubernetes.client.openapi.models.V1PodTemplate;
+import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
+import io.kubernetes.client.openapi.models.V1ResourceRequirements;
+import io.kubernetes.client.openapi.models.V1SecretKeySelector;
+import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder;
+import io.kubernetes.client.openapi.models.V1Service;
+import io.kubernetes.client.openapi.models.V1ServiceSpec;
import io.kubernetes.client.openapi.models.V1StatefulSet;
+import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
+import io.kubernetes.client.openapi.models.V1Toleration;
+import io.kubernetes.client.openapi.models.V1Volume;
+import io.kubernetes.client.openapi.models.V1VolumeMount;
+import io.kubernetes.client.util.Yaml;
final class StatefulSet {
private final Map<Type, IStatefulSetFactory> statefulsets = new HashMap<>();
@@ -36,21 +81,32 @@ final class StatefulSet {
Manager
}
+ private Configs clusterConfigs;
+
+ private static final Logger LOG = Logger.getLogger(StatefulSet.class.getName());
+
+ private static final String ENV_SHARD_ID = "SHARD_ID";
+
/**
* Container class of all the Kubernetes cluster configurations. The methods contained within
* <code>KubernetesController</code> cannot be accessed externally since it is an abstract class.
*/
static final class Configs {
+ private final CoreV1Api coreClient;
private final String namespace;
private final String topologyName;
private final Config configuration;
private final Config runtimeConfiguration;
+ private final boolean podTemplateDisabled;
- Configs(String namespace, Config configuration, Config runtimeConfiguration) {
+ Configs(CoreV1Api coreClient, String namespace, Config configuration,
+ Config runtimeConfiguration) {
+ this.coreClient = coreClient;
this.namespace = namespace;
this.topologyName = Runtime.topologyName(runtimeConfiguration);
this.configuration = configuration;
this.runtimeConfiguration = runtimeConfiguration;
+ this.podTemplateDisabled = KubernetesContext.getPodTemplateDisabled(configuration);
}
Config getConfiguration() {
@@ -68,8 +124,17 @@ final class StatefulSet {
String getTopologyName() {
return topologyName;
}
+
+ boolean isPodTemplateDisabled() {
+ return podTemplateDisabled;
+ }
+
+ public CoreV1Api getCoreClient() {
+ return coreClient;
+ }
}
+
private StatefulSet() {
statefulsets.put(Type.Executor, new ExecutorFactory());
statefulsets.put(Type.Manager, new ManagerFactory());
@@ -95,21 +160,931 @@ final class StatefulSet {
return null;
}
- static class ExecutorFactory implements IStatefulSetFactory {
+ class ExecutorFactory implements IStatefulSetFactory {
@Override
public V1StatefulSet create(Configs configs, Resource containerResources,
int numberOfInstances) {
+ clusterConfigs = configs;
return null;
}
}
- static class ManagerFactory implements IStatefulSetFactory {
+ class ManagerFactory implements IStatefulSetFactory {
@Override
public V1StatefulSet create(Configs configs, Resource containerResources,
int numberOfInstances) {
+ clusterConfigs = configs;
return null;
}
}
+
+
+ /**
+ * Generates the command to start Heron within the <code>container</code>.
+ * @param containerId Passed down to <>SchedulerUtils</> to generate executor command.
+ * @param numOfInstances Used to configure the debugging ports.
+ * @param isExecutor Flag used to generate the correct <code>shard_id</code>.
+ * @return The complete command to start Heron in a <code>container</code>.
+ */
+ protected List<String> getExecutorCommand(String containerId, int numOfInstances,
+ boolean isExecutor) {
+ final Config configuration = clusterConfigs.getConfiguration();
+ final Config runtimeConfiguration = clusterConfigs.getRuntimeConfiguration();
+ final Map<ExecutorPort, String> ports =
+ KubernetesConstants.EXECUTOR_PORTS.entrySet()
+ .stream()
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> e.getValue().toString()));
+
+ if (TopologyUtils.getTopologyRemoteDebuggingEnabled(Runtime.topology(runtimeConfiguration))
+ && numOfInstances != 0) {
+ List<String> remoteDebuggingPorts = new LinkedList<>();
+ IntStream.range(0, numOfInstances).forEach(i -> {
+ int port = KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i;
+ remoteDebuggingPorts.add(String.valueOf(port));
+ });
+ ports.put(ExecutorPort.JVM_REMOTE_DEBUGGER_PORTS,
+ String.join(",", remoteDebuggingPorts));
+ }
+
+ final String[] executorCommand =
+ SchedulerUtils.getExecutorCommand(configuration, runtimeConfiguration,
+ containerId, ports);
+ return Arrays.asList(
+ "sh",
+ "-c",
+ KubernetesUtils.getConfCommand(configuration)
+ + " && " + KubernetesUtils.getFetchCommand(configuration, runtimeConfiguration)
+ + " && " + setShardIdEnvironmentVariableCommand(isExecutor)
+ + " && " + String.join(" ", executorCommand)
+ );
+ }
+
+ /**
+ * Configures the <code>shard_id</code> for the Heron container based on whether it is an <code>executor</code>
+ * or <code>manager</code>. <code>executor</code> IDs are [1 - n) and the <code>manager</code> IDs start at 0.
+ * @param isExecutor Switch flag to generate correct command.
+ * @return The command required to put the Heron instance in <code>executor</code> or <code>manager</code> mode.
+ */
+ @VisibleForTesting
+ protected static String setShardIdEnvironmentVariableCommand(boolean isExecutor) {
+ final String pattern = String.format("%%s=%s && echo shardId=${%%s}",
+ isExecutor ? "$((${POD_NAME##*-} + 1))" : "${POD_NAME##*-}");
+ return String.format(pattern, ENV_SHARD_ID, ENV_SHARD_ID);
+ }
+
+ /**
+ * Creates a headless <code>Service</code> to facilitate communication between Pods in a <code>topology</code>.
+ * @return A fully configured <code>Service</code> to be used by a <code>topology</code>.
+ */
+ private V1Service createTopologyService() {
+ final String topologyName = clusterConfigs.getTopologyName();
+
+ final V1Service service = new V1Service();
+
+ // Setup service metadata.
+ final V1ObjectMeta objectMeta = new V1ObjectMeta()
+ .name(topologyName)
+ .annotations(getServiceAnnotations())
+ .labels(getServiceLabels());
+ service.setMetadata(objectMeta);
+
+ // Create the headless service.
+ final V1ServiceSpec serviceSpec = new V1ServiceSpec()
+ .clusterIP("None")
+ .selector(getPodMatchLabels(topologyName));
+
+ service.setSpec(serviceSpec);
+
+ return service;
+ }
+
+ /**
+ * Creates and configures the <code>StatefulSet</code> which the topology's <code>executor</code>s will run in.
+ * @param containerResource Passed down to configure the <code>executor</code> resource limits.
+ * @param numberOfInstances Used to configure the execution command and ports for the <code>executor</code>.
+ * @param isExecutor Flag used to configure components specific to <code>executor</code> and <code>manager</code>.
+ * @return A fully configured <code>V1StatefulSet</code> for the topology's <code>executors</code>.
+ */
+ private V1StatefulSet createStatefulSet(Resource containerResource, int numberOfInstances,
+ boolean isExecutor) {
+ final String topologyName = clusterConfigs.getTopologyName();
+ final Config runtimeConfiguration = clusterConfigs.getRuntimeConfiguration();
+
+ final List<V1Volume> volumes = new LinkedList<>();
+ final List<V1VolumeMount> volumeMounts = new LinkedList<>();
+
+ // Collect Persistent Volume Claim configurations from the CLI.
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configsPVC =
+ KubernetesContext.getVolumeClaimTemplates(clusterConfigs.getConfiguration(), isExecutor);
+
+ // Collect all Volume configurations from the CLI and generate Volumes and Volume Mounts.
+ createVolumeAndMountsPersistentVolumeClaimCLI(configsPVC, volumes, volumeMounts);
+ createVolumeAndMountsHostPathCLI(
+ KubernetesContext.getVolumeHostPath(clusterConfigs.getConfiguration(), isExecutor),
+ volumes, volumeMounts);
+ createVolumeAndMountsEmptyDirCLI(
+ KubernetesContext.getVolumeEmptyDir(clusterConfigs.getConfiguration(), isExecutor),
+ volumes, volumeMounts);
+ createVolumeAndMountsNFSCLI(
+ KubernetesContext.getVolumeNFS(clusterConfigs.getConfiguration(), isExecutor),
+ volumes, volumeMounts);
+
+ final V1StatefulSet statefulSet = new V1StatefulSet();
+
+ // Setup StatefulSet's metadata.
+ final V1ObjectMeta objectMeta = new V1ObjectMeta()
+ .name(getStatefulSetName(isExecutor))
+ .labels(getPodLabels(topologyName));
+ statefulSet.setMetadata(objectMeta);
+
+ // Create the StatefulSet Spec.
+ // Reduce replica count by one for Executors and set to one for Manager.
+ final int replicasCount =
+ isExecutor ? Runtime.numContainers(runtimeConfiguration).intValue() - 1 : 1;
+ final V1StatefulSetSpec statefulSetSpec = new V1StatefulSetSpec()
+ .serviceName(topologyName)
+ .replicas(replicasCount);
+
+ // Parallel pod management tells the StatefulSet controller to launch or terminate
+ // all Pods in parallel, and not to wait for Pods to become Running and Ready or completely
+ // terminated prior to launching or terminating another Pod.
+ statefulSetSpec.setPodManagementPolicy("Parallel");
+
+ // Add selector match labels "app=heron" and "topology=topology-name"
+ // so we know which pods to manage.
+ final V1LabelSelector selector = new V1LabelSelector()
+ .matchLabels(getPodMatchLabels(topologyName));
+ statefulSetSpec.setSelector(selector);
+
+ // Create a Pod Template.
+ final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate(isExecutor);
+
+ // Set up Pod Metadata.
+ final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getPodLabels(topologyName));
+ Map<String, String> annotations = new HashMap<>();
+ annotations.putAll(getPodAnnotations());
+ annotations.putAll(getPrometheusAnnotations());
+ templateMetaData.setAnnotations(annotations);
+ podTemplateSpec.setMetadata(templateMetaData);
+
+ configurePodSpec(podTemplateSpec, containerResource, numberOfInstances, isExecutor,
+ volumes, volumeMounts);
+
+ statefulSetSpec.setTemplate(podTemplateSpec);
+
+ statefulSet.setSpec(statefulSetSpec);
+
+ statefulSetSpec.setVolumeClaimTemplates(createPersistentVolumeClaims(configsPVC));
+
+ return statefulSet;
+ }
+
+ /**
+ * Extracts general Pod <code>Annotation</code>s from configurations.
+ * @return Key-value pairs of general <code>Annotation</code>s to be added to the Pod.
+ */
+ private Map<String, String> getPodAnnotations() {
+ return KubernetesContext.getPodAnnotations(clusterConfigs.getConfiguration());
+ }
+
+ /**
+ * Extracts <code>Service Annotations</code> for configurations.
+ * @return Key-value pairs of service <code>Annotation</code>s to be added to the Pod.
+ */
+ private Map<String, String> getServiceAnnotations() {
+ return KubernetesContext.getServiceAnnotations(clusterConfigs.getConfiguration());
+ }
+
+ /**
+ * Generates <code>Label</code>s to indicate Prometheus scraping and the exposed port.
+ * @return Key-value pairs of Prometheus <code>Annotation</code>s to be added to the Pod.
+ */
+ private Map<String, String> getPrometheusAnnotations() {
+ final Map<String, String> annotations = new HashMap<>();
+ annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_SCRAPE, "true");
+ annotations.put(KubernetesConstants.ANNOTATION_PROMETHEUS_PORT,
+ KubernetesConstants.PROMETHEUS_PORT);
+
+ return annotations;
+ }
+
+ /**
+ * Generates the <code>heron</code> and <code>topology</code> name <code>Match Label</code>s.
+ * @param topologyName Name of the <code>topology</code>.
+ * @return Key-value pairs of <code>Match Label</code>s to be added to the Pod.
+ */
+ private Map<String, String> getPodMatchLabels(String topologyName) {
+ final Map<String, String> labels = new HashMap<>();
+ labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE);
+ labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
+ return labels;
+ }
+
+ /**
+ * Extracts <code>Label</code>s from configurations, generates the <code>heron</code> and
+ * <code>topology</code> name <code>Label</code>s.
+ * @param topologyName Name of the <code>topology</code>.
+ * @return Key-value pairs of <code>Label</code>s to be added to the Pod.
+ */
+ private Map<String, String> getPodLabels(String topologyName) {
+ final Map<String, String> labels = new HashMap<>();
+ labels.put(KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE);
+ labels.put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
+ labels.putAll(KubernetesContext.getPodLabels(clusterConfigs.getConfiguration()));
+ return labels;
+ }
+
+ /**
+ * Extracts <code>Selector Labels</code> for<code>Service</code>s from configurations.
+ * @return Key-value pairs of <code>Service Labels</code> to be added to the Pod.
+ */
+ private Map<String, String> getServiceLabels() {
+ return KubernetesContext.getServiceLabels(clusterConfigs.getConfiguration());
+ }
+
+ /**
+ * Configures the <code>Pod Spec</code> section of the <code>StatefulSet</code>. The <code>Heron</code> container
+ * will be configured to allow it to function but other supplied containers are loaded verbatim.
+ * @param podTemplateSpec The <code>Pod Template Spec</code> section to update.
+ * @param resource Passed down to configure the resource limits.
+ * @param numberOfInstances Passed down to configure the ports.
+ * @param isExecutor Flag used to configure components specific to <code>Executor</code> and <code>Manager</code>.
+ * @param volumes <code>Volumes</code> generated from configurations options.
+ * @param volumeMounts <code>Volume Mounts</code> generated from configurations options.
+ */
+ private void configurePodSpec(final V1PodTemplateSpec podTemplateSpec, Resource resource,
+ int numberOfInstances, boolean isExecutor, List<V1Volume> volumes,
+ List<V1VolumeMount> volumeMounts) {
+ if (podTemplateSpec.getSpec() == null) {
+ podTemplateSpec.setSpec(new V1PodSpec());
+ }
+ final V1PodSpec podSpec = podTemplateSpec.getSpec();
+
+ // Set the termination period to 0 so pods can be deleted quickly
+ podSpec.setTerminationGracePeriodSeconds(0L);
+
+ // Set the pod tolerations so pods are rescheduled when nodes go down
+ // https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/#taint-based-evictions
+ configureTolerations(podSpec);
+
+ // Get <Heron> container and ignore all others.
+ final String containerName =
+ isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME;
+ V1Container heronContainer = null;
+ List<V1Container> containers = podSpec.getContainers();
+ if (containers != null) {
+ for (V1Container container : containers) {
+ final String name = container.getName();
+ if (name != null && name.equals(containerName)) {
+ if (heronContainer != null) {
+ throw new TopologySubmissionException(
+ String.format("Multiple configurations found for '%s' container", containerName));
+ }
+ heronContainer = container;
+ }
+ }
+ } else {
+ containers = new LinkedList<>();
+ }
+
+ if (heronContainer == null) {
+ heronContainer = new V1Container().name(containerName);
+ containers.add(heronContainer);
+ }
+
+ if (!volumes.isEmpty() || !volumeMounts.isEmpty()) {
+ configurePodWithVolumesAndMountsFromCLI(podSpec, heronContainer, volumes,
+ volumeMounts);
+ }
+
+ configureHeronContainer(resource, numberOfInstances, heronContainer, isExecutor);
+
+ podSpec.setContainers(containers);
+
+ mountSecretsAsVolumes(podSpec);
+ }
+
+ /**
+ * Adds <code>tolerations</code> to the <code>Pod Spec</code> with Heron's values taking precedence.
+ * @param spec <code>Pod Spec</code> to be configured.
+ */
+ @VisibleForTesting
+ protected void configureTolerations(final V1PodSpec spec) {
+ KubernetesUtils.V1ControllerUtils<V1Toleration> utils =
+ new KubernetesUtils.V1ControllerUtils<>();
+ spec.setTolerations(
+ utils.mergeListsDedupe(getTolerations(), spec.getTolerations(),
+ Comparator.comparing(V1Toleration::getKey), "Pod Specification Tolerations")
+ );
+ }
+
+ /**
+ * Generates a list of <code>tolerations</code> which Heron requires.
+ * @return A list of configured <code>tolerations</code>.
+ */
+ @VisibleForTesting
+ protected static List<V1Toleration> getTolerations() {
+ final List<V1Toleration> tolerations = new ArrayList<>();
+ KubernetesConstants.TOLERATIONS.forEach(t -> {
+ final V1Toleration toleration =
+ new V1Toleration()
+ .key(t)
+ .operator("Exists")
+ .effect("NoExecute")
+ .tolerationSeconds(10L);
+ tolerations.add(toleration);
+ });
+
+ return tolerations;
+ }
+
+ /**
+ * Adds <code>Volume Mounts</code> for <code>Secrets</code> to a pod.
+ * @param podSpec <code>Pod Spec</code> to add secrets to.
+ */
+ private void mountSecretsAsVolumes(V1PodSpec podSpec) {
+ final Config config = clusterConfigs.getConfiguration();
+ final Map<String, String> secrets = KubernetesContext.getPodSecretsToMount(config);
+ for (Map.Entry<String, String> secret : secrets.entrySet()) {
+ final V1VolumeMount mount = new V1VolumeMount()
+ .name(secret.getKey())
+ .mountPath(secret.getValue());
+ final V1Volume secretVolume = new V1Volume()
+ .name(secret.getKey())
+ .secret(new V1SecretVolumeSourceBuilder()
+ .withSecretName(secret.getKey())
+ .build());
+ podSpec.addVolumesItem(secretVolume);
+ for (V1Container container : podSpec.getContainers()) {
+ container.addVolumeMountsItem(mount);
+ }
+ }
+ }
+
+ /**
+ * Configures the <code>Heron</code> container with values for parameters Heron requires for functioning.
+ * @param resource Resource limits.
+ * @param numberOfInstances Required number of <code>executor</code> containers which is used to configure ports.
+ * @param container The <code>executor</code> container to be configured.
+ * @param isExecutor Flag indicating whether to set a <code>executor</code> or <code>manager</code> command.
+ */
+ private void configureHeronContainer(Resource resource, int numberOfInstances,
+ final V1Container container, boolean isExecutor) {
+ final Config configuration = clusterConfigs.getConfiguration();
+
+ // Set up the container images.
+ container.setImage(KubernetesContext.getExecutorDockerImage(configuration));
+
+ // Set up the container command.
+ final List<String> command =
+ getExecutorCommand("$" + ENV_SHARD_ID, numberOfInstances, isExecutor);
+ container.setCommand(command);
+
+ if (KubernetesContext.hasImagePullPolicy(configuration)) {
+ container.setImagePullPolicy(KubernetesContext.getKubernetesImagePullPolicy(configuration));
+ }
+
+ // Configure environment variables.
+ configureContainerEnvVars(container);
+
+ // Set secret keys.
+ setSecretKeyRefs(container);
+
+ // Set container resources
+ configureContainerResources(container, configuration, resource, isExecutor);
+
+ // Set container ports.
+ final boolean debuggingEnabled =
+ TopologyUtils.getTopologyRemoteDebuggingEnabled(
+ Runtime.topology(clusterConfigs.getRuntimeConfiguration()));
+ configureContainerPorts(debuggingEnabled, numberOfInstances, container);
+
+ // setup volume mounts
+ mountVolumeIfPresent(container);
+ }
+
+ /**
+ * Configures the resources in the <code>container</code> with values in the <code>config</code> taking precedence.
+ * @param container The <code>container</code> to be configured.
+ * @param configuration The <code>Config</code> object to check if a resource request needs to be set.
+ * @param resource User defined resources limits from input.
+ * @param isExecutor Flag to indicate configuration for an <code>executor</code> or <code>manager</code>.
+ */
+ @VisibleForTesting
+ protected void configureContainerResources(final V1Container container,
+ final Config configuration, final Resource resource,
+ boolean isExecutor) {
+ if (container.getResources() == null) {
+ container.setResources(new V1ResourceRequirements());
+ }
+ final V1ResourceRequirements resourceRequirements = container.getResources();
+
+ // Collect Limits and Requests from CLI.
+ final Map<String, Quantity> limitsCLI = createResourcesRequirement(
+ KubernetesContext.getResourceLimits(configuration, isExecutor));
+ final Map<String, Quantity> requestsCLI = createResourcesRequirement(
+ KubernetesContext.getResourceRequests(configuration, isExecutor));
+
+ if (resourceRequirements.getLimits() == null) {
+ resourceRequirements.setLimits(new HashMap<>());
+ }
+
+ // Set Limits and Resources from CLI <if> available, <else> use Configs. Deduplicate on name
+ // with precedence [1] CLI, [2] Config.
+ final Map<String, Quantity> limits = resourceRequirements.getLimits();
+ final Quantity limitCPU = limitsCLI.getOrDefault(KubernetesConstants.CPU,
+ Quantity.fromString(Double.toString(KubernetesUtils.roundDecimal(resource.getCpu(), 3))));
+ final Quantity limitMEMORY = limitsCLI.getOrDefault(KubernetesConstants.MEMORY,
+ Quantity.fromString(KubernetesUtils.Megabytes(resource.getRam())));
+
+ limits.put(KubernetesConstants.MEMORY, limitMEMORY);
+ limits.put(KubernetesConstants.CPU, limitCPU);
+
+ // Set the Kubernetes container resource request.
+ // Order: [1] CLI, [2] EQUAL_TO_LIMIT, [3] NOT_SET
+ KubernetesContext.KubernetesResourceRequestMode requestMode =
+ KubernetesContext.getKubernetesRequestMode(configuration);
+ if (!requestsCLI.isEmpty()) {
+ if (resourceRequirements.getRequests() == null) {
+ resourceRequirements.setRequests(new HashMap<>());
+ }
+ final Map<String, Quantity> requests = resourceRequirements.getRequests();
+
+ if (requestsCLI.containsKey(KubernetesConstants.MEMORY)) {
+ requests.put(KubernetesConstants.MEMORY, requestsCLI.get(KubernetesConstants.MEMORY));
+ }
+ if (requestsCLI.containsKey(KubernetesConstants.CPU)) {
+ requests.put(KubernetesConstants.CPU, requestsCLI.get(KubernetesConstants.CPU));
+ }
+ } else if (requestMode == KubernetesContext.KubernetesResourceRequestMode.EQUAL_TO_LIMIT) {
+ LOG.log(Level.CONFIG, "Setting K8s Request equal to Limit");
+ resourceRequirements.setRequests(limits);
+ } else {
+ LOG.log(Level.CONFIG, "Not setting K8s request because config was NOT_SET");
+ }
+ container.setResources(resourceRequirements);
+ }
+
+ /**
+ * Creates <code>Resource Requirements</code> from a Map of <code>Config</code> items for <code>CPU</code>
+ * and <code>Memory</code>.
+ * @param configs <code>Configs</code> to be parsed for configuration.
+ * @return Configured <code>Resource Requirements</code>. An <code>empty</code> map will be returned
+ * if there are no <code>configs</code>.
+ */
+ @VisibleForTesting
+ protected Map<String, Quantity> createResourcesRequirement(Map<String, String> configs) {
+ final Map<String, Quantity> requirements = new HashMap<>();
+
+ if (configs == null || configs.isEmpty()) {
+ return requirements;
+ }
+
+ final String memoryLimit = configs.get(KubernetesConstants.MEMORY);
+ if (memoryLimit != null && !memoryLimit.isEmpty()) {
+ requirements.put(KubernetesConstants.MEMORY, Quantity.fromString(memoryLimit));
+ }
+ final String cpuLimit = configs.get(KubernetesConstants.CPU);
+ if (cpuLimit != null && !cpuLimit.isEmpty()) {
+ requirements.put(KubernetesConstants.CPU, Quantity.fromString(cpuLimit));
+ }
+
+ return requirements;
+ }
+
+ /**
+ * Configures the environment variables in the <code>container</code> with those Heron requires.
+ * Heron's values take precedence.
+ * @param container The <code>container</code> to be configured.
+ */
+ @VisibleForTesting
+ protected void configureContainerEnvVars(final V1Container container) {
+ // Deduplicate on var name with Heron defaults take precedence.
+ KubernetesUtils.V1ControllerUtils<V1EnvVar> utils = new KubernetesUtils.V1ControllerUtils<>();
+ container.setEnv(
+ utils.mergeListsDedupe(getExecutorEnvVars(), container.getEnv(),
+ Comparator.comparing(V1EnvVar::getName), "Pod Template Environment Variables")
+ );
+ }
+
+ /**
+ * Generates a list of <code>Environment Variables</code> required by Heron to function.
+ * @return A list of configured <code>Environment Variables</code> required by Heron to function.
+ */
+ @VisibleForTesting
+ protected static List<V1EnvVar> getExecutorEnvVars() {
+ final V1EnvVar envVarHost = new V1EnvVar();
+ envVarHost.name(KubernetesConstants.ENV_HOST)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath(KubernetesConstants.POD_IP)));
+
+ final V1EnvVar envVarPodName = new V1EnvVar();
+ envVarPodName.name(KubernetesConstants.ENV_POD_NAME)
+ .valueFrom(new V1EnvVarSource()
+ .fieldRef(new V1ObjectFieldSelector()
+ .fieldPath(KubernetesConstants.POD_NAME)));
+
+ return Arrays.asList(envVarHost, envVarPodName);
+ }
+
+ /**
+ * Configures the ports in the <code>container</code> with those Heron requires. Heron's values take precedence.
+ * @param remoteDebugEnabled Flag used to indicate if debugging ports need to be added.
+ * @param numberOfInstances The number of debugging ports to be opened.
+ * @param container <code>container</code> to be configured.
+ */
+ @VisibleForTesting
+ protected void configureContainerPorts(boolean remoteDebugEnabled, int numberOfInstances,
+ final V1Container container) {
+ List<V1ContainerPort> ports = new ArrayList<>(getExecutorPorts());
+
+ if (remoteDebugEnabled) {
+ ports.addAll(getDebuggingPorts(numberOfInstances));
+ }
+
+ // Set container ports. Deduplicate using port number with Heron defaults taking precedence.
+ KubernetesUtils.V1ControllerUtils<V1ContainerPort> utils =
+ new KubernetesUtils.V1ControllerUtils<>();
+ container.setPorts(
+ utils.mergeListsDedupe(getExecutorPorts(), container.getPorts(),
+ Comparator.comparing(V1ContainerPort::getContainerPort), "Pod Template Ports")
+ );
+ }
+
+ /**
+ * Generates a list of <code>ports</code> required by Heron to function.
+ * @return A list of configured <code>ports</code> required by Heron to function.
+ */
+ @VisibleForTesting
+ protected static List<V1ContainerPort> getExecutorPorts() {
+ List<V1ContainerPort> ports = new LinkedList<>();
+ KubernetesConstants.EXECUTOR_PORTS.forEach((p, v) -> {
+ final V1ContainerPort port = new V1ContainerPort()
+ .name(p.getName())
+ .containerPort(v);
+ ports.add(port);
+ });
+ return ports;
+ }
+
+ /**
+ * Generate the debugging ports required by Heron.
+ * @param numberOfInstances The number of debugging ports to generate.
+ * @return A list of configured debugging <code>ports</code>.
+ */
+ @VisibleForTesting
+ protected static List<V1ContainerPort> getDebuggingPorts(int numberOfInstances) {
+ List<V1ContainerPort> ports = new LinkedList<>();
+ IntStream.range(0, numberOfInstances).forEach(i -> {
+ final String portName =
+ KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT_NAME + "-" + i;
+ final V1ContainerPort port = new V1ContainerPort()
+ .name(portName)
+ .containerPort(KubernetesConstants.JVM_REMOTE_DEBUGGER_PORT + i);
+ ports.add(port);
+ });
+ return ports;
+ }
+
+ /**
+ * Adds volume mounts to the <code>container</code> that Heron requires. Heron's values taking precedence.
+ * @param container <code>container</code> to be configured.
+ */
+ @VisibleForTesting
+ protected void mountVolumeIfPresent(final V1Container container) {
+ final Config config = clusterConfigs.getConfiguration();
+ if (KubernetesContext.hasContainerVolume(config)) {
+ final V1VolumeMount mount =
+ new V1VolumeMount()
+ .name(KubernetesContext.getContainerVolumeName(config))
+ .mountPath(KubernetesContext.getContainerVolumeMountPath(config));
+
+ // Merge volume mounts. Deduplicate using mount's name with Heron defaults taking precedence.
+ KubernetesUtils.V1ControllerUtils<V1VolumeMount> utils =
+ new KubernetesUtils.V1ControllerUtils<>();
+ container.setVolumeMounts(
+ utils.mergeListsDedupe(Collections.singletonList(mount), container.getVolumeMounts(),
+ Comparator.comparing(V1VolumeMount::getName), "Pod Template Volume Mounts")
+ );
+ }
+ }
+
+ /**
+ * Adds <code>Secret Key</code> references to a <code>container</code>.
+ * @param container <code>container</code> to be configured.
+ */
+ private void setSecretKeyRefs(V1Container container) {
+ final Config config = clusterConfigs.getConfiguration();
+ final Map<String, String> podSecretKeyRefs = KubernetesContext.getPodSecretKeyRefs(config);
+ for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) {
+ final String[] keyRefParts = secret.getValue().split(":");
+ if (keyRefParts.length != 2) {
+ LOG.log(Level.SEVERE,
+ "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
+ throw new TopologyRuntimeManagementException(
+ "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
+ }
+ String name = keyRefParts[0];
+ String key = keyRefParts[1];
+ final V1EnvVar envVar = new V1EnvVar()
+ .name(secret.getKey())
+ .valueFrom(new V1EnvVarSource()
+ .secretKeyRef(new V1SecretKeySelector()
+ .key(key)
+ .name(name)));
+ container.addEnvItem(envVar);
+ }
+ }
+
+ /**
+ * Initiates the process of locating and loading <code>Pod Template</code> from a <code>ConfigMap</code>.
+ * The loaded text is then parsed into a usable <code>Pod Template</code>.
+ * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
+ * or <code>Manager</code>.
+ * @return A <code>Pod Template</code> which is loaded and parsed from a <code>ConfigMap</code>.
+ */
+ @VisibleForTesting
+ protected V1PodTemplateSpec loadPodFromTemplate(boolean isExecutor) {
+ final Pair<String, String> podTemplateConfigMapName = getPodTemplateLocation(isExecutor);
+
+ // Default Pod Template.
+ if (podTemplateConfigMapName == null) {
+ LOG.log(Level.INFO, "Configuring cluster with the Default Pod Template");
+ return new V1PodTemplateSpec();
+ }
+
+ if (clusterConfigs.isPodTemplateDisabled()) {
+ throw new TopologySubmissionException("Custom Pod Templates are disabled");
+ }
+
+ final String configMapName = podTemplateConfigMapName.first;
+ final String podTemplateName = podTemplateConfigMapName.second;
+
+ // Attempt to locate ConfigMap with provided Pod Template name.
+ try {
+ V1ConfigMap configMap = getConfigMap(configMapName);
+ if (configMap == null) {
+ throw new ApiException(
+ String.format("K8s client unable to locate ConfigMap '%s'", configMapName));
+ }
+
+ final Map<String, String> configMapData = configMap.getData();
+ if (configMapData != null && configMapData.containsKey(podTemplateName)) {
+ // NullPointerException when Pod Template is empty.
+ V1PodTemplateSpec podTemplate = ((V1PodTemplate)
+ Yaml.load(configMapData.get(podTemplateName))).getTemplate();
+ LOG.log(Level.INFO, String.format("Configuring cluster with the %s.%s Pod Template",
+ configMapName, podTemplateName));
+ return podTemplate;
+ }
+
+ // Failure to locate Pod Template with provided name.
+ throw new ApiException(String.format("Failed to locate Pod Template '%s' in ConfigMap '%s'",
+ podTemplateName, configMapName));
+ } catch (ApiException e) {
+ KubernetesUtils.logExceptionWithDetails(LOG, e.getMessage(), e);
+ throw new TopologySubmissionException(e.getMessage());
+ } catch (IOException | ClassCastException | NullPointerException e) {
+ final String message = String.format("Error parsing Pod Template '%s' in ConfigMap '%s'",
+ podTemplateName, configMapName);
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(message);
+ }
+ }
+
+ /**
+ * Extracts the <code>ConfigMap</code> and <code>Pod Template</code> names from the CLI parameter.
+ * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
+ * or <code>Manager</code>.
+ * @return A pair of the form <code>(ConfigMap, Pod Template)</code>.
+ */
+ @VisibleForTesting
+ protected Pair<String, String> getPodTemplateLocation(boolean isExecutor) {
+ final String podTemplateConfigMapName = KubernetesContext
+ .getPodTemplateConfigMapName(clusterConfigs.getConfiguration(), isExecutor);
+
+ if (podTemplateConfigMapName == null) {
+ return null;
+ }
+
+ try {
+ final int splitPoint = podTemplateConfigMapName.indexOf(".");
+ final String configMapName = podTemplateConfigMapName.substring(0, splitPoint);
+ final String podTemplateName = podTemplateConfigMapName.substring(splitPoint + 1);
+
+ if (configMapName.isEmpty() || podTemplateName.isEmpty()) {
+ throw new IllegalArgumentException("Empty ConfigMap or Pod Template name");
+ }
+
+ return new Pair<>(configMapName, podTemplateName);
+ } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
+ final String message = "Invalid ConfigMap and/or Pod Template name";
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(message);
+ }
+ }
+
+ /**
+ * Retrieves a <code>ConfigMap</code> from the K8s cluster in the API Server's namespace.
+ * @param configMapName Name of the <code>ConfigMap</code> to retrieve.
+ * @return The retrieved <code>ConfigMap</code>.
+ */
+ @VisibleForTesting
+ protected V1ConfigMap getConfigMap(String configMapName) {
+ try {
+ return clusterConfigs.getCoreClient().readNamespacedConfigMap(
+ configMapName,
+ clusterConfigs.getNamespace(),
+ null);
+ } catch (ApiException e) {
+ final String message = "Error retrieving ConfigMaps";
+ KubernetesUtils.logExceptionWithDetails(LOG, message, e);
+ throw new TopologySubmissionException(String.format("%s: %s", message, e.getMessage()));
+ }
+ }
+
+ /**
+ * Generates <code>Persistent Volume Claims Templates</code> from a mapping of <code>Volumes</code>
+ * to <code>key-value</code> pairs of configuration options and values.
+ * @param mapOfOpts <code>Volume</code> to configuration <code>key-value</code> mappings.
+ * @return Fully populated list of only dynamically backed <code>Persistent Volume Claims</code>.
+ */
+ @VisibleForTesting
+ protected List<V1PersistentVolumeClaim> createPersistentVolumeClaims(
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts) {
+
+ List<V1PersistentVolumeClaim> listOfPVCs = new LinkedList<>();
+
+ // Iterate over all the PVC Volumes.
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> pvc
+ : mapOfOpts.entrySet()) {
+
+ // Only create claims for `OnDemand` volumes.
+ final String claimName = pvc.getValue().get(KubernetesConstants.VolumeConfigKeys.claimName);
+ if (claimName != null && !KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
+ continue;
+ }
+
+ listOfPVCs.add(Volumes.get()
+ .createPersistentVolumeClaim(pvc.getKey(),
+ getPersistentVolumeClaimLabels(clusterConfigs.getTopologyName()), pvc.getValue()));
+ }
+ return listOfPVCs;
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>Persistent Volume Claims</code>s
+ * to be placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI.
+ * @param mapConfig Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsPersistentVolumeClaimCLI(
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapConfig,
+ final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
+ : mapConfig.entrySet()) {
+ final String volumeName = configs.getKey();
+
+ // Do not create Volumes for `OnDemand`.
+ final String claimName = configs.getValue()
+ .get(KubernetesConstants.VolumeConfigKeys.claimName);
+ if (claimName != null && !KubernetesConstants.LABEL_ON_DEMAND.equalsIgnoreCase(claimName)) {
+ volumes.add(Volumes.get().createPersistentVolumeClaim(claimName, volumeName));
+ }
+ volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
+ }
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>emptyDir</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsEmptyDirCLI(
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
+ final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = Volumes.get()
+ .createVolume(Volumes.VolumeType.EmptyDir, volumeName, configs.getValue());
+ volumes.add(volume);
+ volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
+ }
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>Host Path</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsHostPathCLI(
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
+ final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = Volumes.get()
+ .createVolume(Volumes.VolumeType.HostPath, volumeName, configs.getValue());
+ volumes.add(volume);
+ volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
+ }
+ }
+
+ /**
+ * Generates the <code>Volume</code>s and <code>Volume Mounts</code> for <code>NFS</code>s to be
+ * placed in the <code>Executor</code> and <code>Manager</code> from options on the CLI.
+ * @param mapOfOpts Mapping of <code>Volume</code> option <code>key-value</code> configuration pairs.
+ * @param volumes A list of <code>Volume</code> to append to.
+ * @param volumeMounts A list of <code>Volume Mounts</code> to append to.
+ */
+ @VisibleForTesting
+ protected void createVolumeAndMountsNFSCLI(
+ final Map<String, Map<KubernetesConstants.VolumeConfigKeys, String>> mapOfOpts,
+ final List<V1Volume> volumes, final List<V1VolumeMount> volumeMounts) {
+ for (Map.Entry<String, Map<KubernetesConstants.VolumeConfigKeys, String>> configs
+ : mapOfOpts.entrySet()) {
+ final String volumeName = configs.getKey();
+ final V1Volume volume = Volumes.get()
+ .createVolume(Volumes.VolumeType.NetworkFileSystem, volumeName, configs.getValue());
+ volumes.add(volume);
+ volumeMounts.add(Volumes.get().createMount(volumeName, configs.getValue()));
+ }
+ }
+
+ /**
+ * Configures the Pod Spec and Heron container with <code>Volumes</code> and <code>Volume Mounts</code>.
+ * @param podSpec All generated <code>V1Volume</code> will be placed in the <code>Pod Spec</code>.
+ * @param executor All generated <code>V1VolumeMount</code> will be placed in the <code>Container</code>.
+ * @param volumes <code>Volumes</code> to be inserted in the Pod Spec.
+ * @param volumeMounts <code>Volumes Mounts</code> to be inserted in the Heron container.
+ */
+ @VisibleForTesting
+ protected void configurePodWithVolumesAndMountsFromCLI(final V1PodSpec podSpec,
+ final V1Container executor, List<V1Volume> volumes, List<V1VolumeMount> volumeMounts) {
+
+ // Deduplicate on Names with Persistent Volume Claims taking precedence.
+
+ KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
+ new KubernetesUtils.V1ControllerUtils<>();
+ podSpec.setVolumes(
+ utilsVolumes.mergeListsDedupe(volumes, podSpec.getVolumes(),
+ Comparator.comparing(V1Volume::getName),
+ "Pod with Volumes"));
+
+ KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
+ new KubernetesUtils.V1ControllerUtils<>();
+ executor.setVolumeMounts(
+ utilsMounts.mergeListsDedupe(volumeMounts, executor.getVolumeMounts(),
+ Comparator.comparing(V1VolumeMount::getName),
+ "Heron container with Volume Mounts"));
+ }
+
+ /**
+ * Generates the <code>Label</code> which are attached to a Topology's Persistent Volume Claims.
+ * @param topologyName Attached to the topology match label.
+ * @return A map consisting of the <code>label-value</code> pairs to be used in <code>Label</code>s.
+ */
+ @VisibleForTesting
+ protected static Map<String, String> getPersistentVolumeClaimLabels(String topologyName) {
+ return new HashMap<String, String>() {
+ {
+ put(KubernetesConstants.LABEL_TOPOLOGY, topologyName);
+ put(KubernetesConstants.LABEL_ON_DEMAND, "true");
+ }
+ };
+ }
+
+ /**
+ * Generates the <code>StatefulSet</code> name depending on if it is a <code>Executor</code> or
+ * <code>Manager</code>.
+ * @param isExecutor Flag used to generate name for <code>Executor</code> or <code>Manager</code>.
+ * @return String <code>"topology-name"-executors</code>.
+ */
+ private String getStatefulSetName(boolean isExecutor) {
+ return String.format("%s-%s", clusterConfigs.getTopologyName(),
+ isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME);
+ }
+
+ /**
+ * Generates the <code>Selector</code> match labels with which resources in this topology can be found.
+ * @return A label of the form <code>app=heron,topology=topology-name</code>.
+ */
+ private String createTopologySelectorLabels() {
+ return String.format("%s=%s,%s=%s",
+ KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE,
+ KubernetesConstants.LABEL_TOPOLOGY, clusterConfigs.getTopologyName());
+ }
}