Posted to commits@spark.apache.org by do...@apache.org on 2023/02/13 04:15:05 UTC
[spark] branch master updated: [SPARK-42391][CORE][TESTS] Close live `AppStatusStore` in the finally block for test cases
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 89d1d17e57a [SPARK-42391][CORE][TESTS] Close live `AppStatusStore` in the finally block for test cases
89d1d17e57a is described below
commit 89d1d17e57abc4c825cb9259f7a319b80e0d854a
Author: yangjie01 <ya...@baidu.com>
AuthorDate: Sun Feb 12 20:14:52 2023 -0800
[SPARK-42391][CORE][TESTS] Close live `AppStatusStore` in the finally block for test cases
### What changes were proposed in this pull request?
`AppStatusStore.createLiveStore` returns a RocksDB-backed `AppStatusStore` when `LIVE_UI_LOCAL_STORE_DIR` is configured, so test cases should close it in a finally block to release the underlying resources.
Four test suites use the `AppStatusStore.createLiveStore` function:
- `AppStatusStoreSuite`: one case does not close its `AppStatusStore`
- `StagePageSuite`: already calls close in a `finally` block
- `AllExecutionsPageSuite`: already calls close in `after`
- `SQLAppStatusListenerSuite`: already calls close in `after`
Only `AppStatusStoreSuite` leaves an `AppStatusStore` without manual closing, so this PR makes the following changes:
- For `SPARK-36038: speculation summary should not be present if there are no speculative tasks` in `AppStatusStoreSuite`, close the `AppStatusStore` in a finally block
- For `SPARK-26260: summary should contain only successful tasks' metrics (store = $hint)`, move the existing `AppStatusStore.close` call into the finally block (the pattern is sketched below)
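A minimal sketch of the resulting pattern, mirroring the conf setup used in the diff below (the test body is elided here):
```scala
import org.apache.spark.SparkConf
import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
import org.apache.spark.status.AppStatusStore

val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
// May be RocksDB-backed when LIVE_UI_LOCAL_STORE_DIR is configured.
val statusStore = AppStatusStore.createLiveStore(conf)
try {
  // ... exercise the store; assertions may throw here ...
} finally {
  statusStore.close() // runs even if an assertion fails
}
```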
### Why are the changes needed?
Call `AppStatusStore.close` in the finally block so that possible RocksDB resources are released even when an assertion in the test body fails; previously the close call (where present) ran only after all assertions passed.
### Does this PR introduce _any_ user-facing change?
No, this is a test-only change.
### How was this patch tested?
- Pass GitHub Actions
- Manual test
```
export LIVE_UI_LOCAL_STORE_DIR=/tmp/spark-ui
build/sbt clean "core/testOnly org.apache.spark.status.AppStatusStoreSuite"
```
All tests passed
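As a follow-up thought (not part of this PR), a small loan-pattern helper could centralize the try/finally so individual tests cannot forget to close the store; a hypothetical sketch:
```scala
import org.apache.spark.SparkConf
import org.apache.spark.status.AppStatusStore

// Hypothetical helper: creates a live store, lends it to the test body,
// and guarantees close() in a finally block.
def withLiveStore(conf: SparkConf)(body: AppStatusStore => Unit): Unit = {
  val store = AppStatusStore.createLiveStore(conf)
  try body(store) finally store.close()
}
```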
Closes #39961 from LuciferYang/SPARK-42391.
Authored-by: yangjie01 <ya...@baidu.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../apache/spark/status/AppStatusStoreSuite.scala | 173 +++++++++++----------
1 file changed, 90 insertions(+), 83 deletions(-)
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index d38b0857e57..ccf6c9184cc 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -133,78 +133,81 @@ class AppStatusStoreSuite extends SparkFunSuite {
cases.foreach { case (hint, appStore) =>
test(s"SPARK-26260: summary should contain only successful tasks' metrics (store = $hint)") {
assume(appStore != null)
- val store = appStore.store
-
- // Success and failed tasks metrics
- for (i <- 0 to 5) {
- if (i % 2 == 0) {
- writeTaskDataToStore(i, store, "FAILED")
- } else {
- writeTaskDataToStore(i, store, "SUCCESS")
+ try {
+ val store = appStore.store
+
+ // Success and failed tasks metrics
+ for (i <- 0 to 5) {
+ if (i % 2 == 0) {
+ writeTaskDataToStore(i, store, "FAILED")
+ } else {
+ writeTaskDataToStore(i, store, "SUCCESS")
+ }
}
- }
- // Running tasks metrics (-1 = no metrics reported, positive = metrics have been reported)
- Seq(-1, 6).foreach { metric =>
- writeTaskDataToStore(metric, store, "RUNNING")
- }
+ // Running tasks metrics (-1 = no metrics reported, positive = metrics have been reported)
+ Seq(-1, 6).foreach { metric =>
+ writeTaskDataToStore(metric, store, "RUNNING")
+ }
- /**
- * Following are the tasks metrics,
- * 1, 3, 5 => Success
- * 0, 2, 4 => Failed
- * -1, 6 => Running
- *
- * Task summary will consider (1, 3, 5) only
- */
- val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
- val successfulTasks = Array(getTaskMetrics(1), getTaskMetrics(3), getTaskMetrics(5))
-
- def assertQuantiles(metricGetter: TaskMetrics => Double,
- actualQuantiles: Seq[Double]): Unit = {
- val values = successfulTasks.map(metricGetter)
- val expectedQuantiles = new Distribution(values, 0, values.length)
- .getQuantiles(uiQuantiles.sorted)
-
- assert(actualQuantiles === expectedQuantiles)
- }
+ /**
+ * Following are the tasks metrics,
+ * 1, 3, 5 => Success
+ * 0, 2, 4 => Failed
+ * -1, 6 => Running
+ *
+ * Task summary will consider (1, 3, 5) only
+ */
+ val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
+ val successfulTasks = Array(getTaskMetrics(1), getTaskMetrics(3), getTaskMetrics(5))
+
+ def assertQuantiles(metricGetter: TaskMetrics => Double,
+ actualQuantiles: Seq[Double]): Unit = {
+ val values = successfulTasks.map(metricGetter)
+ val expectedQuantiles = new Distribution(values, 0, values.length)
+ .getQuantiles(uiQuantiles.sorted)
+
+ assert(actualQuantiles === expectedQuantiles)
+ }
- assertQuantiles(_.executorDeserializeTime, summary.executorDeserializeTime)
- assertQuantiles(_.executorDeserializeCpuTime, summary.executorDeserializeCpuTime)
- assertQuantiles(_.executorRunTime, summary.executorRunTime)
- assertQuantiles(_.executorRunTime, summary.executorRunTime)
- assertQuantiles(_.executorCpuTime, summary.executorCpuTime)
- assertQuantiles(_.resultSize, summary.resultSize)
- assertQuantiles(_.jvmGCTime, summary.jvmGcTime)
- assertQuantiles(_.resultSerializationTime, summary.resultSerializationTime)
- assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled)
- assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled)
- assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory)
- assertQuantiles(_.inputMetrics.bytesRead, summary.inputMetrics.bytesRead)
- assertQuantiles(_.inputMetrics.recordsRead, summary.inputMetrics.recordsRead)
- assertQuantiles(_.outputMetrics.bytesWritten, summary.outputMetrics.bytesWritten)
- assertQuantiles(_.outputMetrics.recordsWritten, summary.outputMetrics.recordsWritten)
- assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched,
- summary.shuffleReadMetrics.remoteBlocksFetched)
- assertQuantiles(_.shuffleReadMetrics.localBlocksFetched,
- summary.shuffleReadMetrics.localBlocksFetched)
- assertQuantiles(_.shuffleReadMetrics.fetchWaitTime, summary.shuffleReadMetrics.fetchWaitTime)
- assertQuantiles(_.shuffleReadMetrics.remoteBytesRead,
- summary.shuffleReadMetrics.remoteBytesRead)
- assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk,
- summary.shuffleReadMetrics.remoteBytesReadToDisk)
- assertQuantiles(
- t => t.shuffleReadMetrics.localBytesRead + t.shuffleReadMetrics.remoteBytesRead,
- summary.shuffleReadMetrics.readBytes)
- assertQuantiles(
- t => t.shuffleReadMetrics.localBlocksFetched + t.shuffleReadMetrics.remoteBlocksFetched,
- summary.shuffleReadMetrics.totalBlocksFetched)
- assertQuantiles(_.shuffleWriteMetrics.bytesWritten, summary.shuffleWriteMetrics.writeBytes)
- assertQuantiles(_.shuffleWriteMetrics.writeTime, summary.shuffleWriteMetrics.writeTime)
- assertQuantiles(_.shuffleWriteMetrics.recordsWritten,
- summary.shuffleWriteMetrics.writeRecords)
-
- appStore.close()
+ assertQuantiles(_.executorDeserializeTime, summary.executorDeserializeTime)
+ assertQuantiles(_.executorDeserializeCpuTime, summary.executorDeserializeCpuTime)
+ assertQuantiles(_.executorRunTime, summary.executorRunTime)
+ assertQuantiles(_.executorRunTime, summary.executorRunTime)
+ assertQuantiles(_.executorCpuTime, summary.executorCpuTime)
+ assertQuantiles(_.resultSize, summary.resultSize)
+ assertQuantiles(_.jvmGCTime, summary.jvmGcTime)
+ assertQuantiles(_.resultSerializationTime, summary.resultSerializationTime)
+ assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled)
+ assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled)
+ assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory)
+ assertQuantiles(_.inputMetrics.bytesRead, summary.inputMetrics.bytesRead)
+ assertQuantiles(_.inputMetrics.recordsRead, summary.inputMetrics.recordsRead)
+ assertQuantiles(_.outputMetrics.bytesWritten, summary.outputMetrics.bytesWritten)
+ assertQuantiles(_.outputMetrics.recordsWritten, summary.outputMetrics.recordsWritten)
+ assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched,
+ summary.shuffleReadMetrics.remoteBlocksFetched)
+ assertQuantiles(_.shuffleReadMetrics.localBlocksFetched,
+ summary.shuffleReadMetrics.localBlocksFetched)
+ assertQuantiles(_.shuffleReadMetrics.fetchWaitTime,
+ summary.shuffleReadMetrics.fetchWaitTime)
+ assertQuantiles(_.shuffleReadMetrics.remoteBytesRead,
+ summary.shuffleReadMetrics.remoteBytesRead)
+ assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk,
+ summary.shuffleReadMetrics.remoteBytesReadToDisk)
+ assertQuantiles(
+ t => t.shuffleReadMetrics.localBytesRead + t.shuffleReadMetrics.remoteBytesRead,
+ summary.shuffleReadMetrics.readBytes)
+ assertQuantiles(
+ t => t.shuffleReadMetrics.localBlocksFetched + t.shuffleReadMetrics.remoteBlocksFetched,
+ summary.shuffleReadMetrics.totalBlocksFetched)
+ assertQuantiles(_.shuffleWriteMetrics.bytesWritten, summary.shuffleWriteMetrics.writeBytes)
+ assertQuantiles(_.shuffleWriteMetrics.writeTime, summary.shuffleWriteMetrics.writeTime)
+ assertQuantiles(_.shuffleWriteMetrics.recordsWritten,
+ summary.shuffleWriteMetrics.writeRecords)
+ } finally {
+ appStore.close()
+ }
}
}
@@ -230,22 +233,26 @@ class AppStatusStoreSuite extends SparkFunSuite {
val conf = new SparkConf(false).set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
val statusStore = AppStatusStore.createLiveStore(conf)
- val listener = statusStore.listener.get
-
- // Simulate a stage in job progress listener
- val stageInfo = new StageInfo(stageId = 0, attemptId = 0, name = "dummy", numTasks = 1,
- rddInfos = Seq.empty, parentIds = Seq.empty, details = "details",
- resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
- (1 to 2).foreach {
- taskId =>
- val taskInfo = new TaskInfo(
- taskId, taskId, 0, taskId, 0, "0", "localhost", TaskLocality.ANY,
- false)
- listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
- listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
- }
+ try {
+ val listener = statusStore.listener.get
+
+ // Simulate a stage in job progress listener
+ val stageInfo = new StageInfo(stageId = 0, attemptId = 0, name = "dummy", numTasks = 1,
+ rddInfos = Seq.empty, parentIds = Seq.empty, details = "details",
+ resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
+ (1 to 2).foreach {
+ taskId =>
+ val taskInfo = new TaskInfo(
+ taskId, taskId, 0, taskId, 0, "0", "localhost", TaskLocality.ANY,
+ false)
+ listener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo))
+ listener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
+ }
- assert(statusStore.speculationSummary(0, 0).isEmpty)
+ assert(statusStore.speculationSummary(0, 0).isEmpty)
+ } finally {
+ statusStore.close()
+ }
}
private def compareQuantiles(count: Int, quantiles: Array[Double]): Unit = {