Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/16 03:44:43 UTC

[GitHub] [spark] ryan-johnson-databricks opened a new pull request, #37531: [WIP][SPARK-XXXX] Task failure handlers should always run if the task failed

ryan-johnson-databricks opened a new pull request, #37531:
URL: https://github.com/apache/spark/pull/37531

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   
   Clean up the semantics of Spark task listeners. Today, if a task body succeeds but a completion listener fails, failure listeners are not called, even though the task has indeed failed at that point. The fix is to invoke failure listeners if a completion listener fails, before running the remaining completion listeners.
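   
   For illustration, here is a minimal sketch of the scenario (the listener bodies and the `cleanupAfterFailure` helper are hypothetical):
   
   ```scala
   import org.apache.spark.TaskContext
   
   // Inside a running task: the body succeeds, but a completion listener throws.
   val ctx = TaskContext.get()
   
   ctx.addTaskFailureListener { (_, error) =>
     // Previously: never invoked in this scenario.
     // With this change: invoked before the remaining completion listeners run.
     cleanupAfterFailure(error) // hypothetical cleanup helper
   }
   
   ctx.addTaskCompletionListener[Unit] { _ =>
     throw new RuntimeException("completion listener failed") // fails the task
   }
   ```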
   
   ### Why are the changes needed?
   
   Failure listeners are not reliably invoked today: if the task failure is caused by a failed completion listener, they do not run at all. This limits the utility of task listeners, especially ones that could assist with task cleanup.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No changes to public methods, but failure listeners will now run when a completion listener fails, where previously they did not.
   
   ### How was this patch tested?
   
   New unit tests exercise various combinations of failed listeners, with a task body that did (or did not) throw.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] mridulm commented on pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #37531:
URL: https://github.com/apache/spark/pull/37531#issuecomment-1218753151

   +CC @ankurdave 




[GitHub] [spark] cloud-fan commented on pull request #37531: [WIP][SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #37531:
URL: https://github.com/apache/spark/pull/37531#issuecomment-1217492351

   looks good, what's still WIP?
   
   cc @Ngone51 @JoshRosen 




[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949063130


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   This situation is already handled by the existing code:
   
   First, it is [specifically documented](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskContext.scala#L108) that 
   
   > There are no ordering guarantees for listeners registered in different threads, or for listeners registered after the task completes
   
   Second, listeners are pushed and popped from the stack in `synchronized` blocks, so every listener will be executed exactly once (see the sketch below).
   
   Third, registering a new listener after the task completes will trigger a call to `invokeListeners`, which is a no-op if some other thread is already in that method -- only an invocation that finds an empty `listenerInvocationThread` can continue.
   
   Fourth, it's already possible that failure listeners and completion listeners run on different threads, because the `listenerInvocationThread` is cleared between calls.
   
   The only changes my code can cause are:
   1. Failure listeners can be invoked in the middle of processing completion listeners, if one of the latter throws (where previously they were not invoked at all).
   2. ... in which case different threads might execute the initial completion listeners vs. the remaining completion listeners. But only if other threads are trying to register listeners at the same time.
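   
   To make the exactly-once argument concrete, here is a minimal sketch of the pop-under-lock pattern (simplified and renamed; not the actual `TaskContextImpl` code):
   
   ```scala
   import scala.collection.mutable
   
   // Simplified model: listener pops and ownership changes happen under one
   // lock, so each listener is handed to exactly one invoking thread.
   class ListenerStack[T] {
     private val listeners = mutable.Stack.empty[T]
     private var invocationThread: Option[Thread] = None
   
     def register(listener: T): Unit = synchronized { listeners.push(listener) }
   
     // Only a caller that finds no current owner may start invoking; a second
     // thread entering while invocation is in progress becomes a no-op.
     def tryAcquire(): Boolean = synchronized {
       if (invocationThread.isEmpty) {
         invocationThread = Some(Thread.currentThread())
         true
       } else false
     }
   
     // Either hands back the next listener, or atomically releases ownership
     // when none remain; a given listener can never be popped twice.
     def nextOrDeregister(): Option[T] = synchronized {
       if (listeners.nonEmpty) Some(listeners.pop())
       else { invocationThread = None; None }
     }
   }
   ```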
   





[GitHub] [spark] JoshRosen closed pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
JoshRosen closed pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners
URL: https://github.com/apache/spark/pull/37531




[GitHub] [spark] JoshRosen commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r948579391


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   What if thread A starts invoking listeners, hits an exception, and de-registers the listenerInvocationThread, and then thread B registers a new completion or failure listener? Since the task is already completed or failed, the listener-registering thread would try to immediately invoke all listeners. Could that lead to already-registered listeners being invoked twice? We wouldn't have _concurrent_ invocation, but we'd still get a sequential interleaving of duplicate listener invocations.
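   
   To make the concern concrete, here is a small runnable check against the simplified `ListenerStack` sketched earlier in this thread (illustrative only, not Spark's actual code):
   
   ```scala
   import java.util.concurrent.ConcurrentHashMap
   
   object DuplicateInvocationCheck extends App {
     val stack = new ListenerStack[() => Unit]
     val invoked = ConcurrentHashMap.newKeySet[Int]()
     (1 to 100).foreach { i =>
       stack.register(() => assert(invoked.add(i), s"listener $i ran twice"))
     }
   
     // Models threads that find the task completed and try to immediately
     // invoke all listeners; only one can acquire ownership at a time.
     def drain(): Unit = if (stack.tryAcquire()) {
       var next = stack.nextOrDeregister()
       while (next.nonEmpty) { next.get.apply(); next = stack.nextOrDeregister() }
     }
   
     val threads = (1 to 4).map(_ => new Thread(() => drain()))
     threads.foreach(_.start())
     threads.foreach(_.join())
     println(s"invoked ${invoked.size} listeners, each exactly once")
   }
   ```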





[GitHub] [spark] JoshRosen commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949353331


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   Thanks. I'd overlooked the synchronization of stack operations and I agree that this guarantees only-once invocation of the listeners.





[GitHub] [spark] JoshRosen commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r948573830


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   I guess we don't need to synchronize this access to `listenerInvocationThread` because all reads and `= Some()` assignments are synchronized?
   
   I see existing comments talking about the need to ensure that listeners are invoked sequentially. I'm wondering whether any of the changes in this PR might have broken the guarantees that prevented concurrent invocation. There's one interleaving I'm worried about, which I'll describe in a subsequent comment.





[GitHub] [spark] HyukjinKwon commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949739935


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body

Review Comment:
   No biggie, but we could use backticks instead of `[[..]]`, which is for Scaladoc (whereas this is a plain comment)



##########
core/src/main/scala/org/apache/spark/TaskContext.scala:
##########
@@ -133,21 +134,53 @@ abstract class TaskContext extends Serializable {
   }
 
   /**
-   * Adds a listener to be executed on task failure. Adding a listener to an already failed task
-   * will result in that listener being called immediately.
+   * Adds a listener to be executed on task failure (which includes completion listener failure, if
+   * the task body did not already fail). Adding a listener to an already failed task will result in
+   * that listener being called immediately.
+   *
+   * Note: Prior to Spark 3.4.0, failure listeners were only invoked if the main task body failed.

Review Comment:
   nit: we have `@note` for notes, but no biggie
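   
   For reference, the suggested Scaladoc style would look roughly like this (a sketch, not necessarily the PR's final wording):
   
   ```scala
   /**
    * Adds a listener to be executed on task failure (which includes completion
    * listener failure, if the task body did not already fail). Adding a listener
    * to an already failed task will result in that listener being called
    * immediately.
    *
    * @note Prior to Spark 3.4.0, failure listeners were only invoked if the
    *       main task body failed.
    */
   ```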



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949063130


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] it again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   This situation is already handled by the existing code:
   
   First, it is [specifically documented](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskContext.scala#L108) that 
   
   > There are no ordering guarantees for listeners registered in different threads, or for listeners registered after the task completes
   
   Second, listeners are pushed and popped from the stack in `synchronized` blocks, so every listener will be executed exactly once.
   
   Third, registering a new listener after the task completes will trigger a call to `invokeListeners`, which is a no-op if some other thread is already in that method -- only an invocation that finds an empty `listenerInvocationThread` can continue.
   
   Fourth, it's already possible that failure listeners and completion listeners run on different threads, because the `listenerInvocationThread` is cleared between calls.
   
   The only changes this PR should cause are:
   1. Failure listeners can be invoked in the middle of processing completion listeners, if one of the latter throws (where previously they were not invoked at all -- hence this PR).
   2. ... in which case different threads might execute the initial completion listeners vs. the remaining completion listeners. But only if other threads are trying to register listeners at the same time.
   



##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] it again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   This situation is already handled by the existing code:
   
   First, it is [specifically documented](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskContext.scala#L108) that 
   
   > There are no ordering guarantees for listeners registered in different threads, or for listeners registered after the task completes
   
   Second, listeners are pushed and popped from the stack in `synchronized` blocks, so every listener will be executed exactly once.
   
   Third, registering a new listener after the task completes will trigger a call to `invokeListeners`, which is a no-op if some other thread is already in that method -- only an invocation that finds an empty `listenerInvocationThread` can continue.
   
   Fourth, it's already possible that failure listeners and completion listeners run on different threads, because the `listenerInvocationThread` is cleared between calls.
   
   The only behavior changes this PR should cause are:
   1. Failure listeners can be invoked in the middle of processing completion listeners, if one of the latter throws (where previously they were not invoked at all -- hence this PR).
   2. ... in which case different threads might execute the initial completion listeners vs. the remaining completion listeners. But only if other threads are trying to register listeners at the same time.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949063130


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] it again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   This situation is already handled by the existing code:
   
   First, it is [specifically documented](https://src.dev.databricks.com/databricks/runtime/-/blob/core/src/main/scala/org/apache/spark/TaskContext.scala?L138) that 
   
   > There are no ordering guarantees for listeners registered in different threads, or for listeners registered after the task completes
   
   Second, tasks are pushed and popped from the stack in `synchronized` blocks, and only an invocation that finds the `listenerInvocationThread` empty can continue, so every task will be executed exactly once.
   
   Third, registering a new listener after the task completes will trigger a call to `invokeListeners`, which is a no-op if some other thread is already in that method.
   
   Fourth, it's already possible for failure listeners and completion listeners to run on different threads, because the `listenerInvocationThread` is cleared between calls.
   
   The only changes my code can cause are:
   1. Failure listeners can be invoked in the middle of processing completion listeners, if one of the latter throws (where previously they were not invoked at all).
   2. ... in which case different threads might execute the initial completion listeners vs. the remaining completion listeners. But only if other threads are trying to register listeners at the same time.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949063130


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] it again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   This situation is already handled by the existing code:
   
   First, it is [specifically documented](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskContext.scala#L108) that 
   
   > There are no ordering guarantees for listeners registered in different threads, or for listeners registered after the task completes
   
   Second, listeners are pushed and popped from the stack in `synchronized` blocks, so every listener will be executed exactly once.
   
   Third, registering a new listener after the task completes will trigger a call to `invokeListeners`, which is a no-op if some other thread is already in that method -- only an invocation that finds an empty `listenerInvocationThread` can continue.
   
   Fourth, it's already possible that failure listeners and completion listeners run on different threads, because the `listenerInvocationThread` is cleared between calls.
   
   The only changes my code can cause are:
   1. Failure listeners can be invoked in the middle of processing completion listeners, if one of the latter throws (where previously they were not invoked at all -- hence this PR).
   2. ... in which case different threads might execute the initial completion listeners vs. the remaining completion listeners. But only if other threads are trying to register listeners at the same time.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949063130


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] it again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion handlers.
+          try {
+            listenerInvocationThread = None

Review Comment:
   This situation is already handled by the existing code:
   
   First, it is [specifically documented](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskContext.scala#L108) that 
   
   > There are no ordering guarantees for listeners registered in different threads, or for listeners registered after the task completes
   
   Second, listeners are pushed and popped from the stack in `synchronized` blocks, so every listener will be executed exactly once.
   
   Third, registering a new listener after the task completes will trigger a call to `invokeListeners`, which is a no-op if some other thread is already in that method -- only an invocation that finds an empty `listenerInvocationThread` can continue.
   
   Fourth, it's already possible that failure listeners and completion listeners run on different threads, because the `listenerInvocationThread` is cleared between calls.
   
   The only changes my code can cause are:
   1. Failure listeners can be invoked in the middle of processing completion listeners, if one of the latter throws (where previously they were not invoked at all).
   2. ... in which case different threads might execute the initial completion listeners vs. the remaining completion listeners. But only if other threads are trying to register listeners at the same time.
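   
   For illustration, here is a minimal sketch of change 1 (hedged: the `local[1]` setup and the listener bodies are mine, not from this PR). With this change, the failure listener below now fires even though the task body itself succeeds, because the throwing completion listener marks the task as failed:
   
   ```scala
   import org.apache.spark.{SparkConf, SparkContext, TaskContext}
   
   val sc = new SparkContext(
     new SparkConf().setMaster("local[1]").setAppName("listener-demo"))
   sc.parallelize(Seq(1), 1).foreachPartition { _ =>
     val ctx = TaskContext.get()
     ctx.addTaskFailureListener { (_, error) =>
       // Previously this never ran when only a completion listener failed;
       // now it runs before the remaining completion listeners resume.
       System.err.println(s"failure listener saw: ${error.getMessage}")
     }
     ctx.addTaskCompletionListener[Unit] { _ =>
       throw new RuntimeException("completion listener failed") // fails the task
     }
   }
   ```
   
   The job as a whole still fails, exactly as before; the difference is only that the failure listener now observes the failure.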
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #37531: [WIP][SPARK-XXXX] Task failure handlers should always run if the task failed

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #37531:
URL: https://github.com/apache/spark/pull/37531#issuecomment-1216314868

   Can one of the admins verify this patch?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] JoshRosen commented on pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on PR #37531:
URL: https://github.com/apache/spark/pull/37531#issuecomment-1220052170

   I ran the tests by pushing the latest commit (plus a merge with master) to the https://github.com/JoshRosen/spark/tree/task-failure-listeners-trigger-tests-2 branch in my fork: tests are all green there, so I'm going to merge this PR to master (for inclusion in Spark 3.4.0). Thanks everyone!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #37531: [WIP][SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r947422496


##########
core/src/main/scala/org/apache/spark/scheduler/Task.scala:
##########
@@ -136,44 +136,27 @@ private[spark] abstract class Task[T](
     plugins.foreach(_.onTaskStart())
 
     try {
-      runTask(context)
-    } catch {
-      case e: Throwable =>
-        // Catch all errors; run task failure callbacks, and rethrow the exception.
-        try {
-          context.markTaskFailed(e)
-        } catch {
-          case t: Throwable =>
-            e.addSuppressed(t)
-        }
-        context.markTaskCompleted(Some(e))
-        throw e
+      context.runTaskWithListeners(this)
     } finally {
       try {
-        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
-        // one is no-op.
-        context.markTaskCompleted(None)
-      } finally {
-        try {
-          Utils.tryLogNonFatalError {
-            // Release memory used by this thread for unrolling blocks
-            SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-            SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
-              MemoryMode.OFF_HEAP)
-            // Notify any tasks waiting for execution memory to be freed to wake up and try to
-            // acquire memory again. This makes impossible the scenario where a task sleeps forever
-            // because there are no other tasks left to notify it. Since this is safe to do but may
-            // not be strictly necessary, we should revisit whether we can remove this in the
-            // future.
-            val memoryManager = SparkEnv.get.memoryManager
-            memoryManager.synchronized { memoryManager.notifyAll() }
-          }
-        } finally {
-          // Though we unset the ThreadLocal here, the context member variable itself is still
-          // queried directly in the TaskRunner to check for FetchFailedExceptions.
-          TaskContext.unset()
-          InputFileBlockHolder.unset()
+        Utils.tryLogNonFatalError {

Review Comment:
   Indentation-only change, best viewed with whitespace ignored:
   https://github.com/apache/spark/pull/37531/files?diff=split&w=1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949063130


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion listeners.
+          try {
+            listenerInvocationThread = None

Review Comment:
   This situation is already handled by the existing code:
   
   First, it is [specifically documented](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/TaskContext.scala#L108) that 
   
   > There are no ordering guarantees for listeners registered in different threads, or for listeners registered after the task completes
   
   Second, listeners are pushed and popped from the stack in `synchronized` blocks, so every listener will be executed exactly once.
   
   Third, registering a new listener after the task completes will trigger a call to `invokeListeners`, which is a no-op if some other thread is already in that method -- only an invocation that finds an empty `listenerInvocationThread` can continue.
   
   Fourth, it's already possible for failure listeners and completion listeners to run on different threads, because the `listenerInvocationThread` is cleared between calls.
   
   The only changes my code can cause are:
   1. Failure listeners can be invoked in the middle of processing completion listeners, if one of the latter throws (where previously they were not invoked at all).
   2. ... in which case different threads might execute the initial completion listeners vs. the remaining completion listeners. But only if other threads are trying to register listeners at the same time.
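   
   To make the second and third points concrete, here is a simplified sketch of the ownership pattern being described (hedged: the names echo the real fields, but this is a toy model, not the actual `TaskContextImpl` code):
   
   ```scala
   import scala.collection.mutable
   
   // One stack of listeners, at most one draining thread at a time.
   class ListenerInvoker[T](callback: T => Unit) {
     private val listeners = mutable.Stack.empty[T]
     private var listenerInvocationThread: Option[Thread] = None
   
     // Registration always triggers invocation; if another thread already owns
     // the stack, invokeListeners() is a no-op and the owner pops the new
     // listener itself.
     def addListener(listener: T): Unit = {
       synchronized { listeners.push(listener) }
       invokeListeners()
     }
   
     private def getNextListenerOrDeregisterThread(): Option[T] = synchronized {
       if (listeners.nonEmpty) Some(listeners.pop()) // each listener popped once
       else { listenerInvocationThread = None; None } // release ownership
     }
   
     def invokeListeners(): Unit = {
       val acquired = synchronized {
         if (listenerInvocationThread.isEmpty) {
           listenerInvocationThread = Some(Thread.currentThread())
           true
         } else false // another thread is already draining the stack
       }
       if (acquired) {
         var next: Option[T] = None
         while ({ next = getNextListenerOrDeregisterThread(); next.nonEmpty }) {
           callback(next.get) // runs exactly once per pushed listener
         }
       }
     }
   }
   ```
   
   Because a listener leaves the stack exactly once, and only under `synchronized`, no interleaving of registering and invoking threads can run the same listener twice.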
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] JoshRosen commented on a diff in pull request #37531: [SPARK-40106] Task failure should always trigger task failure listeners

Posted by GitBox <gi...@apache.org>.
JoshRosen commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r949338426


##########
core/src/main/scala/org/apache/spark/TaskContextImpl.scala:
##########
@@ -191,20 +191,69 @@ private[spark] class TaskContextImpl(
       }
     }
 
-    val errorMsgs = new ArrayBuffer[String](2)
+    val listenerExceptions = new ArrayBuffer[Throwable](2)
     var listenerOption: Option[T] = None
     while ({listenerOption = getNextListenerOrDeregisterThread(); listenerOption.nonEmpty}) {
       val listener = listenerOption.get
       try {
         callback(listener)
       } catch {
         case e: Throwable =>
-          errorMsgs += e.getMessage
+          // A listener failed. Temporarily clear the listenerInvocationThread and markTaskFailed.
+          //
+          // One of the following cases applies (#3 being the interesting one):
+          //
+          // 1. [[Task.doRunTask]] is currently calling [[markTaskFailed]] because the task body
+          //    failed, and now a failure listener has failed here (not necessarily the first to
+          //    fail). Then calling [[markTaskFailed]] again here is a no-op, and we simply resume
+          //    running the remaining failure listeners. [[Task.doRunTask]] will then call
+          //    [[markTaskCompleted]] after this method returns.
+          //
+          // 2. The task body failed, [[Task.doRunTask]] already called [[markTaskFailed]],
+          //    [[Task.doRunTask]] is currently calling [[markTaskCompleted]], and now a completion
+          //    listener has failed here (not necessarily the first one to fail). Then calling
+          //    [[markTaskFailed]] again here is a no-op, and we simply resume running the
+          //    remaining completion listeners.
+          //
+          // 3. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, and now a completion listener has failed here (the first one to
+          //    fail). Then our call to [[markTaskFailed]] here will run all failure listeners
+          //    before returning, after which we will resume running the remaining completion
+          //    listeners.
+          //
+          // 4. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, but [[markTaskFailed]] is currently running because a completion listener
+          //    has failed, and now a failure listener has failed (not necessarily the first one to
+          //    fail). Then calling [[markTaskFailed]] again here will have no effect, and we simply
+          //    resume running the remaining failure listeners; we will resume running the remaining
+          //    completion listeners after this call returns.
+          //
+          // 5. [[Task.doRunTask]] is currently calling [[markTaskCompleted]] because the task body
+          //    succeeded, [[markTaskFailed]] already ran because a completion listener previously
+          //    failed, and now another completion listener has failed. Then our call to
+          //    [[markTaskFailed]] here will have no effect and we simply resume running the
+          //    remaining completion listeners.
+          try {
+            listenerInvocationThread = None

Review Comment:
   To be more precise, it's **_duplicate_ invocations of the _same_ listener** that I am concerned about.
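   
   For concreteness, a sketch of the kind of regression check this concern points at (hedged: `sc` is assumed to be an existing local-mode SparkContext; this is not a test from the PR):
   
   ```scala
   import org.apache.spark.TaskContext
   
   sc.parallelize(Seq(1), 1).foreachPartition { _ =>
     val ctx = TaskContext.get()
     var completions = 0
     ctx.addTaskCompletionListener[Unit] { _ =>
       completions += 1
       // Would trip if the re-entrant markTaskFailed path ever replayed a
       // listener that had already been popped from the stack.
       assert(completions == 1, "same completion listener invoked twice")
       throw new RuntimeException("boom") // re-enters markTaskFailed mid-drain
     }
   }
   ```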



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org