Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/09/30 03:39:45 UTC

[GitHub] [spark] attilapiros opened a new pull request, #38056: [WIP][SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

attilapiros opened a new pull request, #38056:
URL: https://github.com/apache/spark/pull/38056

   
   ### What changes were proposed in this pull request?
   
   Fix a race condition in ExecutorMetricsPoller between the `getExecutorUpdates()` and `onTaskStart()` methods
   by avoiding the removal of entries when another stage has not started yet.
   
   ### Why are the changes needed?
   
   Spurious failures are reported because of the following assert:
   
   ```
   22/09/29 09:46:24 ERROR SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[Executor task launch worker for task 3063.0 in stage 1997.0 (TID 677249),5,main]
   java.lang.AssertionError: assertion failed: task count shouldn't below 0
   	at scala.Predef$.assert(Predef.scala:223)
   	at org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
   	at org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
   	at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1822)
   	at org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
   	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:737)
   	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
   	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
   	at java.base/java.lang.Thread.run(Thread.java:829)
   22/09/29 09:46:24 INFO MemoryStore: MemoryStore cleared
   22/09/29 09:46:24 INFO BlockManager: BlockManager stopped
   22/09/29 09:46:24 INFO ShutdownHookManager: Shutdown hook called
   22/09/29 09:46:24 INFO ShutdownHookManager: Deleting directory /mnt/yarn/usercache/hadoop/appcache/application_1664443624160_0001/spark-93efc2d4-84de-494b-a3b7-2cb1c3a45426
   ```
   
   I have checked the code, and the basic assumption that there are at least as many `onTaskStart()` calls as `onTaskCompletion()` calls for the same `stageId` & `stageAttemptId` pair is correct. But there is a race condition between
   `getExecutorUpdates()` and `onTaskStart()`.
   
   For example, let's assume a task just finished which was running on its own (no other tasks were running),
   so this will decrease the `count` from 1 to 0.
   
   On the task runner thread, let's say a new task starts, so execution is in the `onTaskStart()` method.
   Let's assume the `TCMP` entry (`countAndPeaks`) has already been fetched and the counter is still 0,
   because execution has not yet reached the increment. So we are in between the following two lines:
   
   ```
    val countAndPeaks = stageTCMP.computeIfAbsent((stageId, stageAttemptId),
         _ => TCMP(new AtomicLong(0), new AtomicLongArray(ExecutorMetricType.numMetrics)))
    val stageCount = countAndPeaks.count.incrementAndGet()
   ```
   
   Let's look at the other thread (the heartbeater) where `getExecutorUpdates()` is running and execution is at the
   `removeIfInactive()` method:
   
   ```
      def removeIfInactive(k: StageKey, v: TCMP): TCMP = {
         if (v.count.get == 0) {
           logDebug(s"removing (${k._1}, ${k._2}) from stageTCMP")
           null
         } else {
           v
         }
       }
   ```
   
   And here this entry is removed from `stageTCMP` as the count is 0.
   
   Let's go back to the task runner thread where we increase the counter to 1, but that value will be lost
   because there is no longer an entry in `stageTCMP` for this stage and attempt.
   
   So if a new task comes, instead of 2 we will have 1 in `stageTCMP`, and when those two tasks finish
   the second one will decrease the counter from 0 to -1. This is when the assert is raised.
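   
   To make the interleaving concrete, below is a minimal standalone sketch of the same lost-update pattern (the names `LostUpdateSketch`, `demoMap`, `onTaskStartSimplified` and `removeIfInactiveSimplified` are illustrative only, not Spark code, and the peaks array is omitted):
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   import java.util.concurrent.atomic.AtomicLong
   
   object LostUpdateSketch {
     // Simplified stand-in for stageTCMP: (stageId, stageAttemptId) -> running task count.
     val demoMap = new ConcurrentHashMap[(Int, Int), AtomicLong]()
   
     def onTaskStartSimplified(stageId: Int, attempt: Int): Unit = {
       // Step 1: look up (or create) the entry.
       val counter = demoMap.computeIfAbsent((stageId, attempt), _ => new AtomicLong(0))
       // If the heartbeater removes the entry exactly here (count is still 0),
       // the increment below lands on an object the map no longer references.
       counter.incrementAndGet()
     }
   
     def removeIfInactiveSimplified(stageId: Int, attempt: Int): Unit = {
       // Mirrors removeIfInactive(): drop entries whose count is 0.
       demoMap.computeIfPresent((stageId, attempt),
         (_: (Int, Int), v: AtomicLong) => if (v.get == 0) null else v)
     }
   }
   ```
   
   Running `onTaskStartSimplified` on one thread while `removeIfInactiveSimplified` races on another can lose the increment exactly as described above.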
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Unit test.
   
   
   I managed to reproduce the issue with a temporary test:
   
   ```
    test("reproduce assert failure") {
       val testMemoryManager = new TestMemoryManager(new SparkConf())
       val taskId = new AtomicLong(0)
       val runFlag = new AtomicBoolean(true)
       val poller = new ExecutorMetricsPoller(testMemoryManager, 1000, None)
       val callUpdates = new Thread("getExecutorOpdates") {
         override def run(): Unit = {
           while (runFlag.get()) { poller.getExecutorUpdates.size }
         }
       }
   
       val taskStartRunner1 = new Thread("taskRunner1") {
         override def run(): Unit = {
           while (runFlag.get) {
             val l = taskId.incrementAndGet()
             poller.onTaskStart(l, 0, 0)
             poller.onTaskCompletion(l, 0, 0)
           }
         }
       }
       val taskStartRunner2 = new Thread("taskRunner2") {
         override def run(): Unit = {
           while (runFlag.get) {
             val l = taskId.incrementAndGet()
             poller.onTaskStart(l, 0, 0)
             poller.onTaskCompletion(l, 0, 0)
           }
         }
       }
       val taskStartRunner3 = new Thread("taskRunner3") {
         override def run(): Unit = {
           while (runFlag.get) {
             val l = taskId.incrementAndGet()
             val m = taskId.incrementAndGet()
             poller.onTaskStart(l, 0, 0)
             poller.onTaskStart(m, 0, 0)
             poller.onTaskCompletion(l, 0, 0)
             poller.onTaskCompletion(m, 0, 0)
           }
         }
       }
       callUpdates.start()
       taskStartRunner1.start()
       taskStartRunner2.start()
       taskStartRunner3.start()
      
       Thread.sleep(1000 * 10)
   
       runFlag.set(false)
       callUpdates.join()
       taskStartRunner1.join()
       taskStartRunner2.join()
       taskStartRunner3.join()
     }
   ```
   
   This printed the following when `org.apache.spark.executor.ExecutorMetricsPoller` is set to DEBUG level:
   
   ```
   taskRunner3: stageTCMP: (0, 0) -> 0
   taskRunner3: stageTCMP: (0, 0) -> 1
   getExecutorOpdates: removing (0, 0) from stageTCMP
   taskRunner3: stageTCMP: (0, 0) -> 1
   taskRunner3: stageTCMP: (0, 0) -> 0
   Exception in thread "taskRunner3" java.lang.AssertionError: assertion failed: task count shouldn't below 0
           at scala.Predef$.assert(Predef.scala:223)
           at org.apache.spark.executor.ExecutorMetricsPoller.decrementCount$1(ExecutorMetricsPoller.scala:130)
           at org.apache.spark.executor.ExecutorMetricsPoller.$anonfun$onTaskCompletion$3(ExecutorMetricsPoller.scala:135)
           at java.base/java.util.concurrent.ConcurrentHashMap.computeIfPresent(ConcurrentHashMap.java:1828)
           at org.apache.spark.executor.ExecutorMetricsPoller.onTaskCompletion(ExecutorMetricsPoller.scala:135)
           at org.apache.spark.executor.ExecutorMetricsPollerSuite$$anon$4.run(ExecutorMetricsPollerSuite.scala:64)
   ```
   
   This proves the race described above.
   
    
   
   




[GitHub] [spark] wypoon commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
wypoon commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263898158

   Thanks @attilapiros for looking into this problem. Great analysis, and nice job coming up with a repro!
   However, in Spark, isn't it possible for more than one stage to be running at the same time? In that case, your proposed fix has the same issue as the original.




[GitHub] [spark] attilapiros closed pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
attilapiros closed pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries
URL: https://github.com/apache/spark/pull/38056




[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1265474768

   Merged to master and branch-3.3.
   The backport to branch-3.2 is in progress (https://github.com/apache/spark/pull/38083).




[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263903717

   @wypoon is right and I have to find another way to fix this!




[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263960220

   This way all the entry modifications happen inside the atomically executed compute methods of the `ConcurrentHashMap`, so we can get rid of the `AtomicLong` too.
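   
   A rough sketch of what the completion side could look like under that approach (illustrative only, assuming `TCMP.count` becomes a plain `Long`; this is not necessarily the exact code that ends up in the PR):
   
   ```scala
   // Hypothetical onTaskCompletion: the zero-check and the decrement both run
   // inside computeIfPresent, i.e. atomically with respect to other compute
   // calls on the same key, so no separate AtomicLong is needed.
   def onTaskCompletion(taskId: Long, stageId: Int, stageAttemptId: Int): Unit = {
     stageTCMP.computeIfPresent((stageId, stageAttemptId),
       (_: StageKey, v: TCMP) => {
         assert(v.count > 0, "task count should not go below 0")
         TCMP(v.count - 1, v.peaks)
       })
   }
   ```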




[GitHub] [spark] mridulm commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263955083

   Why not something like this:
   
   ```
       stageTCMP.compute((stageId, stageAttemptId), (_, currentCountAndPeaks) =>
         if (null == currentCountAndPeaks) {
           TCMP(new AtomicLong(1), new AtomicLongArray(ExecutorMetricType.numMetrics))
         } else {
           currentCountAndPeaks.count.incrementAndGet()
           currentCountAndPeaks
         })
   ```
   
   in onTaskStart ?




[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263532841

   cc @Ngone51 @HyukjinKwon 




[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263959184

   @mridulm 
   
   That is what I was about to do, but without the AtomicLong:
   ```scala
       // Put a new entry in stageTCMP for the stage if there isn't one already.
       // Increment the task count.
       val countAndPeaks = stageTCMP.compute((stageId, stageAttemptId),
         (_: StageKey, v: TCMP) => v match {
           case null =>
             TCMP(1L, new AtomicLongArray(ExecutorMetricType.numMetrics))
           case value =>
             TCMP(value.count + 1, value.peaks)
         })
       logDebug(s"stageTCMP: ($stageId, $stageAttemptId) -> ${countAndPeaks.count}")
   ```
   




[GitHub] [spark] HyukjinKwon commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1263574923

   cc @mridulm too




[GitHub] [spark] attilapiros commented on pull request #38056: [SPARK-40617] Fix race condition at the handling of ExecutorMetricsPoller's stageTCMP entries

Posted by GitBox <gi...@apache.org>.
attilapiros commented on PR #38056:
URL: https://github.com/apache/spark/pull/38056#issuecomment-1264009589

   I thought about getting rid of the `AtomicLongArray` too, but that might mean a lot of extra object creation.
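   
   For illustration only (not code from this PR): replacing the `AtomicLongArray` with an immutable collection would mean allocating a fresh copy of the peaks on every poll of every running task, e.g.:
   
   ```scala
   // Hypothetical immutable-peaks update: a new IndexedSeq is allocated on every
   // call, which is the extra object creation mentioned above. With the mutable
   // AtomicLongArray the maxima are updated in place instead.
   def mergePeaks(current: IndexedSeq[Long], latest: IndexedSeq[Long]): IndexedSeq[Long] =
     current.zip(latest).map { case (a, b) => math.max(a, b) }
   ```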

