Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/08/16 02:18:36 UTC

[GitHub] [spark] wangshengjie123 opened a new pull request, #37528: [WIP][SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

wangshengjie123 opened a new pull request, #37528:
URL: https://github.com/apache/spark/pull/37528

   
   
   ### What changes were proposed in this pull request?
   If a task fails with NotSerializableException or TaskOutputFileAlreadyExistException, send a TaskEnd event.
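   A hedged sketch of the intent (toy names and simplified control flow, not the literal patch):
   
   ```scala
   import java.io.NotSerializableException
   
   // Toy model of the failure-handling path: even when the failure is fatal
   // to the whole task set, report the task as ended before aborting.
   object FailureHandlingSketch {
     def taskEnded(taskId: Long, reason: String): Unit =
       println(s"TaskEnd(task=$taskId, reason=$reason)")
   
     def abort(message: String): Unit =
       println(s"aborting task set: $message")
   
     def handleFailedTask(taskId: Long, t: Throwable): Unit = t match {
       case e: NotSerializableException =>
         taskEnded(taskId, s"not serializable: ${e.getMessage}") // the new TaskEnd
         abort("task result was not serializable")
       case e =>
         taskEnded(taskId, String.valueOf(e.getMessage)) // ordinary failures already report
     }
   
     def main(args: Array[String]): Unit =
       handleFailedTask(0L, new NotSerializableException("MyClass"))
   }
   ```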
   
   ### Why are the changes needed?
   In dynamic allocation, we depend on TaskEnd events to release executors once no tasks are running. If a TaskEnd event is missed, the executor is not released normally, which is especially harmful for long-running applications.
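   To make the failure mode concrete, here is a minimal, self-contained Scala sketch of the leak; `Tracker` and its methods are toy stand-ins for the allocation monitor, not Spark's actual classes:
   
   ```scala
   import scala.collection.mutable
   
   // Toy stand-in for the allocation monitor: counts running tasks per executor.
   final class Tracker {
     private val running = mutable.Map.empty[String, Int].withDefaultValue(0)
     def onTaskStart(exec: String): Unit = running(exec) += 1
     def onTaskEnd(exec: String): Unit = running(exec) -= 1
     def isExecutorIdle(exec: String): Boolean = running(exec) <= 0
   }
   
   object TaskEndLeakDemo extends App {
     val tracker = new Tracker
     tracker.onTaskStart("exec1")
   
     // Before this fix: a NotSerializableException aborted the task set without
     // a TaskEnd event, so the running count never dropped back to zero and
     // "exec1" looked busy forever, never reaching the idle timeout.
     // After this fix: the scheduler still reports the failed task as ended.
     tracker.onTaskEnd("exec1")
   
     assert(tracker.isExecutorIdle("exec1"), "exec1 should become reclaimable")
   }
   ```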
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   ### How was this patch tested?
   Unit tests added in `TaskSetManagerSuite`.




[GitHub] [spark] mridulm commented on pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #37528:
URL: https://github.com/apache/spark/pull/37528#issuecomment-1218868817

   Took a quick look - the basic idea looked good to me.
   
   Review note: other uses of `abort` are not susceptible to the same issue, because here the task has already been marked as finished - so as part of `killAllTaskAttempts` (triggered by `abort`), this task won't be cancelled or have a failure event fired for it.
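   To make the ordering concrete, here is a hedged, self-contained sketch; `TaskAttempt`, the shape of `killAllTaskAttempts`, and the event firing below are hypothetical, not Spark's actual implementation:
   
   ```scala
   // Attempts already marked finished are skipped by the kill loop, so no
   // second failure event fires for the task that triggered the abort.
   final case class TaskAttempt(id: Long, var finished: Boolean = false)
   
   object AbortSketch {
     def killAllTaskAttempts(attempts: Seq[TaskAttempt])(fire: TaskAttempt => Unit): Unit =
       attempts.filterNot(_.finished).foreach(fire)
   
     def main(args: Array[String]): Unit = {
       val alreadyFailed = TaskAttempt(1, finished = true) // marked finished before abort()
       val stillRunning  = TaskAttempt(2)
       killAllTaskAttempts(Seq(alreadyFailed, stillRunning)) { a =>
         println(s"failure event fired for task ${a.id}") // prints only for task 2
       }
     }
   }
   ```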




[GitHub] [spark] Ngone51 commented on pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
Ngone51 commented on PR #37528:
URL: https://github.com/apache/spark/pull/37528#issuecomment-1217492287

   cc @tgravescs @mridulm 




[GitHub] [spark] wangshengjie123 commented on a diff in pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
wangshengjie123 commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r952102558


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)
+
+    val executorTracker1 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec1".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker1.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    val executorTracker2 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec2".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker2.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+
+    // assert exec1 and exec2 is not idle
+    val method = classOf[ExecutorMonitor].getDeclaredMethod("isExecutorIdle",
+      classOf[String])

Review Comment:
   I changed the methods `isExecutorIdle` and `ensureExecutorIsTracked` in `ExecutorMonitor` and removed all the reflection.





[GitHub] [spark] mridulm commented on a diff in pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r953518157


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)

Review Comment:
   Thinking more, the complexity of doing this is not worth it.
   Let us keep the test as is.





[GitHub] [spark] wangshengjie123 commented on a diff in pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
wangshengjie123 commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r952066120


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)
+
+    val executorTracker1 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec1".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker1.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    val executorTracker2 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec2".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker2.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+
+    // assert exec1 and exec2 is not idle
+    val method = classOf[ExecutorMonitor].getDeclaredMethod("isExecutorIdle",
+      classOf[String])

Review Comment:
   Yeah, relaxing the visibility is helpful.





[GitHub] [spark] wangshengjie123 commented on a diff in pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
wangshengjie123 commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r952095530


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)

Review Comment:
   Actually, I wanted to call `onExecutorAdded` and `onTaskStart`, but `onTaskStart` needs the executor to be active, and `CoarseGrainedSchedulerBackend` needs the executor to register itself with the driver, which is hard to mock. So I just mimic the logic of `onTaskStart`.





[GitHub] [spark] AmplabJenkins commented on pull request #37528: [WIP][SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on PR #37528:
URL: https://github.com/apache/spark/pull/37528#issuecomment-1216314989

   Can one of the admins verify this patch?




[GitHub] [spark] wangshengjie123 commented on a diff in pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
wangshengjie123 commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r952062899


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2596,28 +2605,39 @@ class TaskSetManagerSuite
       .getDeclaredMethod("ensureExecutorIsTracked",
         classOf[String], classOf[Int])
     ensureExecutorIsTracked.setAccessible(true)
-    val executorTracker = ensureExecutorIsTracked.invoke(executorMonitor,
+
+    val executorTracker1 = ensureExecutorIsTracked.invoke(executorMonitor,
       "exec1".asInstanceOf[AnyRef],
       ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
-    executorTracker.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    executorTracker1.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    val executorTracker2 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec2".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker2.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
 
-    // assert exec1 is not idle
+    // assert exec1 and exec2 is not idle

Review Comment:
   Got it. Fixed in a later commit, thanks.





[GitHub] [spark] wangshengjie123 commented on pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
wangshengjie123 commented on PR #37528:
URL: https://github.com/apache/spark/pull/37528#issuecomment-1225586359

   > Can you update the description @wangshengjie123 ? You did add UT to test the behavior. I will merge once that is done. Thanks !
   
   Done, thanks again @mridulm @Ngone51 




[GitHub] [spark] mridulm closed pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
mridulm closed pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException
URL: https://github.com/apache/spark/pull/37528




[GitHub] [spark] wangshengjie123 commented on pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
wangshengjie123 commented on PR #37528:
URL: https://github.com/apache/spark/pull/37528#issuecomment-1219099925

   The UT has been added and passes. I also added a new test for NotSerializableException; please take a look @Ngone51 @mridulm, thanks!




[GitHub] [spark] mridulm commented on pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #37528:
URL: https://github.com/apache/spark/pull/37528#issuecomment-1225385341

   Can you update the description @wangshengjie123 ? You did add UT to test the behavior.
   I will merge once that is done. Thanks !




[GitHub] [spark] mridulm commented on pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
mridulm commented on PR #37528:
URL: https://github.com/apache/spark/pull/37528#issuecomment-1226003959

   Merged to master.
   Thanks for fixing this @wangshengjie123 !
   Thanks for the review @Ngone51 :-)




[GitHub] [spark] mridulm commented on a diff in pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r953403495


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)

Review Comment:
   QQ: You are right, `onTaskStart` is better than `onExecutorAdded` - in which case, why not simply call `onTaskStart` instead of `ensureExecutorIsTracked`?
   That way we are insulated from internal changes there.
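   For reference, the suggested call might look roughly like this (a sketch only: the `TaskInfo` constructor shown assumes the Spark master branch of that era, and its parameter list has changed across versions):
   
   ```scala
   import org.apache.spark.scheduler.{SparkListenerTaskStart, TaskInfo, TaskLocality}
   
   // Drive the monitor through its public listener callback instead of
   // reflecting on ensureExecutorIsTracked. Field values are illustrative.
   // Fields: taskId, index, attemptNumber, partitionId, launchTime,
   //         executorId, host, locality, speculative
   val taskInfo = new TaskInfo(0L, 0, 0, 0, 0L, "exec1", "host1", TaskLocality.ANY, false)
   executorMonitor.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo))
   ```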





[GitHub] [spark] mridulm commented on a diff in pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r951813638


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2596,28 +2605,39 @@ class TaskSetManagerSuite
       .getDeclaredMethod("ensureExecutorIsTracked",
         classOf[String], classOf[Int])
     ensureExecutorIsTracked.setAccessible(true)
-    val executorTracker = ensureExecutorIsTracked.invoke(executorMonitor,
+
+    val executorTracker1 = ensureExecutorIsTracked.invoke(executorMonitor,
       "exec1".asInstanceOf[AnyRef],
       ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
-    executorTracker.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    executorTracker1.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    val executorTracker2 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec2".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker2.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
 
-    // assert exec1 is not idle
+    // assert exec1 and exec2 is not idle

Review Comment:
   super nit: `is not` -> `are not`



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)
+
+    val executorTracker1 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec1".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker1.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    val executorTracker2 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec2".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker2.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+
+    // assert exec1 and exec2 is not idle
+    val method = classOf[ExecutorMonitor].getDeclaredMethod("isExecutorIdle",
+      classOf[String])

Review Comment:
   Relax the visibility of the method to the `scheduler` package and avoid reflection?
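   Concretely, that means Scala's package-qualified visibility; a minimal, self-contained illustration with made-up package and class names (not Spark's):
   
   ```scala
   // Widening `private` to `private[scheduler]` lets test code anywhere under
   // the scheduler package call the method directly, without reflection.
   package org.example.scheduler.dynalloc {
     class Monitor {
       private[scheduler] def isExecutorIdle(id: String): Boolean = true
     }
   }
   
   package org.example.scheduler {
     object MonitorSuite {
       def exercise(): Boolean = new dynalloc.Monitor().isExecutorIdle("exec1")
     }
   }
   ```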



##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)

Review Comment:
   Instead, do we want to call `onExecutorAdded`? That is the behavior we want to trigger, right?





[GitHub] [spark] mridulm commented on a diff in pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
mridulm commented on code in PR #37528:
URL: https://github.com/apache/spark/pull/37528#discussion_r951820444


##########
core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala:
##########
@@ -2563,6 +2565,81 @@ class TaskSetManagerSuite
     assert(!manager.isZombie)
   }
 
+  test("SPARK-40094: Send TaskEnd if task failed with " +
+    "NotSerializableException or TaskOutputFileAlreadyExistException") {
+    val sparkConf = new SparkConf()
+      .setMaster("local-cluster[1,1,1024]")
+      .setAppName("SPARK-40094")
+      .set(config.DYN_ALLOCATION_TESTING, true)
+      .set(TEST_DYNAMIC_ALLOCATION_SCHEDULE_ENABLED, false)
+      .set(config.DYN_ALLOCATION_ENABLED, true)
+      .set(config.DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT, 1L)
+
+    // setup spark context and init ExecutorAllocationManager
+    sc = new SparkContext(sparkConf)
+    sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    // replace dagScheduler to let handleFailedTask send TaskEnd
+    sched.dagScheduler = sc.dagScheduler
+
+    val taskSet1 = FakeTask.createTaskSet(1)
+    val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES)
+    val taskSet2 = FakeTask.createTaskSet(1)
+    val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES)
+    assert(sched.taskSetsFailed.isEmpty)
+
+
+    val offerResult1 = manager1.resourceOffer("exec1", "host1", ANY)._1
+    assert(offerResult1.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult1.get.index === 0)
+
+    val offerResult2 = manager2.resourceOffer("exec2", "host2", ANY)._1
+    assert(offerResult2.isDefined,
+      "Expect resource offer on iteration 0 to return a task")
+    assert(offerResult2.get.index === 0)
+
+    val executorMonitor = sc.executorAllocationManager.get.executorMonitor
+
+    // reflection to mock ExecutorMonitor.onTaskStart
+    val ensureExecutorIsTracked = classOf[ExecutorMonitor]
+      .getDeclaredMethod("ensureExecutorIsTracked",
+        classOf[String], classOf[Int])
+    ensureExecutorIsTracked.setAccessible(true)
+
+    val executorTracker1 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec1".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker1.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+    val executorTracker2 = ensureExecutorIsTracked.invoke(executorMonitor,
+      "exec2".asInstanceOf[AnyRef],
+      ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID.asInstanceOf[AnyRef])
+    executorTracker2.asInstanceOf[executorMonitor.Tracker].updateRunningTasks(1)
+
+    // assert exec1 and exec2 is not idle
+    val method = classOf[ExecutorMonitor].getDeclaredMethod("isExecutorIdle",
+      classOf[String])

Review Comment:
   Relax the visibility of the method to the `scheduler` package and avoid reflection? (This is a method used only by tests.)





[GitHub] [spark] wangshengjie123 commented on pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
wangshengjie123 commented on PR #37528:
URL: https://github.com/apache/spark/pull/37528#issuecomment-1217357504

   Hi @Ngone51, could you please take a look when you have time? Thanks!




[GitHub] [spark] wangshengjie123 commented on pull request #37528: [SPARK-40094][CORE] Send TaskEnd event when task failed with NotSerializableException or TaskOutputFileAlreadyExistException

Posted by GitBox <gi...@apache.org>.
wangshengjie123 commented on PR #37528:
URL: https://github.com/apache/spark/pull/37528#issuecomment-1217493598

   > Good catch! Could you add a test for the change?
   
   Sure, I will add a UT later. I have changed the title to [WIP].
   
   Thanks a lot @Ngone51  


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org