You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by mo...@apache.org on 2020/05/11 18:32:57 UTC

[zeppelin] branch master updated: [ZEPPELIN-4799] Use spark resource configuration

This is an automated email from the ASF dual-hosted git repository.

moon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b7df78  [ZEPPELIN-4799] Use spark resource configuration
3b7df78 is described below

commit 3b7df786cb067e234448c943994c3fa685693069
Author: Philipp Dallig <ph...@gmail.com>
AuthorDate: Thu May 7 09:12:20 2020 +0200

    [ZEPPELIN-4799] Use spark resource configuration
    
    ### What is this PR for?
    With this PR, we use Spark configuration values for the K8s Pod resources. No memory limit is set, because exceeding a memory limit gets the interpreter process killed by the OOM killer.
    
    https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits
    > If you set a memory limit of 4GiB for that Container, the kubelet (and container runtime ) enforce the limit. The runtime prevents the container from using more than the configured resource limit. For example: when a process in the container tries to consume more than the allowed amount of memory, the system kernel terminates the process that attempted the allocation, with an out of memory (OOM) error.
    
    zjffdu, are you using a YARN cluster to schedule your interpreters? If so, maybe we should change the location of the calculation class.
    
    ### What type of PR is it?
     - Improvement
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-4799
    
    ### How should this be tested?
    * **Travis-CI**: https://travis-ci.org/github/Reamer/zeppelin/builds/683269227
    
    ### Questions:
    * **Maybe a higher default memory overhead?** Edit: No, because we didn't need it in the past.
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Philipp Dallig <ph...@gmail.com>
    
    Closes #3761 from Reamer/spark_k8s_resources and squashes the following commits:
    
    64bd91294 [Philipp Dallig] Add short comment for limits.memory
    45e565fff [Philipp Dallig] Use Spark config values for K8s Interpreter Pod resources
    3218309b2 [Philipp Dallig] Some cleanup
---
 k8s/interpreter/100-interpreter-spec.yaml          |  9 +++
 .../launcher/K8sRemoteInterpreterProcess.java      | 33 +++++++--
 .../zeppelin/interpreter/launcher/K8sUtils.java    | 59 ++++++++++++++++
 .../launcher/K8sRemoteInterpreterProcessTest.java  | 78 +++++++++++++++++++++-
 .../interpreter/launcher/K8sUtilsTest.java         | 30 +++++++++
 5 files changed, 200 insertions(+), 9 deletions(-)

diff --git a/k8s/interpreter/100-interpreter-spec.yaml b/k8s/interpreter/100-interpreter-spec.yaml
index c331a99..76f1dea 100644
--- a/k8s/interpreter/100-interpreter-spec.yaml
+++ b/k8s/interpreter/100-interpreter-spec.yaml
@@ -54,6 +54,15 @@ spec:
     - name: {{key}}
       value: {{value}}
   {% endfor %}
+  {% if zeppelin.k8s.interpreter.cores is defined and zeppelin.k8s.interpreter.memory is defined %}
+    resources:
+      requests:
+        memory: "{{zeppelin.k8s.interpreter.memory}}"
+        cpu: "{{zeppelin.k8s.interpreter.cores}}"
+{# limits.memory is not set because of a potential OOM-Killer. https://kubernetes.io/docs/concepts/configuration/manage-resources-containers/#requests-and-limits #}
+      limits:
+        cpu: "{{zeppelin.k8s.interpreter.cores}}"
+  {% endif %}
   {% if zeppelin.k8s.interpreter.group.name == "spark" %}
     volumeMounts:
     - name: spark-home
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
index c167ae7..864f660 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcess.java
@@ -44,6 +44,12 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
   private String userName;
 
   private AtomicBoolean started = new AtomicBoolean(false);
+  private Random rand = new Random();
+
+  private static final String SPARK_DRIVER_MEMROY = "spark.driver.memory";
+  private static final String SPARK_DRIVER_MEMROY_OVERHEAD = "spark.driver.memoryOverhead";
+  private static final String SPARK_DRIVER_CORES = "spark.driver.cores";
+  private static final String ENV_SERVICE_DOMAIN = "SERVICE_DOMAIN";
 
   public K8sRemoteInterpreterProcess(
           Kubectl kubectl,
@@ -273,7 +279,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     }
 
     // environment variables
-    envs.put("SERVICE_DOMAIN", envs.getOrDefault("SERVICE_DOMAIN", System.getenv("SERVICE_DOMAIN")));
+    envs.put(ENV_SERVICE_DOMAIN, envs.getOrDefault(ENV_SERVICE_DOMAIN, System.getenv(ENV_SERVICE_DOMAIN)));
     envs.put("ZEPPELIN_HOME", envs.getOrDefault("ZEPPELIN_HOME", "/zeppelin"));
 
     if (isSpark()) {
@@ -294,8 +300,22 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
               webUrl,
               webUiPort,
               getPodName(),
-              envs.get("SERVICE_DOMAIN")
+              envs.get(ENV_SERVICE_DOMAIN)
           ));
+      // Resources of Interpreter Pod
+      if (properties.containsKey(SPARK_DRIVER_MEMROY)) {
+        String memory;
+        if (properties.containsKey(SPARK_DRIVER_MEMROY_OVERHEAD)) {
+          memory = K8sUtils.calculateSparkMemory(properties.getProperty(SPARK_DRIVER_MEMROY),
+                                                 properties.getProperty(SPARK_DRIVER_MEMROY_OVERHEAD));
+        } else {
+          memory = K8sUtils.calculateMemoryWithDefaultOverhead(properties.getProperty(SPARK_DRIVER_MEMROY));
+        }
+        k8sProperties.put("zeppelin.k8s.interpreter.memory", memory);
+      }
+      if (properties.containsKey(SPARK_DRIVER_CORES)) {
+        k8sProperties.put("zeppelin.k8s.interpreter.cores", properties.getProperty(SPARK_DRIVER_CORES));
+      }
     }
 
     k8sProperties.put("zeppelin.k8s.envs", envs);
@@ -310,7 +330,7 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     ImmutableMap<String, Object> binding = ImmutableMap.of(
         "PORT", port,
         "SERVICE_NAME", serviceName,
-        "SERVICE_DOMAIN", serviceDomain
+        ENV_SERVICE_DOMAIN, serviceDomain
     );
 
     ClassLoader oldCl = Thread.currentThread().getContextClassLoader();
@@ -339,8 +359,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
 
     options.append(" --master k8s://https://kubernetes.default.svc");
     options.append(" --deploy-mode client");
-    if (properties.containsKey("spark.driver.memory")) {
-      options.append(" --driver-memory " + properties.get("spark.driver.memory"));
+    if (properties.containsKey(SPARK_DRIVER_MEMROY)) {
+      options.append(" --driver-memory " + properties.get(SPARK_DRIVER_MEMROY));
     }
     if (userName != null) {
       options.append(" --proxy-user " + userName);
@@ -398,9 +418,8 @@ public class K8sRemoteInterpreterProcess extends RemoteInterpreterProcess {
     char[] chars = "abcdefghijklmnopqrstuvwxyz".toCharArray();
 
     StringBuilder sb = new StringBuilder();
-    Random random = new Random();
     for (int i = 0; i < length; i++) {
-      char c = chars[random.nextInt(chars.length)];
+      char c = chars[rand.nextInt(chars.length)];
       sb.append(c);
     }
     return sb.toString();
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java
new file mode 100644
index 0000000..7a62906
--- /dev/null
+++ b/zeppelin-plugins/launcher/k8s-standard/src/main/java/org/apache/zeppelin/interpreter/launcher/K8sUtils.java
@@ -0,0 +1,59 @@
+package org.apache.zeppelin.interpreter.launcher;
+
import java.util.Locale;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.lang3.StringUtils;
+
/**
 * Utilities for converting Spark memory settings (for example {@code spark.driver.memory})
 * into Kubernetes memory quantities expressed in MiB (suffix {@code Mi}).
 */
public class K8sUtils {

  private static final long K = 1024;
  private static final long M = K * K;
  private static final long G = M * K;
  private static final long T = G * K;
  // Minimum default overhead in MiB; Spark documents the same floor for
  // spark.driver.memoryOverhead.
  private static final long MINIMUM_OVERHEAD = 384;
  // Default overhead as a fraction of the memory; double (not float) keeps large values precise.
  private static final double MEMORY_OVERHEAD_FACTOR = 0.1;
  // Compiled once: digits followed by an optional unit suffix (input is lower-cased first).
  private static final Pattern MEMORY_PATTERN = Pattern.compile("([0-9]+)([a-z]+)?");

  private K8sUtils() {
    // utility class, not instantiable
  }

  /**
   * Adds the default memory overhead to {@code memory} and returns the total in MiB.
   *
   * <p>The overhead is 10% of the memory (truncated to whole MiB), but at least 384 MiB.
   *
   * @param memory size with an optional unit suffix, e.g. {@code "512m"} or {@code "4g"};
   *     a value without a recognized unit is read as bytes
   * @return the total as a Kubernetes quantity, e.g. {@code "1408Mi"} for {@code "1g"}
   * @throws NumberFormatException if {@code memory} cannot be parsed or does not fit in a long
   */
  public static String calculateMemoryWithDefaultOverhead(String memory) {
    long memoryMB = convertToBytes(memory) / M;
    long memoryOverheadMB =
        Math.max((long) (memoryMB * MEMORY_OVERHEAD_FACTOR), MINIMUM_OVERHEAD);
    return (memoryMB + memoryOverheadMB) + "Mi";
  }

  /**
   * Adds an explicit memory overhead to {@code memory} and returns the total in MiB.
   *
   * <p>Both values are truncated to whole MiB before they are added; no minimum is applied.
   *
   * @param memory size with an optional unit suffix, e.g. {@code "1g"}
   * @param memoryOverhead overhead with an optional unit suffix, e.g. {@code "256m"}
   * @return the total as a Kubernetes quantity, e.g. {@code "1280Mi"} for {@code "1g"} and
   *     {@code "256m"}
   * @throws NumberFormatException if either value cannot be parsed or does not fit in a long
   */
  public static String calculateSparkMemory(String memory, String memoryOverhead) {
    long memoryMB = convertToBytes(memory) / M;
    long memoryOverheadMB = convertToBytes(memoryOverhead) / M;
    return (memoryMB + memoryOverheadMB) + "Mi";
  }

  /**
   * Parses a memory string into a number of bytes.
   *
   * <p>Parsing is case-insensitive and ignores surrounding whitespace. The unit is found by
   * checking, in the order k, m, g, t, whether the suffix contains that letter, so suffixes
   * such as {@code "kb"}, {@code "mi"} or {@code "gb"} are accepted. A missing or unrecognized
   * suffix means bytes.
   *
   * @throws NumberFormatException if the string is not digits with an optional alphabetic
   *     suffix, or if the resulting byte count overflows a long
   */
  private static long convertToBytes(String memory) {
    // Locale.ROOT: locale-sensitive lower-casing (e.g. Turkish "I" to dotless i) would produce
    // characters that the pattern does not accept.
    String normalized = memory.toLowerCase(Locale.ROOT).trim();
    Matcher matcher = MEMORY_PATTERN.matcher(normalized);
    if (!matcher.matches()) {
      throw new NumberFormatException("Failed to parse string: " + memory);
    }
    long value = Long.parseLong(matcher.group(1));
    long multiplier = unitMultiplier(matcher.group(2));
    try {
      // multiplyExact detects every overflow; a sign check misses products that wrap
      // around to a positive value.
      return Math.multiplyExact(value, multiplier);
    } catch (ArithmeticException e) {
      NumberFormatException overflow =
          new NumberFormatException("Conversion of " + memory + " exceeds Long.MAX_VALUE");
      overflow.initCause(e);
      throw overflow;
    }
  }

  /** Returns the byte multiplier for a lower-case unit suffix; null or unknown means 1. */
  private static long unitMultiplier(String suffix) {
    if (suffix == null) {
      return 1;
    }
    if (suffix.contains("k")) {
      return K;
    }
    if (suffix.contains("m")) {
      return M;
    }
    if (suffix.contains("g")) {
      return G;
    }
    if (suffix.contains("t")) {
      return T;
    }
    return 1;
  }
}
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
index 52c5621..7e1e668 100644
--- a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sRemoteInterpreterProcessTest.java
@@ -141,8 +141,8 @@ public class K8sRemoteInterpreterProcessTest {
     assertEquals("V1", envs.get("MY_ENV1"));
 
     envs = (HashMap<String, String>) p.get("zeppelin.k8s.envs");
-    assertEquals(true, envs.containsKey("SERVICE_DOMAIN"));
-    assertEquals(true, envs.containsKey("ZEPPELIN_HOME"));
+    assertTrue(envs.containsKey("SERVICE_DOMAIN"));
+    assertTrue(envs.containsKey("ZEPPELIN_HOME"));
   }
 
   @Test
@@ -337,4 +337,78 @@ public class K8sRemoteInterpreterProcessTest {
             "zeppelin-server",
             "my.domain.com"));
   }
+
+  @Test
+  public void testSparkPodResources() {
+    // given
+    Kubectl kubectl = mock(Kubectl.class);
+    when(kubectl.getNamespace()).thenReturn("default");
+
+    Properties properties = new Properties();
+    properties.put("spark.driver.memory", "1g");
+    properties.put("spark.driver.cores", "1");
+    HashMap<String, String> envs = new HashMap<String, String>();
+    envs.put("SERVICE_DOMAIN", "mydomain");
+
+    K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+        kubectl,
+        new File(".skip"),
+        "interpreter-container:1.0",
+        "shared_process",
+        "spark",
+        "myspark",
+        properties,
+        envs,
+        "zeppelin.server.service",
+        "12320",
+        false,
+        "spark-container:1.0",
+        10,
+        false);
+
+    // when
+    Properties p = intp.getTemplateBindings();
+
+    // then
+    assertEquals("1", p.get("zeppelin.k8s.interpreter.cores"));
+    assertEquals("1408Mi", p.get("zeppelin.k8s.interpreter.memory"));
+  }
+
+  @Test
+  public void testSparkPodResourcesMemoryOverhead() {
+    // given
+    Kubectl kubectl = mock(Kubectl.class);
+    when(kubectl.getNamespace()).thenReturn("default");
+
+    Properties properties = new Properties();
+    properties.put("spark.driver.memory", "1g");
+    properties.put("spark.driver.memoryOverhead", "256m");
+    properties.put("spark.driver.cores", "5");
+    HashMap<String, String> envs = new HashMap<String, String>();
+    envs.put("SERVICE_DOMAIN", "mydomain");
+
+    K8sRemoteInterpreterProcess intp = new K8sRemoteInterpreterProcess(
+        kubectl,
+        new File(".skip"),
+        "interpreter-container:1.0",
+        "shared_process",
+        "spark",
+        "myspark",
+        properties,
+        envs,
+        "zeppelin.server.service",
+        "12320",
+        false,
+        "spark-container:1.0",
+        10,
+        false);
+
+    // when
+    Properties p = intp.getTemplateBindings();
+
+    // then
+    assertEquals("5", p.get("zeppelin.k8s.interpreter.cores"));
+    assertEquals("1280Mi", p.get("zeppelin.k8s.interpreter.memory"));
+  }
+
 }
diff --git a/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sUtilsTest.java b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sUtilsTest.java
new file mode 100644
index 0000000..cab56ac
--- /dev/null
+++ b/zeppelin-plugins/launcher/k8s-standard/src/test/java/org/apache/zeppelin/interpreter/launcher/K8sUtilsTest.java
@@ -0,0 +1,30 @@
+package org.apache.zeppelin.interpreter.launcher;
+
+import static org.junit.Assert.*;
+
+import org.junit.Test;
+
+public class K8sUtilsTest {
+
+  @Test
+  public void testConvert() {
+    assertEquals("484Mi", K8sUtils.calculateMemoryWithDefaultOverhead("100m"));
+    assertEquals("1408Mi", K8sUtils.calculateMemoryWithDefaultOverhead("1Gb"));
+    assertEquals("4505Mi", K8sUtils.calculateMemoryWithDefaultOverhead("4Gb"));
+    assertEquals("6758Mi", K8sUtils.calculateMemoryWithDefaultOverhead("6Gb"));
+    assertEquals("9011Mi", K8sUtils.calculateMemoryWithDefaultOverhead("8Gb"));
+    // some extrem values
+    assertEquals("112640Mi", K8sUtils.calculateMemoryWithDefaultOverhead("100Gb"));
+    assertEquals("115343360Mi", K8sUtils.calculateMemoryWithDefaultOverhead("100Tb"));
+  }
+
+  @Test(expected = NumberFormatException.class)
+  public void testExceptionMaxLong() {
+    K8sUtils.calculateMemoryWithDefaultOverhead("10000000Tb");
+  }
+
+  @Test(expected = NumberFormatException.class)
+  public void testExceptionNoValidNumber() {
+    K8sUtils.calculateMemoryWithDefaultOverhead("NoValidNumber10000000Tb");
+  }
+}