Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/30 03:39:45 UTC
[GitHub] [spark] attilapiros opened a new pull request, #38056: [WIP][SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
attilapiros opened a new pull request, #38056:
URL: https://github.com/apache/spark/pull/38056
### What changes were proposed in this pull request?
Fix a race condition in `ExecutorMetricsPoller` between the `getExecutorUpdates()` and `onTaskStart()` methods
by avoiding the removal of entries when another stage has not started yet.
### Why are the changes needed?
Spurious failures are reported because of the following assert:
```
22/09/29 09:46:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 3063.0 in stage 1997.0 (TID 677249),5,main]
java.lang.AssertionError: assertion failed: task count shouldn't below 0
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
at org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
at org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:737)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
22/09/29 09:46:24 INFO MemoryStore: MemoryStore cleared
22/09/29 09:46:24 INFO BlockManager: BlockManager stopped
22/09/29 09:46:24 INFO ShutdownHookManager: Shutdown hook called
22/09/29 09:46:24 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1664443624160_0001/spark-93efc2d4-84de-494b-a3b7-2cb1c3a45426
```
I have checked the code, and the basic assumption that there are at least as many `onTaskStart()` calls as `onTaskCompletion()` calls for the same `stageId` & `stageAttemptId` pair holds. However, there is a race condition between
`getExecutorUpdates()` and `onTaskStart()`.
For example, assume that a task which was running on its own (no other tasks were running) has just finished.
This decreases the `count` from 1 to 0.
Now a new task starts on a task runner thread, so execution is in the `onTaskStart()` method.
Assume the `TCMP` entry (`countAndPeaks`) has already been fetched and its counter is 0,
but execution has not yet reached the increment. So we are between the following two lines:
```
val countAndPeaks = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
  _ => TCMP(new AtomicLong(0), new AtomicLongArray(ExecutorMetricType.numMetrics)))
val stageCount = countAndPeaks.count.incrementAndGet()
```
Now look at the other thread (the heartbeater), where `getExecutorUpdates()` is running and has reached the
`removeIfInactive()` method:
```
def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
  if (v.count.get == 0) {
    logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
    null
  } else {
    v
  }
}
```
Here the entry is removed from `stageTCMP` because its count is 0.
Back on the task runner thread, the counter is incremented to 1, but that value is lost
because `stageTCMP` no longer has an entry for this stage and attempt.
So if another task of the same stage starts, `stageTCMP` holds a count of 1 instead of 2, and when both tasks finish
the second one decreases the counter from 0 to -1. This is when the assertion fails.
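The interleaving above can be replayed deterministically on a single thread. This is a minimal sketch that mimics the described calls on a plain `ConcurrentHashMap` rather than driving the real `ExecutorMetricsPoller`; the simplified `StageKey`/`TCMP` stand-ins and the hypothetical `NumMetrics` constant (in place of `ExecutorMetricType.numMetrics`) are assumptions.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.{AtomicLong, AtomicLongArray}

object LostUpdateReplay {
  type StageKey = (Int, Int)

  // Simplified stand-in for the poller's per-stage entry (task count + peaks).
  final case class TCMP(count: AtomicLong, peaks: AtomicLongArray)

  // Hypothetical stand-in for ExecutorMetricType.numMetrics.
  val NumMetrics = 4

  private def freshEntry(): TCMP =
    TCMP(new AtomicLong(0), new AtomicLongArray(NumMetrics))

  /** Replays the interleaving step by step and returns the count
    * observed by the last task completion. */
  def replay(): Long = {
    val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]()
    val key: StageKey = (0, 0)

    // Task 1 of stage (0, 0) ran alone and has completed: count went 1 -> 0.
    stageTCMP.put(key, freshEntry())

    // Task runner, onTaskStart() for task 2: the entry is fetched,
    // but the increment has not happened yet.
    val countAndPeaks = stageTCMP.computeIfAbsent(key, (_: StageKey) => freshEntry())

    // Heartbeater, removeIfInactive(): the count is 0, so returning null
    // from computeIfPresent removes the entry from the map.
    stageTCMP.computeIfPresent(key,
      (_: StageKey, v: TCMP) => if (v.count.get == 0) null else v)

    // Task runner resumes: the increment lands on the detached entry and is lost.
    countAndPeaks.count.incrementAndGet()

    // Task 3 starts: a fresh entry is created with count 1,
    // although two tasks (2 and 3) are now running.
    stageTCMP.computeIfAbsent(key, (_: StageKey) => freshEntry()).count.incrementAndGet()

    // Tasks 2 and 3 complete; each decrements the entry currently in the map.
    stageTCMP.get(key).count.decrementAndGet() // 1 -> 0
    stageTCMP.get(key).count.decrementAndGet() // 0 -> -1: where the assertion fires
  }
}
```

`replay()` ends at -1, the value at which the poller's `decrementCount` assertion fails.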
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test.
I managed to reproduce the issue with a temporary test:
```
test("reproduce assert failure") {
  val testMemoryManager = new TestMemoryManager(new SparkConf())
  val taskId = new AtomicLong(0)
  val runFlag = new AtomicBoolean(true)
  val poller = new ExecutorMetricsPoller(testMemoryManager, 1000, None)
  // Heartbeater stand-in: keeps calling getExecutorUpdates(), which removes inactive entries.
  val callUpdates = new Thread("getExecutorOpdates") {
    override def run(): Unit = {
      while (runFlag.get()) { poller.getExecutorUpdates().size }
    }
  }
  // Task runners that start and complete tasks of stage (0, 0) in a loop.
  val taskStartRunner1 = new Thread("taskRunner1") {
    override def run(): Unit = {
      while (runFlag.get()) {
        val l = taskId.incrementAndGet()
        poller.onTaskStart(l, 0, 0)
        poller.onTaskCompletion(l, 0, 0)
      }
    }
  }
  val taskStartRunner2 = new Thread("taskRunner2") {
    override def run(): Unit = {
      while (runFlag.get()) {
        val l = taskId.incrementAndGet()
        poller.onTaskStart(l, 0, 0)
        poller.onTaskCompletion(l, 0, 0)
      }
    }
  }
  val taskStartRunner3 = new Thread("taskRunner3") {
    override def run(): Unit = {
      while (runFlag.get()) {
        // Two overlapping tasks of the same stage.
        val l = taskId.incrementAndGet()
        val m = taskId.incrementAndGet()
        poller.onTaskStart(l, 0, 0)
        poller.onTaskStart(m, 0, 0)
        poller.onTaskCompletion(l, 0, 0)
        poller.onTaskCompletion(m, 0, 0)
      }
    }
  }
  callUpdates.start()
  taskStartRunner1.start()
  taskStartRunner2.start()
  taskStartRunner3.start()
  Thread.sleep(1000 * 10)
  runFlag.set(false)
  callUpdates.join()
  taskStartRunner1.join()
  taskStartRunner2.join()
  taskStartRunner3.join()
}
```
With the `org.apache.spark.executor.ExecutorMetricsPoller` logger set to DEBUG level, this printed the following:
```
taskRunner3: stageTCMP: (0, 0) -> 0
taskRunner3: stageTCMP: (0, 0) -> 1
getExecutorOpdates: removing (0, 0) from stageTCMP
taskRunner3: stageTCMP: (0, 0) -> 1
taskRunner3: stageTCMP: (0, 0) -> 0
Exception in thread "taskRunner3" java.lang.AssertionError: assertion failed: task count shouldn't below 0
at scala.Predef$.assert(Predef.scala:223)
at org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
at org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1828)
at org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
at org.apache.spark.executor.ExecutorMetricsPollerSuite$$anon$4.run(ExecutorMetricsPollerSuite.scala:64)
```
This confirms my theory.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] [spark] wypoon commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
Posted by GitBox <gi...@apache.org>.
wypoon commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263898158
Thanks @apiros for looking into this problem. Great analysis, and nice work coming up with a repro!
However, in Spark, isn't it possible for more than one stage to run at the same time? In that case, your proposed fix has the same issue as the original.
[GitHub] [spark] attilapiros closed pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
attilapiros closed pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
URL: https://github.com/apache/spark/pull/38056
[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1265474768
Merged to master, branch-3.3, branch-3.2.
And backporting to branch-3.3 is in progress (https://github.com/apache/spark/pull/38083).
[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263903717
@wypoon is right and I have to find another way to fix this!
[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263960220
This way, all the entry modifications happen inside `ConcurrentHashMap` methods that are executed atomically per key, so we can get rid of the `AtomicLong` too.
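The compute-based approach discussed in this thread can be sketched as follows. This is a minimal sketch, not the merged patch: it assumes a simplified `TCMP` whose `count` is a plain immutable `Long` (as in the `compute` snippet proposed in this thread) and a hypothetical `NumMetrics` constant. Every read-modify-write of an entry, including the heartbeater's removal, runs inside `compute`/`computeIfPresent`, which the JDK documents as performed atomically, so a removal can no longer slip between fetching an entry and incrementing its count.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLongArray

object AtomicStageCounts {
  type StageKey = (Int, Int)

  // The count is an immutable Long: an entry is only ever replaced inside
  // ConcurrentHashMap's compute methods, never mutated in place.
  final case class TCMP(count: Long, peaks: AtomicLongArray)

  // Hypothetical stand-in for ExecutorMetricType.numMetrics.
  val NumMetrics = 4

  val stageTCMP = new ConcurrentHashMap[StageKey, TCMP]()

  // Creates the entry or increments its count in one atomic step.
  def onTaskStart(key: StageKey): Long =
    stageTCMP.compute(key, (_: StageKey, v: TCMP) =>
      if (v == null) TCMP(1L, new AtomicLongArray(NumMetrics))
      else TCMP(v.count + 1, v.peaks)
    ).count

  // Decrements the count of an existing entry in one atomic step.
  def onTaskCompletion(key: StageKey): Unit = {
    stageTCMP.computeIfPresent(key, (_: StageKey, v: TCMP) => {
      assert(v.count > 0, "task count shouldn't go below 0")
      TCMP(v.count - 1, v.peaks)
    })
  }

  // Heartbeater side: the count check and the removal are one atomic step,
  // so a concurrent onTaskStart either happens before it or after it.
  def removeIfInactive(key: StageKey): Unit = {
    stageTCMP.computeIfPresent(key, (_: StageKey, v: TCMP) =>
      if (v.count == 0) null else v)
  }
}
```

If the heartbeater's check wins, `onTaskStart` simply recreates the entry with count 1; if `onTaskStart` wins, the heartbeater sees a positive count and keeps the entry.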
[GitHub] [spark] mridulm commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
mridulm commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263955083
Why not something like this:
```
stageTCMP.compute((stageId, stageAttemptId), (_, currentCountAndPeaks) =>
  if (null == currentCountAndPeaks) {
    TCMP(new AtomicLong(1), new AtomicLongArray(ExecutorMetricType.numMetrics))
  } else {
    currentCountAndPeaks.count.incrementAndGet()
    currentCountAndPeaks
  })
```
in `onTaskStart`?
[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263532841
cc @Ngone51 @HyukjinKwon
[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263959184
@mridulm
That is what I am about to do, but without the `AtomicLong`:
```scala
// Put a new entry in stageTCMP for the stage if there isn't one already.
// Increment the task count.
val countAndPeaks = stageTCMP.compute((stageId, stageAttemptId),
  (_: StageKey, v: TCMP) => v match {
    case null =>
      TCMP(1L, new AtomicLongArray(ExecutorMetricType.numMetrics))
    case value =>
      TCMP(value.count + 1, value.peaks)
  })
logDebug(s"stageTCMP: ($stageId, $stageAttemptId) -> ${countAndPeaks.count}")
```
[GitHub] [spark] HyukjinKwon commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
HyukjinKwon commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263574923
cc @mridulm too
[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1264009589
I thought about getting rid of the `AtomicLongArray` as well, but that might cause a lot of extra object creation.
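For context on that trade-off: with an immutable counter, only the small `TCMP` wrapper is re-created on each task start or completion, while the same `AtomicLongArray` of peaks is passed along (`TCMP(value.count + 1, value.peaks)`) and can keep being updated in place without locks. A minimal sketch of such an in-place peak update; the `updatePeaks` helper and its `latest` argument (freshly polled metric values) are illustrative assumptions, not the poller's actual code.

```scala
import java.util.concurrent.atomic.AtomicLongArray

object PeakTracking {
  // Hypothetical helper: raises each slot of `peaks` to the matching value in
  // `latest` when that value is larger. Each slot is updated atomically, and
  // the array itself is never copied or replaced.
  def updatePeaks(peaks: AtomicLongArray, latest: Array[Long]): Unit = {
    for (i <- latest.indices) {
      peaks.getAndAccumulate(i, latest(i), (a: Long, b: Long) => math.max(a, b))
    }
  }
}
```

Replacing the array with immutable values instead would mean allocating a new array per update, which is the extra object creation mentioned above.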