You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2022/08/05 16:41:32 UTC
[spark] branch master updated: [SPARK-39987][K8S] Support `PEAK_JVM_(ON|OFF)HEAP_MEMORY` executor rolling policy
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3df7124b6ae [SPARK-39987][K8S] Support `PEAK_JVM_(ON|OFF)HEAP_MEMORY` executor rolling policy
3df7124b6ae is described below
commit 3df7124b6ae2bf6a18fd9593ebe9ab5fe0dfdd5b
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Fri Aug 5 09:41:13 2022 -0700
[SPARK-39987][K8S] Support `PEAK_JVM_(ON|OFF)HEAP_MEMORY` executor rolling policy
### What changes were proposed in this pull request?
This PR aims to support two new executor rolling policies.
- `PEAK_JVM_ONHEAP_MEMORY` policy chooses an executor with the biggest peak JVM on-heap memory.
- `PEAK_JVM_OFFHEAP_MEMORY` policy chooses an executor with the biggest peak JVM off-heap memory.
### Why are the changes needed?
Although peak memory is a historical (cumulative peak) value rather than a live reading, these two new policies add a capability to keep the memory usage of Spark jobs as low as possible.
### Does this PR introduce _any_ user-facing change?
Yes, but this is a new feature.
### How was this patch tested?
Pass the CIs.
Closes #37418 from dongjoon-hyun/SPARK-39987.
Authored-by: Dongjoon Hyun <do...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../scala/org/apache/spark/deploy/k8s/Config.scala | 8 +-
.../scheduler/cluster/k8s/ExecutorRollPlugin.scala | 17 ++-
.../cluster/k8s/ExecutorRollPluginSuite.scala | 118 +++++++++++++++++++--
3 files changed, 128 insertions(+), 15 deletions(-)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index edbcbb1e88d..1f508b52729 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -164,13 +164,14 @@ private[spark] object Config extends Logging {
object ExecutorRollPolicy extends Enumeration {
val ID, ADD_TIME, TOTAL_GC_TIME, TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS,
- OUTLIER, OUTLIER_NO_FALLBACK = Value
+ PEAK_JVM_ONHEAP_MEMORY, PEAK_JVM_OFFHEAP_MEMORY, OUTLIER, OUTLIER_NO_FALLBACK = Value
}
val EXECUTOR_ROLL_POLICY =
ConfigBuilder("spark.kubernetes.executor.rollPolicy")
.doc("Executor roll policy: Valid values are ID, ADD_TIME, TOTAL_GC_TIME, " +
- "TOTAL_DURATION, FAILED_TASKS, and OUTLIER (default). " +
+ "TOTAL_DURATION, AVERAGE_DURATION, FAILED_TASKS, PEAK_JVM_ONHEAP_MEMORY, " +
+ "PEAK_JVM_OFFHEAP_MEMORY, OUTLIER (default), and OUTLIER_NO_FALLBACK. " +
"When executor roll happens, Spark uses this policy to choose " +
"an executor and decommission it. The built-in policies are based on executor summary." +
"ID policy chooses an executor with the smallest executor ID. " +
@@ -179,6 +180,9 @@ private[spark] object Config extends Logging {
"TOTAL_DURATION policy chooses an executor with the biggest total task time. " +
"AVERAGE_DURATION policy chooses an executor with the biggest average task time. " +
"FAILED_TASKS policy chooses an executor with the most number of failed tasks. " +
+ "PEAK_JVM_ONHEAP_MEMORY policy chooses an executor with the biggest peak JVM on-heap " +
+ "memory. PEAK_JVM_OFFHEAP_MEMORY policy chooses an executor with the biggest peak JVM " +
+ "off-heap memory. " +
"OUTLIER policy chooses an executor with outstanding statistics which is bigger than" +
"at least two standard deviation from the mean in average task time, " +
"total task time, total task GC time, and the number of failed tasks if exists. " +
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
index ac048d68adf..b53c9b69d15 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPlugin.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
import org.apache.spark.SparkContext
import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
import org.apache.spark.deploy.k8s.Config.{EXECUTOR_ROLL_INTERVAL, EXECUTOR_ROLL_POLICY, ExecutorRollPolicy, MINIMUM_TASKS_PER_EXECUTOR_BEFORE_ROLLING}
+import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config.DECOMMISSION_ENABLED
import org.apache.spark.scheduler.ExecutorDecommissionInfo
@@ -47,6 +48,8 @@ class ExecutorRollPlugin extends SparkPlugin {
}
class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
+ lazy val EMPTY_METRICS = new ExecutorMetrics(Array.emptyLongArray)
+
private var sparkContext: SparkContext = _
private val periodicService: ScheduledExecutorService =
@@ -99,6 +102,9 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
override def shutdown(): Unit = periodicService.shutdown()
+ private def getPeakMetrics(summary: v1.ExecutorSummary, name: String): Long =
+ summary.peakMemoryMetrics.getOrElse(EMPTY_METRICS).getMetricValue(name)
+
private def choose(list: Seq[v1.ExecutorSummary], policy: ExecutorRollPolicy.Value)
: Option[String] = {
val listWithoutDriver = list
@@ -118,6 +124,10 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
listWithoutDriver.sortBy(e => e.totalDuration.toFloat / Math.max(1, e.totalTasks)).reverse
case ExecutorRollPolicy.FAILED_TASKS =>
listWithoutDriver.sortBy(_.failedTasks).reverse
+ case ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY =>
+ listWithoutDriver.sortBy(getPeakMetrics(_, "JVMHeapMemory")).reverse
+ case ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY =>
+ listWithoutDriver.sortBy(getPeakMetrics(_, "JVMOffHeapMemory")).reverse
case ExecutorRollPolicy.OUTLIER =>
// If there is no outlier we fallback to TOTAL_DURATION policy.
outliersFromMultipleDimensions(listWithoutDriver) ++
@@ -131,14 +141,17 @@ class ExecutorRollDriverPlugin extends DriverPlugin with Logging {
/**
* We build multiple outlier lists and concat in the following importance order to find
* outliers in various perspective:
- * AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS
+ * AVERAGE_DURATION > TOTAL_DURATION > TOTAL_GC_TIME > FAILED_TASKS >
+ * PEAK_JVM_ONHEAP_MEMORY > PEAK_JVM_OFFHEAP_MEMORY
* Since we will choose only first item, the duplication is okay.
*/
private def outliersFromMultipleDimensions(listWithoutDriver: Seq[v1.ExecutorSummary]) =
outliers(listWithoutDriver.filter(_.totalTasks > 0), e => e.totalDuration / e.totalTasks) ++
outliers(listWithoutDriver, e => e.totalDuration) ++
outliers(listWithoutDriver, e => e.totalGCTime) ++
- outliers(listWithoutDriver, e => e.failedTasks)
+ outliers(listWithoutDriver, e => e.failedTasks) ++
+ outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMHeapMemory")) ++
+ outliers(listWithoutDriver, e => getPeakMetrics(e, "JVMOffHeapMemory"))
/**
* Return executors whose metrics is outstanding, '(value - mean) > 2-sigma'. This is
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
index 35909d0d884..e019834677d 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorRollPluginSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.PrivateMethodTester
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.Config.ExecutorRollPolicy
+import org.apache.spark.executor.ExecutorMetrics
import org.apache.spark.status.api.v1.ExecutorSummary
class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
@@ -30,12 +31,15 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
private val _choose = PrivateMethod[Option[String]](Symbol("choose"))
+ val metrics = Some(new ExecutorMetrics(
+ Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 1024L)))
+
val driverSummary = new ExecutorSummary("driver", "host:port", true, 1,
10, 10, 1, 1, 1,
0, 0, 1, 100,
1, 100, 100,
10, false, 20, new Date(1639300000000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())
val execWithSmallestID = new ExecutorSummary("1", "host:port", true, 1,
@@ -43,7 +47,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 1, 100,
20, 100, 100,
10, false, 20, new Date(1639300001000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())
// The smallest addTime
@@ -52,7 +56,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 1, 100,
20, 100, 100,
10, false, 20, new Date(1639300000000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())
// The biggest totalGCTime
@@ -61,7 +65,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 1, 100,
40, 100, 100,
10, false, 20, new Date(1639300002000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())
// The biggest totalDuration
@@ -70,7 +74,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 4, 400,
20, 100, 100,
10, false, 20, new Date(1639300003000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())
// The biggest failedTasks
@@ -79,7 +83,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
5, 0, 1, 100,
20, 100, 100,
10, false, 20, new Date(1639300003000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())
// The biggest average duration (= totalDuration / totalTask)
@@ -88,7 +92,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 2, 300,
20, 100, 100,
10, false, 20, new Date(1639300003000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())
// The executor with no tasks
@@ -97,7 +101,7 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
0, 0, 0, 0,
0, 0, 0,
0, false, 0, new Date(1639300001000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())
// This is used to stabilize 'mean' and 'sd' in OUTLIER test cases.
@@ -106,7 +110,9 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
4, 0, 2, 280,
30, 100, 100,
10, false, 20, new Date(1639300001000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(),
+ Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1200L, "JVMOffHeapMemory" -> 1200L))),
+ Map(), Map(), 1,
false, Set())
val execWithTwoDigitID = new ExecutorSummary("10", "host:port", true, 1,
@@ -114,12 +120,31 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
4, 0, 2, 280,
30, 100, 100,
10, false, 20, new Date(1639300001000L),
- Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map(), 1,
+ Option.empty, Option.empty, Map(), Option.empty, Set(), metrics, Map(), Map(), 1,
false, Set())
+ val execWithBiggestPeakJVMOnHeapMemory = new ExecutorSummary("11", "host:port", true, 1,
+ 10, 10, 1, 1, 1,
+ 4, 0, 2, 280,
+ 30, 100, 100,
+ 10, false, 20, new Date(1639300001000L),
+ Option.empty, Option.empty, Map(), Option.empty, Set(),
+ Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1201L, "JVMOffHeapMemory" -> 1200L))),
+ Map(), Map(), 1, false, Set())
+
+ val execWithBiggestPeakJVMOffHeapMemory = new ExecutorSummary("12", "host:port", true, 1,
+ 10, 10, 1, 1, 1,
+ 4, 0, 2, 280,
+ 30, 100, 100,
+ 10, false, 20, new Date(1639300001000L),
+ Option.empty, Option.empty, Map(), Option.empty, Set(),
+ Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1200L, "JVMOffHeapMemory" -> 1201L))),
+ Map(), Map(), 1, false, Set())
+
val list = Seq(driverSummary, execWithSmallestID, execWithSmallestAddTime,
execWithBiggestTotalGCTime, execWithBiggestTotalDuration, execWithBiggestFailedTasks,
- execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID)
+ execWithBiggestAverageDuration, execWithoutTasks, execNormal, execWithTwoDigitID,
+ execWithBiggestPeakJVMOnHeapMemory, execWithBiggestPeakJVMOffHeapMemory)
override def beforeEach(): Unit = {
super.beforeEach()
@@ -179,6 +204,16 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy.AVERAGE_DURATION)).contains("6"))
}
+ test("Policy: PEAK_JVM_ONHEAP_MEMORY") {
+ assert(plugin.invokePrivate(
+ _choose(list, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)).contains("11"))
+ }
+
+ test("Policy: PEAK_JVM_OFFHEAP_MEMORY") {
+ assert(plugin.invokePrivate(
+ _choose(list, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)).contains("12"))
+ }
+
test("Policy: OUTLIER - Work like TOTAL_DURATION if there is no outlier") {
assert(
plugin.invokePrivate(_choose(list, ExecutorRollPolicy.TOTAL_DURATION)) ==
@@ -224,6 +259,36 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
}
+ test("Policy: OUTLIER - Detect a peak JVM on-heap memory outlier") {
+ val outlier = new ExecutorSummary("9999", "host:port", true, 1,
+ 0, 0, 1, 0, 0,
+ 3, 0, 1, 100,
+ 1000, 0, 0,
+ 0, false, 0, new Date(1639300001000L),
+ Option.empty, Option.empty, Map(), Option.empty, Set(),
+ Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 2048L, "JVMOffHeapMemory" -> 1200L))),
+ Map(), Map(), 1,
+ false, Set())
+ assert(
+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)) ==
+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
+ }
+
+ test("Policy: OUTLIER - Detect a peak JVM off-heap memory outlier") {
+ val outlier = new ExecutorSummary("9999", "host:port", true, 1,
+ 0, 0, 1, 0, 0,
+ 3, 0, 1, 100,
+ 1000, 0, 0,
+ 0, false, 0, new Date(1639300001000L),
+ Option.empty, Option.empty, Map(), Option.empty, Set(),
+ Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 2048L))),
+ Map(), Map(), 1,
+ false, Set())
+ assert(
+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)) ==
+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER)))
+ }
+
test("Policy: OUTLIER_NO_FALLBACK - Return None if there are no outliers") {
assert(plugin.invokePrivate(_choose(list, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)).isEmpty)
}
@@ -266,4 +331,35 @@ class ExecutorRollPluginSuite extends SparkFunSuite with PrivateMethodTester {
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_GC_TIME)) ==
plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
}
+
+ test("Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM on-heap memory outlier") {
+ val outlier = new ExecutorSummary("9999", "host:port", true, 1,
+ 0, 0, 1, 0, 0,
+ 3, 0, 1, 100,
+ 0, 0, 0,
+ 0, false, 0, new Date(1639300001000L),
+ Option.empty, Option.empty, Map(), Option.empty, Set(),
+ Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 2048L, "JVMOffHeapMemory" -> 1200L))),
+ Map(), Map(), 1,
+ false, Set())
+ val x = plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.TOTAL_GC_TIME))
+ assert(
+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_ONHEAP_MEMORY)) ==
+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
+ }
+
+ test("Policy: OUTLIER_NO_FALLBACK - Detect a peak JVM off-heap memory outlier") {
+ val outlier = new ExecutorSummary("9999", "host:port", true, 1,
+ 0, 0, 1, 0, 0,
+ 3, 0, 1, 100,
+ 0, 0, 0,
+ 0, false, 0, new Date(1639300001000L),
+ Option.empty, Option.empty, Map(), Option.empty, Set(),
+ Some(new ExecutorMetrics(Map("JVMHeapMemory" -> 1024L, "JVMOffHeapMemory" -> 2048L))),
+ Map(), Map(), 1,
+ false, Set())
+ assert(
+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.PEAK_JVM_OFFHEAP_MEMORY)) ==
+ plugin.invokePrivate(_choose(list :+ outlier, ExecutorRollPolicy.OUTLIER_NO_FALLBACK)))
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org