Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/17 03:41:21 UTC

[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #37531: [WIP][SPARK-40106] Task failure should always trigger task failure listeners

ryan-johnson-databricks commented on code in PR #37531:
URL: https://github.com/apache/spark/pull/37531#discussion_r947422496


##########
core/src/main/scala/org/apache/spark/scheduler/Task.scala:
##########
@@ -136,44 +136,27 @@ private[spark] abstract class Task[T](
     plugins.foreach(_.onTaskStart())
 
     try {
-      runTask(context)
-    } catch {
-      case e: Throwable =>
-        // Catch all errors; run task failure callbacks, and rethrow the exception.
-        try {
-          context.markTaskFailed(e)
-        } catch {
-          case t: Throwable =>
-            e.addSuppressed(t)
-        }
-        context.markTaskCompleted(Some(e))
-        throw e
+      context.runTaskWithListeners(this)
     } finally {
       try {
-        // Call the task completion callbacks. If "markTaskCompleted" is called twice, the second
-        // one is no-op.
-        context.markTaskCompleted(None)
-      } finally {
-        try {
-          Utils.tryLogNonFatalError {
-            // Release memory used by this thread for unrolling blocks
-            SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
-            SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(
-              MemoryMode.OFF_HEAP)
-            // Notify any tasks waiting for execution memory to be freed to wake up and try to
-            // acquire memory again. This makes impossible the scenario where a task sleeps forever
-            // because there are no other tasks left to notify it. Since this is safe to do but may
-            // not be strictly necessary, we should revisit whether we can remove this in the
-            // future.
-            val memoryManager = SparkEnv.get.memoryManager
-            memoryManager.synchronized { memoryManager.notifyAll() }
-          }
-        } finally {
-          // Though we unset the ThreadLocal here, the context member variable itself is still
-          // queried directly in the TaskRunner to check for FetchFailedExceptions.
-          TaskContext.unset()
-          InputFileBlockHolder.unset()
+        Utils.tryLogNonFatalError {

Review Comment:
   The remainder of this hunk is an indentation-only change, best viewed with whitespace ignored:
   https://github.com/apache/spark/pull/37531/files?diff=split&w=1
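
   For context, the refactor replaces the inline try/catch in `Task.run` with a single call to `context.runTaskWithListeners(this)`. Below is a minimal sketch of what that method presumably encapsulates, reconstructed from the removed lines in the hunk above; the exact signature and whether it lives on `TaskContext` or `TaskContextImpl` are assumptions while this PR is still WIP:

   ```scala
   // Sketch only, reconstructed from the lines removed from Task.run above.
   // Hosting this on the task context means any Throwable raised while running
   // under the context triggers the task failure listeners, not just failures
   // that happen to cross the try/catch that used to live in Task.run.
   private[spark] def runTaskWithListeners[T](task: Task[T]): T = {
     try {
       task.runTask(this)
     } catch {
       case e: Throwable =>
         // Catch all errors; run task failure callbacks, then rethrow.
         try {
           markTaskFailed(e)
         } catch {
           case t: Throwable =>
             // A failing failure listener must not mask the task's own error.
             e.addSuppressed(t)
         }
         markTaskCompleted(Some(e))
         throw e
     } finally {
       // Call the task completion callbacks. If markTaskCompleted was already
       // invoked in the catch block above, this second call is a no-op.
       markTaskCompleted(None)
     }
   }
   ```

   Consolidating the listener invocation behind one method keeps `Task.run` down to the resource-cleanup `finally` block shown in the hunk, and gives any future caller the same always-fire-failure-listeners guarantee.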



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org