Posted to commits@spark.apache.org by do...@apache.org on 2022/08/05 16:41:32 UTC

[spark] branch master updated: [SPARK-39987][K8S] Support `PEAK_JVM_(ON|OFF)HEAP_MEMORY` executor rolling policy

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3df7124b6ae [SPARK-39987][K8S] Support `PEAK_JVM_(ON|OFF)HEAP_MEMORY` executor rolling policy
3df7124b6ae is described below

commit 3df7124b6ae2bf6a18fd9593ebe9ab5fe0dfdd5b
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Fri Aug 5 09:41:13 2022 -0700

    [SPARK-39987][K8S] Support `PEAK_JVM_(ON|OFF)HEAP_MEMORY` executor rolling policy
    
    ### What changes were proposed in this pull request?
    
    This PR aims to support two new executor rolling policies.
    - `PEAK_JVM_ONHEAP_MEMORY` policy chooses an executor with the biggest peak JVM on-heap memory.
    - `PEAK_JVM_OFFHEAP_MEMORY` policy chooses an executor with the biggest peak JVM off-heap memory.
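    
    A minimal usage sketch (only the `rollPolicy` values are new in this PR; the plugin class
    comes from this module, while the decommission and roll-interval keys are assumed from the
    existing executor rolling feature and may differ by Spark version):
    
        import org.apache.spark.SparkConf
    
        // Hedged illustration of opting into the new policy on the driver side.
        val conf = new SparkConf()
          .set("spark.plugins",
            "org.apache.spark.scheduler.cluster.k8s.ExecutorRollPlugin")         // register the roll plugin
          .set("spark.decommission.enabled", "true")                             // rolling decommissions the chosen executor
          .set("spark.kubernetes.executor.rollInterval", "600s")                 // assumed interval key
          .set("spark.kubernetes.executor.rollPolicy", "PEAK_JVM_ONHEAP_MEMORY") // or PEAK_JVM_OFFHEAP_MEMORY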
    
    ### Why are the changes needed?
    
    Although peak memory is a historical value, these two new policies add a capability to keep the memory usage of Spark jobs as low as possible.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, but this is a new feature.
    
    ### How was this patch tested?
    
    Pass the CIs.
    
    Closes #37418 from dongjoon-hyun/SPARK-39987.
    
    Authored-by: Dongjoon Hyun <do...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../scala/org/apache/spark/deploy/k8s/Config.scala |   8 +-
 .../scheduler/cluster/k8s/ExecutorRollPlugin.scala |  17 ++-
 .../cluster/k8s/ExecutorRollPluginSuite.scala      | 118 +++++++++++++++++++--
 3 files changed, 128 insertions(+), 15 deletions(-)

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index edbcbb1e88d..1f508b52729 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -164,13 +164,14 @@ private[spark] object Config extends Logging {
 
   object ExecutorRollPolicy extends Enumeration {
     val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS,
-      OUTLIER, OUTLIER_NO_FALLBACK = Value
+      PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, OUTLIER, OUTLIER_NO_FALLBACK = Value
   }
 
   val EXECUTOR_ROLL_POLICY =
     ConfigBuilder("spark.kubernetes.executor.rollPolicy")
       .doc("Executor roll policy: Valid values are ID, ADD_TIME, TOTAL_GC_TIME, " +
-        "TOTAL_DURATION, FAILED_TASKS, and OUTLIER (default). " +
+        "TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS, PEAK_JVM_ONHEAP_MEMORY, " +
+        "PEAK_JVM_OFFHEAP_MEMORY, OUTLIER (default), and OUTLIER_NO_FALLBACK. " +
         "When executor roll happens, Spark uses this policy to choose " +
         "an executor and decommission it. The built-in policies are based on executor summary." +
         "ID policy chooses an executor with the smallest executor ID. " +
@@ -179,6 +180,9 @@ private[spark] object Config extends Logging {
         "TOTAL_DURATION policy chooses an executor with the biggest total task time. " +
         "AVERAGE_DURATION policy chooses an executor with the biggest average task time. " +
         "FAILED_TASKS policy chooses an executor with the most number of failed tasks. " +
+        "PEAK_JVM_ONHEAP_MEMORY policy chooses an executor with the biggest peak JVM on-heap " +
+        "memory. PEAK_JVM_OFFHEAP_MEMORY policy chooses an executor with the biggest peak JVM " +
+        "off-heap memory. " +
         "OUTLIER policy chooses an executor with outstanding statistics which is bigger than" +
         "at least two standard deviation from the mean in average task time, " +
         "total task time, total task GC time, and the number of failed tasks if exists. " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
index ac048d68adf..b53c9b69d15 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.SparkContext
 import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
 import org.apache.spark.deploy.k8s.Config.{EXECUTOR_ROLL_INTERVAL, EXECUTOR_ROLL_POLICY, ExecutorRollPolicy, MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING}
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.DECOMMISSION_ENABLED
 import org.apache.spark.scheduler.ExecutorDecommissionInfo
@@ -47,6 +48,8 @@ class ExecutorRollPlugin extends SparkPlugin {
 }
 
 class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
+  lazy val EMPTY_METRICS = new ExecutorMetrics(Array.emptyLongArray)
+
   private var sparkContext: SparkContext = _
 
   private val periodicService: ScheduledExecutorService =
@@ -99,6 +102,9 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
 
   override def shutdown(): Unit = periodicService.shutdown()
 
+  private def getPeakMetrics(summary: v1.ExecutorSummary, name: String): Long =
+    summary.peakMemoryMetrics.getOrElse(EMPTY_METRICS).getMetricValue(name)
+
   private def choose(list: Seq[v1.ExecutorSummary], policy: ExecutorRollPolicy.Value)
       : Option[String] = {
     val listWithoutDriver = list
@@ -118,6 +124,10 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
         listWithoutDriver.sortBy(e => e.totalDuration.toFloat / Math.max(1, e.totalTasks)).reverse
       case ExecutorRollPolicy.FAILED_TASKS =>
         listWithoutDriver.sortBy(_.failedTasks).reverse
+      case ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY =>
+        listWithoutDriver.sortBy(getPeakMetrics(_, "JVMHeapMemory")).reverse
+      case ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY =>
+        listWithoutDriver.sortBy(getPeakMetrics(_, "JVMOffHeapMemory")).reverse
       case ExecutorRollPolicy.OUTLIER =>
         // If there is no outlier we fallback to TOTAL_DURATION policy.
         outliersFromMultipleDimensions(listWithoutDriver) ++
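
A standalone sketch of the selection rule the two new cases above implement (made-up
Executor type and field names; the real code sorts v1.ExecutorSummary via getPeakMetrics):
sort descending by the peak metric, treating executors that have not yet reported peak
metrics as zero, which appears to be what the EMPTY_METRICS fallback above yields as well,
and pick the head.

    case class Executor(id: String, peakHeapBytes: Option[Long])

    def chooseByPeakHeap(executors: Seq[Executor]): Option[String] =
      executors
        .sortBy(_.peakHeapBytes.getOrElse(0L))  // unreported peaks count as 0 here
        .reverse                                // biggest peak first
        .headOption
        .map(_.id)

    // chooseByPeakHeap(Seq(
    //   Executor("1", Some(512L)), Executor("2", Some(2048L)), Executor("3", None)))
    // => Some("2")
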
@@ -131,14 +141,17 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
   /**
    * We build multiple outlier lists and concat in the following importance order to find
    * outliers in various perspective:
-   *   AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS
+   *   AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS >
+   *     PEAK_JVM_ONHEAP_MEMORY > PEAK_JVM_OFFHEAP_MEMORY
    * Since we will choose only first item, the duplication is okay.
    */
   private def outliersFromMultipleDimensions(listWithoutDriver: Seq[v1.ExecutorSummary]) =
     outliers(listWithoutDriver.filter(_.totalTasks > 0), e => e.totalDuration / e.totalTasks) ++
       outliers(listWithoutDriver, e => e.totalDuration) ++
       outliers(listWithoutDriver, e => e.totalGCTime) ++
-      outliers(listWithoutDriver, e => e.failedTasks)
+      outliers(listWithoutDriver, e => e.failedTasks) ++
+      outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMHeapMemory")) ++
+      outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory"))
 
   /**
    * Return executors whose metrics is outstanding, '(value - mean) > 2-sigma'. This is
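
A standalone sketch of the 2-sigma rule described in the comment above, on ids paired with
a plain metric value (the plugin itself works on v1.ExecutorSummary plus a metric extractor;
this sketch uses the population standard deviation):

    def outliers(idToValue: Seq[(String, Long)]): Seq[String] =
      if (idToValue.isEmpty) {
        Seq.empty
      } else {
        val xs = idToValue.map(_._2.toDouble)
        val mean = xs.sum / xs.size
        val sd = math.sqrt(xs.map(x => (x - mean) * (x - mean)).sum / xs.size)
        idToValue
          .filter { case (_, v) => (v - mean) > 2 * sd }  // keep only the 2-sigma outliers
          .sortBy { case (_, v) => v }                    // ascending by metric ...
          .reverse                                        // ... biggest outlier first
          .map(_._1)
      }

    // Nine executors near 100 and one at 10000: only the extreme one is flagged.
    // outliers(("e10" -> 10000L) +: (1 to 9).map(i => s"e$i" -> 100L))  // => Seq("e10")
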
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
index 35909d0d884..e019834677d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.PrivateMethodTester
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.k8s.Config.ExecutorRollPolicy
+import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.status.api.v1.ExecutorSummary
 
 class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
@@ -30,12 +31,15 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
 
   private val _choose = PrivateMethod[Option[String]](Symbol("choose"))
 
+  val metrics = Some(new ExecutorMetrics(
+    Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 1024L)))
+
   val driverSummary = new ExecutorSummary("driver", "host:port", true, 1,
     10, 10, 1, 1, 1,
     0, 0, 1, 100,
     1, 100, 100,
     10, false, 20, new Date(1639300000000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
     false, Set())
 
   val execWithSmallestID = new ExecutorSummary("1", "host:port", true, 1,
@@ -43,7 +47,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     0, 0, 1, 100,
     20, 100, 100,
     10, false, 20, new Date(1639300001000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
     false, Set())
 
   // The smallest addTime
@@ -52,7 +56,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     0, 0, 1, 100,
     20, 100, 100,
     10, false, 20, new Date(1639300000000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
     false, Set())
 
   // The biggest totalGCTime
@@ -61,7 +65,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     0, 0, 1, 100,
     40, 100, 100,
     10, false, 20, new Date(1639300002000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
     false, Set())
 
   // The biggest totalDuration
@@ -70,7 +74,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     0, 0, 4, 400,
     20, 100, 100,
     10, false, 20, new Date(1639300003000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
     false, Set())
 
   // The biggest failedTasks
@@ -79,7 +83,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     5, 0, 1, 100,
     20, 100, 100,
     10, false, 20, new Date(1639300003000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
     false, Set())
 
   // The biggest average duration (= totalDuration / totalTask)
@@ -88,7 +92,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     0, 0, 2, 300,
     20, 100, 100,
     10, false, 20, new Date(1639300003000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
     false, Set())
 
   // The executor with no tasks
@@ -97,7 +101,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     0, 0, 0, 0,
     0, 0, 0,
     0, false, 0, new Date(1639300001000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
     false, Set())
 
   // This is used to stabilize 'mean' and 'sd' in OUTLIER test cases.
@@ -106,7 +110,9 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     4, 0, 2, 280,
     30, 100, 100,
     10, false, 20, new Date(1639300001000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(),
+    Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1200L, "JVMOffHeapMemory" -> 1200L))),
+    Map(), Map(), 1,
     false, Set())
 
   val execWithTwoDigitID = new ExecutorSummary("10", "host:port", true, 1,
@@ -114,12 +120,31 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     4, 0, 2, 280,
     30, 100, 100,
     10, false, 20, new Date(1639300001000L),
-    Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+    Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
     false, Set())
 
+  val execWithBiggestPeakJVMOnHeapMemory = new ExecutorSummary("11", "host:port", true, 1,
+    10, 10, 1, 1, 1,
+    4, 0, 2, 280,
+    30, 100, 100,
+    10, false, 20, new Date(1639300001000L),
+    Option.empty, Option.empty, Map(), Option.empty, Set(),
+    Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1201L, "JVMOffHeapMemory" -> 1200L))),
+    Map(), Map(), 1, false, Set())
+
+  val execWithBiggestPeakJVMOffHeapMemory = new ExecutorSummary("12", "host:port", true, 1,
+    10, 10, 1, 1, 1,
+    4, 0, 2, 280,
+    30, 100, 100,
+    10, false, 20, new Date(1639300001000L),
+    Option.empty, Option.empty, Map(), Option.empty, Set(),
+    Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1200L, "JVMOffHeapMemory" -> 1201L))),
+    Map(), Map(), 1, false, Set())
+
   val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
     execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks,
-    execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID)
+    execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID,
+    execWithBiggestPeakJVMOnHeapMemory, execWithBiggestPeakJVMOffHeapMemory)
 
   override def beforeEach(): Unit = {
     super.beforeEach()
@@ -179,6 +204,16 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
     assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy.AVERAGE_DURATION)).contains("6"))
   }
 
+  test("Policy: PEAK_JVM_ONHEAP_MEMORY") {
+    assert(plugin.invokePrivate(
+      _choose(list, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)).contains("11"))
+  }
+
+  test("Policy: PEAK_JVM_OFFHEAP_MEMORY") {
+    assert(plugin.invokePrivate(
+      _choose(list, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)).contains("12"))
+  }
+
   test("Policy: OUTLIER - Work like TOTAL_DURATION if there is no outlier") {
     assert(
       plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)) ==
@@ -224,6 +259,36 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
         plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
   }
 
+  test("Policy: OUTLIER - Detect a peak JVM on-heap memory outlier") {
+    val outlier = new ExecutorSummary("9999", "host:port", true, 1,
+      0, 0, 1, 0, 0,
+      3, 0, 1, 100,
+      1000, 0, 0,
+      0, false, 0, new Date(1639300001000L),
+      Option.empty, Option.empty, Map(), Option.empty, Set(),
+      Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 2048L, "JVMOffHeapMemory" -> 1200L))),
+      Map(), Map(), 1,
+      false, Set())
+    assert(
+      plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)) ==
+        plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
+  }
+
+  test("Policy: OUTLIER - Detect a peak JVM off-heap memory outlier") {
+    val outlier = new ExecutorSummary("9999", "host:port", true, 1,
+      0, 0, 1, 0, 0,
+      3, 0, 1, 100,
+      1000, 0, 0,
+      0, false, 0, new Date(1639300001000L),
+      Option.empty, Option.empty, Map(), Option.empty, Set(),
+      Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 2048L))),
+      Map(), Map(), 1,
+      false, Set())
+    assert(
+      plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)) ==
+        plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
+  }
+
   test("Policy: OUTLIER_NO_FALLBACK - Return None if there are no outliers") {
     assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)).isEmpty)
   }
@@ -266,4 +331,35 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
       plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_GC_TIME)) ==
         plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
   }
+
+  test("Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM on-heap memory outlier") {
+    val outlier = new ExecutorSummary("9999", "host:port", true, 1,
+      0, 0, 1, 0, 0,
+      3, 0, 1, 100,
+      0, 0, 0,
+      0, false, 0, new Date(1639300001000L),
+      Option.empty, Option.empty, Map(), Option.empty, Set(),
+      Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 2048L, "JVMOffHeapMemory" -> 1200L))),
+      Map(), Map(), 1,
+      false, Set())
+    val x = plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_GC_TIME))
+    assert(
+      plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)) ==
+        plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
+  }
+
+  test("Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM off-heap memory outlier") {
+    val outlier = new ExecutorSummary("9999", "host:port", true, 1,
+      0, 0, 1, 0, 0,
+      3, 0, 1, 100,
+      0, 0, 0,
+      0, false, 0, new Date(1639300001000L),
+      Option.empty, Option.empty, Map(), Option.empty, Set(),
+      Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 2048L))),
+      Map(), Map(), 1,
+      false, Set())
+    assert(
+      plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)) ==
+        plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
+  }
 }

