You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@heron.apache.org by sa...@apache.org on 2022/07/21 16:51:55 UTC

[incubator-heron] branch saadurrahman/3846-Refactoring-K8s-Shim-dev updated (8e67ad10ced -> 7d7efb0f491)

This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a change to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git


    from 8e67ad10ced [KubernetesShim] removing unneeded methods that were relocated to Stateful Set factory.
     new 8a6f2900bd4 [KubernetesShim] cleanup
     new 7d7efb0f491 [KubernetesUtils] renamed utility function from V1Controller to Common.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../heron/scheduler/kubernetes/KubernetesShim.java | 54 +++++++++-------------
 .../scheduler/kubernetes/KubernetesUtils.java      |  2 +-
 .../heron/scheduler/kubernetes/StatefulSet.java    | 17 +++----
 .../scheduler/kubernetes/KubernetesUtilsTest.java  | 18 ++++----
 4 files changed, 38 insertions(+), 53 deletions(-)


[incubator-heron] 01/02: [KubernetesShim] cleanup

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 8a6f2900bd437d90061fac80a1ff57a36866e8b6
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Thu Jul 21 12:50:17 2022 -0400

    [KubernetesShim] cleanup
    
    Removed unused imports. Switched to format string for some lengthy string messages.
---
 .../heron/scheduler/kubernetes/KubernetesShim.java | 54 +++++++++-------------
 1 file changed, 22 insertions(+), 32 deletions(-)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
index de1f4764fb6..f3541569986 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesShim.java
@@ -20,28 +20,18 @@
 package org.apache.heron.scheduler.kubernetes;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.logging.Level;
 import java.util.logging.Logger;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import com.google.common.annotations.VisibleForTesting;
 
-import org.apache.heron.api.utils.TopologyUtils;
 import org.apache.heron.common.basics.Pair;
 import org.apache.heron.scheduler.TopologyRuntimeManagementException;
 import org.apache.heron.scheduler.TopologySubmissionException;
-import org.apache.heron.scheduler.utils.Runtime;
-import org.apache.heron.scheduler.utils.SchedulerUtils;
-import org.apache.heron.scheduler.utils.SchedulerUtils.ExecutorPort;
 import org.apache.heron.spi.common.Config;
 import org.apache.heron.spi.packing.PackingPlan;
 import org.apache.heron.spi.packing.Resource;
@@ -53,9 +43,6 @@ import io.kubernetes.client.openapi.Configuration;
 import io.kubernetes.client.openapi.apis.AppsV1Api;
 import io.kubernetes.client.openapi.apis.CoreV1Api;
 import io.kubernetes.client.openapi.models.V1ConfigMap;
-import io.kubernetes.client.openapi.models.V1EnvVar;
-import io.kubernetes.client.openapi.models.V1EnvVarSource;
-import io.kubernetes.client.openapi.models.V1ObjectFieldSelector;
 import io.kubernetes.client.openapi.models.V1ObjectMeta;
 import io.kubernetes.client.openapi.models.V1PodTemplate;
 import io.kubernetes.client.openapi.models.V1PodTemplateSpec;
@@ -64,7 +51,6 @@ import io.kubernetes.client.openapi.models.V1ServiceSpec;
 import io.kubernetes.client.openapi.models.V1StatefulSet;
 import io.kubernetes.client.openapi.models.V1StatefulSetSpec;
 import io.kubernetes.client.openapi.models.V1Status;
-import io.kubernetes.client.openapi.models.V1Toleration;
 import io.kubernetes.client.util.PatchUtils;
 import io.kubernetes.client.util.Yaml;
 import okhttp3.Response;
@@ -76,8 +62,6 @@ public class KubernetesShim extends KubernetesController {
   private static final Logger LOG =
       Logger.getLogger(KubernetesShim.class.getName());
 
-  private static final String ENV_SHARD_ID = "SHARD_ID";
-
   private final boolean isPodTemplateDisabled;
 
   private final AppsV1Api appsClient;
@@ -306,8 +290,9 @@ public class KubernetesShim extends KubernetesController {
                   + getTopologyName());
           return;
         }
-        LOG.log(Level.SEVERE, "Error when deleting the Service of the job ["
-                + getTopologyName() + "] in namespace [" + getNamespace() + "]");
+        LOG.log(Level.SEVERE,
+            String.format("Error when deleting the Service of the job [%s] in namespace [%s]",
+            getTopologyName(), getNamespace()));
         LOG.log(Level.SEVERE, "Error killing topology message:" + response.message());
         KubernetesUtils.logResponseBodyIfPresent(LOG, response);
 
@@ -320,14 +305,15 @@ public class KubernetesShim extends KubernetesController {
                 + getTopologyName());
         return;
       }
-      throw new TopologyRuntimeManagementException("Error deleting topology ["
-              + getTopologyName() + "] Kubernetes service", e);
+      throw new TopologyRuntimeManagementException(
+        String.format("Error deleting topology [%s] Kubernetes service", getTopologyName()), e);
     } catch (IOException e) {
-      throw new TopologyRuntimeManagementException("Error deleting topology ["
-              + getTopologyName() + "] Kubernetes service", e);
+      throw new TopologyRuntimeManagementException(
+        String.format("Error deleting topology [%s] Kubernetes service", getTopologyName()), e);
     }
-    LOG.log(Level.INFO, "Headless Service for the Job [" + getTopologyName()
-            + "] in namespace [" + getNamespace() + "] is deleted.");
+    LOG.log(Level.INFO,
+        String.format("Headless Service for the Job [%s] in namespace [%s] is deleted.",
+        getTopologyName(), getNamespace()));
   }
 
   /**
@@ -346,8 +332,9 @@ public class KubernetesShim extends KubernetesController {
                   + getTopologyName());
           return;
         }
-        LOG.log(Level.SEVERE, "Error when deleting the StatefulSets of the job ["
-                + getTopologyName() + "] in namespace [" + getNamespace() + "]");
+        LOG.log(Level.SEVERE,
+            String.format("Error when deleting the StatefulSets of the job [%s] in namespace [%s]",
+            getTopologyName(), getNamespace()));
         LOG.log(Level.SEVERE, "Error killing topology message: " + response.message());
         KubernetesUtils.logResponseBodyIfPresent(LOG, response);
 
@@ -360,14 +347,17 @@ public class KubernetesShim extends KubernetesController {
                 + getTopologyName());
         return;
       }
-      throw new TopologyRuntimeManagementException("Error deleting topology ["
-              + getTopologyName() + "] Kubernetes StatefulSets", e);
+      throw new TopologyRuntimeManagementException(
+        String.format("Error deleting topology [%s] Kubernetes StatefulSets", getTopologyName()),
+        e);
     } catch (IOException e) {
-      throw new TopologyRuntimeManagementException("Error deleting topology ["
-              + getTopologyName() + "] Kubernetes StatefulSets", e);
+      throw new TopologyRuntimeManagementException(
+        String.format("Error deleting topology [%s] Kubernetes StatefulSets", getTopologyName()),
+        e);
     }
-    LOG.log(Level.INFO, "StatefulSet for the Job [" + getTopologyName()
-            + "] in namespace [" + getNamespace() + "] is deleted.");
+    LOG.log(Level.INFO,
+        String.format("StatefulSet for the Job [%s] in namespace [%s] is deleted.",
+          getTopologyName(), getNamespace()));
   }
 
   /**


[incubator-heron] 02/02: [KubernetesUtils] renamed utility function from V1Controller to Common.

Posted by sa...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

saadurrahman pushed a commit to branch saadurrahman/3846-Refactoring-K8s-Shim-dev
in repository https://gitbox.apache.org/repos/asf/incubator-heron.git

commit 7d7efb0f4911894ec93b5564f5943cd1d9130ad7
Author: Saad Ur Rahman <sa...@apache.org>
AuthorDate: Thu Jul 21 12:51:31 2022 -0400

    [KubernetesUtils] renamed utility function from V1Controller to Common.
---
 .../heron/scheduler/kubernetes/KubernetesUtils.java    |  2 +-
 .../apache/heron/scheduler/kubernetes/StatefulSet.java | 17 ++++++-----------
 .../scheduler/kubernetes/KubernetesUtilsTest.java      | 18 +++++++++---------
 3 files changed, 16 insertions(+), 21 deletions(-)

diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
index db6fd706d24..83891727110 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/KubernetesUtils.java
@@ -95,7 +95,7 @@ final class KubernetesUtils {
     return Math.round(value * scale) / scale;
   }
 
-  static class V1ControllerUtils<T> {
+  static class CommonUtils<T> {
     private static final Logger LOG = Logger.getLogger(KubernetesShim.class.getName());
 
     /**
diff --git a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
index 256885abb2b..b1f2fc990e2 100644
--- a/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
+++ b/heron/schedulers/src/java/org/apache/heron/scheduler/kubernetes/StatefulSet.java
@@ -434,8 +434,7 @@ final class StatefulSet {
    */
   @VisibleForTesting
   protected void configureTolerations(final V1PodSpec spec) {
-    KubernetesUtils.V1ControllerUtils<V1Toleration> utils =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1Toleration> utils = new KubernetesUtils.CommonUtils<>();
     spec.setTolerations(
         utils.mergeListsDedupe(getTolerations(), spec.getTolerations(),
             Comparator.comparing(V1Toleration::getKey), "Pod Specification Tolerations")
@@ -624,7 +623,7 @@ final class StatefulSet {
   @VisibleForTesting
   protected void configureContainerEnvVars(final V1Container container) {
     // Deduplicate on var name with Heron defaults take precedence.
-    KubernetesUtils.V1ControllerUtils<V1EnvVar> utils = new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1EnvVar> utils = new KubernetesUtils.CommonUtils<>();
     container.setEnv(
         utils.mergeListsDedupe(getExecutorEnvVars(), container.getEnv(),
           Comparator.comparing(V1EnvVar::getName), "Pod Template Environment Variables")
@@ -668,8 +667,7 @@ final class StatefulSet {
     }
 
     // Set container ports. Deduplicate using port number with Heron defaults taking precedence.
-    KubernetesUtils.V1ControllerUtils<V1ContainerPort> utils =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1ContainerPort> utils = new KubernetesUtils.CommonUtils<>();
     container.setPorts(
         utils.mergeListsDedupe(getExecutorPorts(), container.getPorts(),
             Comparator.comparing(V1ContainerPort::getContainerPort), "Pod Template Ports")
@@ -725,8 +723,7 @@ final class StatefulSet {
               .mountPath(KubernetesContext.getContainerVolumeMountPath(config));
 
       // Merge volume mounts. Deduplicate using mount's name with Heron defaults taking precedence.
-      KubernetesUtils.V1ControllerUtils<V1VolumeMount> utils =
-          new KubernetesUtils.V1ControllerUtils<>();
+      KubernetesUtils.CommonUtils<V1VolumeMount> utils = new KubernetesUtils.CommonUtils<>();
       container.setVolumeMounts(
           utils.mergeListsDedupe(Collections.singletonList(mount), container.getVolumeMounts(),
               Comparator.comparing(V1VolumeMount::getName), "Pod Template Volume Mounts")
@@ -891,15 +888,13 @@ final class StatefulSet {
 
     // Deduplicate on Names with Persistent Volume Claims taking precedence.
 
-    KubernetesUtils.V1ControllerUtils<V1Volume> utilsVolumes =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1Volume> utilsVolumes = new KubernetesUtils.CommonUtils<>();
     podSpec.setVolumes(
         utilsVolumes.mergeListsDedupe(volumes, podSpec.getVolumes(),
             Comparator.comparing(V1Volume::getName),
             "Pod with Volumes"));
 
-    KubernetesUtils.V1ControllerUtils<V1VolumeMount> utilsMounts =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1VolumeMount> utilsMounts = new KubernetesUtils.CommonUtils<>();
     executor.setVolumeMounts(
         utilsMounts.mergeListsDedupe(volumeMounts, executor.getVolumeMounts(),
             Comparator.comparing(V1VolumeMount::getName),
diff --git a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
index 38a212497fb..4a80571c6d0 100644
--- a/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
+++ b/heron/schedulers/tests/java/org/apache/heron/scheduler/kubernetes/KubernetesUtilsTest.java
@@ -67,48 +67,48 @@ public class KubernetesUtilsTest {
         additionEnvVar
     );
 
-    KubernetesUtils.V1ControllerUtils<V1EnvVar> v1ControllerUtils =
-        new KubernetesUtils.V1ControllerUtils<>();
+    KubernetesUtils.CommonUtils<V1EnvVar> commonUtils =
+        new KubernetesUtils.CommonUtils<>();
 
     // Both input lists are null.
     Assert.assertNull("Both input lists are <null>",
-         v1ControllerUtils.mergeListsDedupe(null, null,
+         commonUtils.mergeListsDedupe(null, null,
              Comparator.comparing(V1EnvVar::getName), description));
 
     // <primaryList> is <null>.
     Assert.assertEquals("<primaryList> is null and <secondaryList> should be returned",
         inputEnvVars,
-        v1ControllerUtils.mergeListsDedupe(null, inputEnvVars,
+        commonUtils.mergeListsDedupe(null, inputEnvVars,
             Comparator.comparing(V1EnvVar::getName), description));
 
     // <primaryList> is empty.
     Assert.assertEquals("<primaryList> is empty and <secondaryList> should be returned",
         inputEnvVars,
-        v1ControllerUtils.mergeListsDedupe(new LinkedList<>(), inputEnvVars,
+        commonUtils.mergeListsDedupe(new LinkedList<>(), inputEnvVars,
             Comparator.comparing(V1EnvVar::getName), description));
 
     // <secondaryList> is <null>.
     Assert.assertEquals("<secondaryList> is null and <primaryList> should be returned",
         heronEnvVars,
-        v1ControllerUtils.mergeListsDedupe(heronEnvVars, null,
+        commonUtils.mergeListsDedupe(heronEnvVars, null,
             Comparator.comparing(V1EnvVar::getName), description));
 
     // <secondaryList> is empty.
     Assert.assertEquals("<secondaryList> is empty and <primaryList> should be returned",
         heronEnvVars,
-        v1ControllerUtils.mergeListsDedupe(heronEnvVars, new LinkedList<>(),
+        commonUtils.mergeListsDedupe(heronEnvVars, new LinkedList<>(),
             Comparator.comparing(V1EnvVar::getName), description));
 
     // Merge both lists.
     Assert.assertTrue("<primaryList> and <secondaryList> merged and deduplicated",
         expectedEnvVars.containsAll(
-            v1ControllerUtils.mergeListsDedupe(heronEnvVars, inputEnvVars,
+            commonUtils.mergeListsDedupe(heronEnvVars, inputEnvVars,
                 Comparator.comparing(V1EnvVar::getName), description)));
 
     // Expect thrown error.
     String errorMessage = "";
     try {
-      v1ControllerUtils.mergeListsDedupe(heronEnvVars, Collections.singletonList(new V1EnvVar()),
+      commonUtils.mergeListsDedupe(heronEnvVars, Collections.singletonList(new V1EnvVar()),
           Comparator.comparing(V1EnvVar::getName), description);
     } catch (TopologySubmissionException e) {
       errorMessage = e.getMessage();