Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/18 01:34:50 UTC

[GitHub] [spark] JoshRosen commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

JoshRosen commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r948573830


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion listeners.
+          try {
+            listenerInvocationThread = None

Review Comment:
   I guess we don't need to synchronize this access to `listenerInvocationThread` because all reads and `= Some()` assignments are synchronized?
   
   I see existing comments talking about the need to ensure that listeners are invoked sequentially. I'm wondering whether any of the changes in this PR might have broken the guarantees that prevented concurrent invocation. There's one interleaving I'm worried about, which I'll describe in a subsequent comment.
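
For context, here is a minimal sketch of the kind of sequential-invocation guard the existing comments refer to. It is not Spark's actual code and it is not the interleaving mentioned above: `ListenerRunner`, `invocationThread`, `nextOrDeregister`, and `runAll` are hypothetical names, and the sketch assumes every read and write of the owner field goes through `synchronized`. The question in this comment is whether the guarantee still holds once one write (the `= None` in the `catch` block) happens outside that lock.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified stand-in for the listener-invocation logic (not Spark's code).
final class ListenerRunner {
  // Guarded by `this`: in this sketch every read and every assignment is synchronized.
  private var invocationThread: Option[Thread] = None
  private val pending = new ArrayBuffer[() => Unit]

  def addListener(f: () => Unit): Unit = synchronized { pending += f }

  // Returns the next listener (last registered first), or releases ownership
  // and returns None once no listeners remain.
  private def nextOrDeregister(): Option[() => Unit] = synchronized {
    if (pending.isEmpty) {
      invocationThread = None
      None
    } else {
      Some(pending.remove(pending.size - 1))
    }
  }

  // Runs all pending listeners unless another thread already owns invocation.
  // A failing listener does not stop the remaining ones; failures are collected.
  def runAll(): Seq[Throwable] = {
    val claimed = synchronized {
      if (invocationThread.isEmpty) {
        invocationThread = Some(Thread.currentThread())
        true
      } else {
        false
      }
    }
    val errors = new ArrayBuffer[Throwable]
    if (claimed) {
      var next = nextOrDeregister()
      while (next.nonEmpty) {
        try next.get.apply() catch { case e: Throwable => errors += e }
        next = nextOrDeregister()
      }
    }
    errors.toSeq
  }
}
```

In this sketch only one thread at a time can pass the `claimed` check, so listeners never run concurrently. Clearing the owner field in the middle of the loop, as the diff does before calling `markTaskFailed`, opens a window in which a second thread could claim ownership, which is why the ordering of that write relative to the synchronized reads matters.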



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

