You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/07/20 18:20:59 UTC

[incubator-heron] branch saadurrahman/3846-Refactoring-K8s-Shim-dev updated: [StatefulSet] removed calls to cluster for Pod Templates.

This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


The following commit(s) were added to refs/heads/saadurrahman/3846-Refactoring-K8s-Shim-dev by this push:
     new 8a1f15b35b8 [StatefulSet] removed calls to cluster for Pod Templates.
8a1f15b35b8 is described below

commit 8a1f15b35b81ebba9cccd9f44cf9e93d1ecd55f4
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Wed Jul 20 14:20:47 2022 -0400

    [StatefulSet] removed calls to cluster for Pod Templates.
    
    Removed cross-cutting calls to the K8s cluster from within the Stateful Set factory. All calls to the K8s cluster should remain in the shim with Pod Specs,  if found, provided in the configuration container.
---
 .../heron/scheduler/kubernetes/StatefulSet.java    | 211 ++-------------------
 1 file changed, 19 insertions(+), 192 deletions(-)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
index 3f23b4be321..c8abd842057 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
@@ -19,8 +19,6 @@
 
 package org.apache.heron.scheduler.kubernetes;
 
-
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,7 +35,6 @@ import java.util.stream.IntStream;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.heron.api.utils.TopologyUtils;
-import org.apache.heron.common.basics.Pair;
 import org.apache.heron.scheduler.TopologyRuntimeManagementException;
 import org.apache.heron.scheduler.TopologySubmissionException;
 import org.apache.heron.scheduler.utils.Runtime;
@@ -47,9 +44,6 @@ import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.packing.Resource;
 
 import io.kubernetes.client.custom.Quantity;
-import io.kubernetes.client.openapi.ApiException;
-import io.kubernetes.client.openapi.apis.CoreV1Api;
-import io.kubernetes.client.openapi.models.V1ConfigMap;
 import io.kubernetes.client.openapi.models.V1Container;
 import io.kubernetes.client.openapi.models.V1ContainerPort;
 import io.kubernetes.client.openapi.models.V1EnvVar;
@@ -59,19 +53,15 @@ import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PersistentVolumeClaim;
 import io.kubernetes.client.openapi.models.V1PodSpec;
-import io.kubernetes.client.openapi.models.V1PodTemplate;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
 import io.kubernetes.client.openapi.models.V1ResourceRequirements;
 import io.kubernetes.client.openapi.models.V1SecretKeySelector;
 import io.kubernetes.client.openapi.models.V1SecretVolumeSourceBuilder;
-import io.kubernetes.client.openapi.models.V1Service;
-import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
 import io.kubernetes.client.openapi.models.V1Toleration;
 import io.kubernetes.client.openapi.models.V1Volume;
 import io.kubernetes.client.openapi.models.V1VolumeMount;
-import io.kubernetes.client.util.Yaml;
 
 final class StatefulSet {
   private final Map<Type, IStatefulSetFactory> statefulsets = new HashMap<>();
@@ -92,21 +82,19 @@ final class StatefulSet {
    * <code>KubernetesController</code> cannot be accessed externally since it is an abstract class.
    */
   static final class Configs {
-    private final CoreV1Api coreClient;
-    private final String namespace;
     private final String topologyName;
     private final Config configuration;
     private final Config runtimeConfiguration;
-    private final boolean podTemplateDisabled;
+    private final V1PodTemplateSpec managerPodTemplateSpec;
+    private final V1PodTemplateSpec executorPodTemplateSpec;
 
-    Configs(CoreV1Api coreClient, String namespace, Config configuration,
-            Config runtimeConfiguration) {
-      this.coreClient = coreClient;
-      this.namespace = namespace;
+    Configs(Config configuration, Config runtimeConfiguration,
+            V1PodTemplateSpec managerPodTemplateSpec, V1PodTemplateSpec executorPodTemplateSpec) {
       this.topologyName = Runtime.topologyName(runtimeConfiguration);
       this.configuration = configuration;
       this.runtimeConfiguration = runtimeConfiguration;
-      this.podTemplateDisabled = KubernetesContext.getPodTemplateDisabled(configuration);
+      this.managerPodTemplateSpec = managerPodTemplateSpec;
+      this.executorPodTemplateSpec = executorPodTemplateSpec;
     }
 
     Config getConfiguration() {
@@ -117,20 +105,16 @@ final class StatefulSet {
       return runtimeConfiguration;
     }
 
-    String getNamespace() {
-      return namespace;
-    }
-
     String getTopologyName() {
       return topologyName;
     }
 
-    boolean isPodTemplateDisabled() {
-      return podTemplateDisabled;
+    V1PodTemplateSpec getManagerPodTemplateSpec() {
+      return managerPodTemplateSpec;
     }
 
-    public CoreV1Api getCoreClient() {
-      return coreClient;
+    V1PodTemplateSpec getExecutorPodTemplateSpec() {
+      return executorPodTemplateSpec;
     }
   }
 
@@ -166,7 +150,7 @@ final class StatefulSet {
     public V1StatefulSet create(Configs configs, Resource containerResources,
                                 int numberOfInstances) {
       clusterConfigs = configs;
-      return null;
+      return createStatefulSet(containerResources, numberOfInstances, true);
     }
   }
 
@@ -176,7 +160,7 @@ final class StatefulSet {
     public V1StatefulSet create(Configs configs, Resource containerResources,
                                 int numberOfInstances) {
       clusterConfigs = configs;
-      return null;
+      return createStatefulSet(containerResources, numberOfInstances, false);
     }
   }
 
@@ -235,32 +219,6 @@ final class StatefulSet {
     return String.format(pattern, ENV_SHARD_ID, ENV_SHARD_ID);
   }
 
-  /**
-   * Creates a headless <code>Service</code> to facilitate communication between Pods in a <code>topology</code>.
-   * @return A fully configured <code>Service</code> to be used by a <code>topology</code>.
-   */
-  private V1Service createTopologyService() {
-    final String topologyName = clusterConfigs.getTopologyName();
-
-    final V1Service service = new V1Service();
-
-    // Setup service metadata.
-    final V1ObjectMeta objectMeta = new V1ObjectMeta()
-        .name(topologyName)
-        .annotations(getServiceAnnotations())
-        .labels(getServiceLabels());
-    service.setMetadata(objectMeta);
-
-    // Create the headless service.
-    final V1ServiceSpec serviceSpec = new V1ServiceSpec()
-        .clusterIP("None")
-        .selector(getPodMatchLabels(topologyName));
-
-    service.setSpec(serviceSpec);
-
-    return service;
-  }
-
   /**
    * Creates and configures the <code>StatefulSet</code> which the topology's <code>executor</code>s will run in.
    * @param containerResource Passed down to configure the <code>executor</code> resource limits.
@@ -320,7 +278,9 @@ final class StatefulSet {
     statefulSetSpec.setSelector(selector);
 
     // Create a Pod Template.
-    final V1PodTemplateSpec podTemplateSpec = loadPodFromTemplate(isExecutor);
+    final V1PodTemplateSpec podTemplateSpec =
+        isExecutor ? clusterConfigs.getExecutorPodTemplateSpec()
+        : clusterConfigs.getManagerPodTemplateSpec();
 
     // Set up Pod Metadata.
     final V1ObjectMeta templateMetaData = new V1ObjectMeta().labels(getPodLabels(topologyName));
@@ -350,14 +310,6 @@ final class StatefulSet {
     return KubernetesContext.getPodAnnotations(clusterConfigs.getConfiguration());
   }
 
-  /**
-   * Extracts <code>Service Annotations</code> for configurations.
-   * @return Key-value pairs of service <code>Annotation</code>s to be added to the Pod.
-   */
-  private Map<String, String> getServiceAnnotations() {
-    return KubernetesContext.getServiceAnnotations(clusterConfigs.getConfiguration());
-  }
-
   /**
    * Generates <code>Label</code>s to indicate Prometheus scraping and the exposed port.
    * @return Key-value pairs of Prometheus <code>Annotation</code>s to be added to the Pod.
@@ -397,14 +349,6 @@ final class StatefulSet {
     return labels;
   }
 
-  /**
-   * Extracts <code>Selector Labels</code> for<code>Service</code>s from configurations.
-   * @return Key-value pairs of <code>Service Labels</code> to be added to the Pod.
-   */
-  private Map<String, String> getServiceLabels() {
-    return KubernetesContext.getServiceLabels(clusterConfigs.getConfiguration());
-  }
-
   /**
    * Configures the <code>Pod Spec</code> section of the <code>StatefulSet</code>. The <code>Heron</code> container
    * will be configured to allow it to function but other supplied containers are loaded verbatim.
@@ -783,10 +727,10 @@ final class StatefulSet {
     for (Map.Entry<String, String> secret : podSecretKeyRefs.entrySet()) {
       final String[] keyRefParts = secret.getValue().split(":");
       if (keyRefParts.length != 2) {
-        LOG.log(Level.SEVERE,
-                "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
-        throw new TopologyRuntimeManagementException(
-                "SecretKeyRef must be in the form name:key. <" + secret.getValue() + ">");
+        final String msg =
+            String.format("SecretKeyRef must be in the form name:key. <%s>", secret.getValue());
+        LOG.log(Level.SEVERE, msg);
+        throw new TopologyRuntimeManagementException(msg);
       }
       String name = keyRefParts[0];
       String key = keyRefParts[1];
@@ -800,113 +744,6 @@ final class StatefulSet {
     }
   }
 
-  /**
-   * Initiates the process of locating and loading <code>Pod Template</code> from a <code>ConfigMap</code>.
-   * The loaded text is then parsed into a usable <code>Pod Template</code>.
-   * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
-   *                   or <code>Manager</code>.
-   * @return A <code>Pod Template</code> which is loaded and parsed from a <code>ConfigMap</code>.
-   */
-  @VisibleForTesting
-  protected V1PodTemplateSpec loadPodFromTemplate(boolean isExecutor) {
-    final Pair<String, String> podTemplateConfigMapName = getPodTemplateLocation(isExecutor);
-
-    // Default Pod Template.
-    if (podTemplateConfigMapName == null) {
-      LOG.log(Level.INFO, "Configuring cluster with the Default Pod Template");
-      return new V1PodTemplateSpec();
-    }
-
-    if (clusterConfigs.isPodTemplateDisabled()) {
-      throw new TopologySubmissionException("Custom Pod Templates are disabled");
-    }
-
-    final String configMapName = podTemplateConfigMapName.first;
-    final String podTemplateName = podTemplateConfigMapName.second;
-
-    // Attempt to locate ConfigMap with provided Pod Template name.
-    try {
-      V1ConfigMap configMap = getConfigMap(configMapName);
-      if (configMap == null) {
-        throw new ApiException(
-            String.format("K8s client unable to locate ConfigMap '%s'", configMapName));
-      }
-
-      final Map<String, String> configMapData = configMap.getData();
-      if (configMapData != null && configMapData.containsKey(podTemplateName)) {
-        // NullPointerException when Pod Template is empty.
-        V1PodTemplateSpec podTemplate = ((V1PodTemplate)
-            Yaml.load(configMapData.get(podTemplateName))).getTemplate();
-        LOG.log(Level.INFO, String.format("Configuring cluster with the %s.%s Pod Template",
-            configMapName, podTemplateName));
-        return podTemplate;
-      }
-
-      // Failure to locate Pod Template with provided name.
-      throw new ApiException(String.format("Failed to locate Pod Template '%s' in ConfigMap '%s'",
-          podTemplateName, configMapName));
-    } catch (ApiException e) {
-      KubernetesUtils.logExceptionWithDetails(LOG, e.getMessage(), e);
-      throw new TopologySubmissionException(e.getMessage());
-    } catch (IOException | ClassCastException | NullPointerException e) {
-      final String message = String.format("Error parsing Pod Template '%s' in ConfigMap '%s'",
-          podTemplateName, configMapName);
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(message);
-    }
-  }
-
-  /**
-   * Extracts the <code>ConfigMap</code> and <code>Pod Template</code> names from the CLI parameter.
-   * @param isExecutor Flag to indicate loading of <code>Pod Template</code> for <code>Executor</code>
-   *                   or <code>Manager</code>.
-   * @return A pair of the form <code>(ConfigMap, Pod Template)</code>.
-   */
-  @VisibleForTesting
-  protected Pair<String, String> getPodTemplateLocation(boolean isExecutor) {
-    final String podTemplateConfigMapName = KubernetesContext
-        .getPodTemplateConfigMapName(clusterConfigs.getConfiguration(), isExecutor);
-
-    if (podTemplateConfigMapName == null) {
-      return null;
-    }
-
-    try {
-      final int splitPoint = podTemplateConfigMapName.indexOf(".");
-      final String configMapName = podTemplateConfigMapName.substring(0, splitPoint);
-      final String podTemplateName = podTemplateConfigMapName.substring(splitPoint + 1);
-
-      if (configMapName.isEmpty() || podTemplateName.isEmpty()) {
-        throw new IllegalArgumentException("Empty ConfigMap or Pod Template name");
-      }
-
-      return new Pair<>(configMapName, podTemplateName);
-    } catch (IndexOutOfBoundsException | IllegalArgumentException e) {
-      final String message = "Invalid ConfigMap and/or Pod Template name";
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(message);
-    }
-  }
-
-  /**
-   * Retrieves a <code>ConfigMap</code> from the K8s cluster in the API Server's namespace.
-   * @param configMapName Name of the <code>ConfigMap</code> to retrieve.
-   * @return The retrieved <code>ConfigMap</code>.
-   */
-  @VisibleForTesting
-  protected V1ConfigMap getConfigMap(String configMapName) {
-    try {
-      return clusterConfigs.getCoreClient().readNamespacedConfigMap(
-          configMapName,
-          clusterConfigs.getNamespace(),
-          null);
-    } catch (ApiException e) {
-      final String message = "Error retrieving ConfigMaps";
-      KubernetesUtils.logExceptionWithDetails(LOG, message, e);
-      throw new TopologySubmissionException(String.format("%s: %s", message, e.getMessage()));
-    }
-  }
-
   /**
    * Generates <code>Persistent Volume Claims Templates</code> from a mapping of <code>Volumes</code>
    * to <code>key-value</code> pairs of configuration options and values.
@@ -1077,14 +914,4 @@ final class StatefulSet {
     return String.format("%s-%s", clusterConfigs.getTopologyName(),
         isExecutor ? KubernetesConstants.EXECUTOR_NAME : KubernetesConstants.MANAGER_NAME);
   }
-
-  /**
-   * Generates the <code>Selector</code> match labels with which resources in this topology can be found.
-   * @return A label of the form <code>app=heron,topology=topology-name</code>.
-   */
-  private String createTopologySelectorLabels() {
-    return String.format("%s=%s,%s=%s",
-        KubernetesConstants.LABEL_APP, KubernetesConstants.LABEL_APP_VALUE,
-        KubernetesConstants.LABEL_TOPOLOGY, clusterConfigs.getTopologyName());
-  }
 }