Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/05/16 08:57:44 UTC

[GitHub] [spark] AngersZhuuuu opened a new pull request, #36564: [WIP][SPARK-39195][SQL] Spark should use two step update of outputCommitCoordinator

AngersZhuuuu opened a new pull request, #36564:
URL: https://github.com/apache/spark/pull/36564

   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should help keep file consistent with task status.

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r883514715


##########
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala:
##########
@@ -200,6 +247,42 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
     }
   }
 
+  private[scheduler] def handleCommitOutputSuccess(
+      stage: Int,
+      stageAttempt: Int,
+      partition: Int,
+      attemptNumber: Int): Unit = synchronized {
+    stageStates.get(stage) match {
+      case Some(state) if attemptFailed(state, stageAttempt, partition, attemptNumber) =>
+        throw new SparkException(s"Authorized committer (attemptNumber=$attemptNumber, " +
+          s"stage=$stage, partition=$partition) failed; but task commit success, " +
+          s"should fail the job")

Review Comment:
   It seems that throwing an exception here won't stop the job. Any suggestions?





[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r893061431


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -187,12 +181,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should now be allowed to become the authorized committer
-    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,

Review Comment:
   We need this level of testing for the new behavior.
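To make the behavior under discussion concrete, here is a minimal Scala sketch of per-partition commit arbitration. `ToyCommitCoordinator`, `canCommit`, `taskFailed` and the `legacyReauthorize` flag are hypothetical names, not Spark's `OutputCommitCoordinator` API. The sketch only assumes the two policies described in this thread: the old one, where a failed authorized committer releases the lock so a new attempt may commit, and the proposed one, where the partition is poisoned and the stage has to be aborted because the failed attempt may already have committed some output.

```scala
import scala.collection.mutable

/**
 * Hypothetical, heavily simplified model of per-partition commit arbitration.
 * It is NOT Spark's OutputCommitCoordinator; names and signatures are illustrative.
 *
 * @param legacyReauthorize if true, a failed authorized committer releases the lock
 *                          (old behavior); if false, the partition is poisoned and
 *                          the stage must be aborted (behavior proposed in this PR).
 */
final class ToyCommitCoordinator(legacyReauthorize: Boolean) {
  // partition -> attempt number that currently holds the commit lock
  private val authorized = mutable.Map.empty[Int, Int]
  // partitions whose authorized committer failed after being authorized
  private val poisoned = mutable.Set.empty[Int]

  /** Returns true if `attempt` is allowed to commit `partition`. */
  def canCommit(partition: Int, attempt: Int): Boolean = synchronized {
    if (poisoned.contains(partition)) false // output state unknown: nobody may commit
    else authorized.get(partition) match {
      case Some(_) => false // another attempt already holds the lock
      case None =>
        authorized(partition) = attempt // first asker wins
        true
    }
  }

  /** Records a failed attempt; returns true if the stage should now be aborted. */
  def taskFailed(partition: Int, attempt: Int): Boolean = synchronized {
    if (authorized.get(partition).contains(attempt)) {
      authorized -= partition // release the lock
      if (!legacyReauthorize) poisoned += partition // but forbid any further commit
    }
    poisoned.contains(partition)
  }
}
```

For example, with `legacyReauthorize = false`, after attempt 1 is authorized for partition 0 and then fails, `taskFailed(0, 1)` returns `true` and `canCommit(0, 3)` returns `false`; with `legacyReauthorize = true`, the same `canCommit(0, 3)` call returns `true`.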





[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r894632434


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala:
##########
@@ -46,21 +45,21 @@ class OutputCommitCoordinatorIntegrationSuite
   }
 
   test("exception thrown in OutputCommitter.commitTask()") {
-    // Regression test for SPARK-10381

Review Comment:
   we should still keep this comment





[GitHub] [spark] AngersZhuuuu commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1151945533

   The doc build failure seems unrelated to this PR. Any more suggestions?




[GitHub] [spark] AngersZhuuuu commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1154633468

   > LGTM if tests pass
   
   The GA failure is not related to this PR:
   
   ```
   __w/spark/spark/docs/_plugins/copy_api_dirs.rb:130:in `<top (required)>': Python doc generation failed (RuntimeError)
   2022-06-11T07:16:53.8252035Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/external.rb:60:in `require'
   2022-06-11T07:16:53.8252686Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/external.rb:60:in `block in require_with_graceful_fail'
   2022-06-11T07:16:53.8253306Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/external.rb:57:in `each'
   2022-06-11T07:16:53.8253947Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/external.rb:57:in `require_with_graceful_fail'
   2022-06-11T07:16:53.8254613Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/plugin_manager.rb:89:in `block in require_plugin_files'
   2022-06-11T07:16:53.8255235Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/plugin_manager.rb:87:in `each'
   2022-06-11T07:16:53.8255870Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/plugin_manager.rb:87:in `require_plugin_files'
   2022-06-11T07:16:53.8256520Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/plugin_manager.rb:21:in `conscientious_require'
   2022-06-11T07:16:53.8257116Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/site.rb:131:in `setup'
   2022-06-11T07:16:53.8257695Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/site.rb:36:in `initialize'
   2022-06-11T07:16:53.8258278Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/commands/build.rb:30:in `new'
   2022-06-11T07:16:53.8258888Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/commands/build.rb:30:in `process'
   2022-06-11T07:16:53.8259731Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/command.rb:91:in `block in process_with_graceful_fail'
   2022-06-11T07:16:53.8260335Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/command.rb:91:in `each'
   2022-06-11T07:16:53.8260952Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/command.rb:91:in `process_with_graceful_fail'
   2022-06-11T07:16:53.8261625Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/lib/jekyll/commands/build.rb:18:in `block (2 levels) in init_with_program'
   2022-06-11T07:16:53.8262274Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary/command.rb:221:in `block in execute'
   2022-06-11T07:16:53.8262921Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary/command.rb:221:in `each'
   2022-06-11T07:16:53.8263524Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary/command.rb:221:in `execute'
   2022-06-11T07:16:53.8264134Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary/program.rb:44:in `go'
   2022-06-11T07:16:53.8264718Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/mercenary-0.4.0/lib/mercenary.rb:21:in `program'
   2022-06-11T07:16:53.8265284Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/gems/jekyll-4.2.1/exe/jekyll:15:in `<top (required)>'
   2022-06-11T07:16:53.8265698Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/bin/jekyll:23:in `load'
   2022-06-11T07:16:53.8266159Z 	from /__w/spark/spark/docs/.local_ruby_bundle/ruby/2.7.0/bin/jekyll:23:in `<main>'
   ```




Re: [PR] [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status [spark]

Posted by "huaxingao (via GitHub)" <gi...@apache.org>.
huaxingao commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1776677664

   @cloud-fan Thanks for pinging me. It appears that Iceberg doesn't override this `useCommitCoordinator`. I will take a look at this issue. 




[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r893038044


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -187,8 +188,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should now be allowed to become the authorized committer
-    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,
+    // A new task should not be allowed to become stage failed because of may cause data duplication

Review Comment:
   ```suggestion
       // A new task should not be allowed to become stage failed because of potential data duplication
   ```



##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -235,7 +236,8 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))
+    // A new task should not be allowed to become stage failed because of may cause data duplication

Review Comment:
   ditto





[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r895826751


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -270,6 +263,16 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
       .stageStart(meq(retriedStage.head), any())
     verify(sc.env.outputCommitCoordinator).stageEnd(meq(retriedStage.head))
   }
+
+  test("SPARK-39195: Spark OutputCommitCoordinator should abort stage " +

Review Comment:
   is this test the same as https://github.com/apache/spark/pull/36564/files#diff-d5d8811dcf8af13683cb2bac4a583577cb2accca0cd64182cf214bbd174b017cR47 ?





[GitHub] [spark] AngersZhuuuu commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should help keep file consistent with task status.

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1135711235

   Gentle ping @dongjoon-hyun @cloud-fan @HyukjinKwon @srowen. Could you take a look at this data correctness issue?




Re: [PR] [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1774686626

   The new data lake formats all use transaction logs, so I don't think the coordinator is needed anymore.
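A rough illustration of why a transaction log makes the coordinator unnecessary: task attempts may write files freely, but readers only see files recorded by the driver's single log append, and the driver keeps one commit message per partition. This is a simplified toy model under those assumptions (`ToyLogTable` and `CommitMessage` are hypothetical names), not how Iceberg or Delta Lake actually implement commits.

```scala
import scala.collection.mutable

// Hypothetical message a task attempt sends to the driver after writing its files.
final case class CommitMessage(partition: Int, attempt: Int, files: Seq[String])

/**
 * Toy table whose visible contents are defined only by a log of committed snapshots.
 * Files that a task attempt writes but that never appear in the log stay invisible.
 */
final class ToyLogTable {
  private val log = mutable.ArrayBuffer.empty[Seq[String]]

  /** Files readers can see: everything referenced by committed log entries. */
  def visibleFiles: Seq[String] = log.flatten.toSeq

  /**
   * Driver-side job commit. Assumes at most one message per partition is kept
   * (here: the first one received); files from other attempts are orphaned,
   * not published.
   */
  def commitJob(messages: Seq[CommitMessage]): Unit = {
    val onePerPartition = messages.groupBy(_.partition).values.map(_.head)
    val files = onePerPartition.toSeq.sortBy(_.partition).flatMap(_.files)
    log += files // one append = one atomic "transaction" in this toy model
  }
}
```

If a speculative attempt for partition 0 also reports success, its file (say `p0-a2.parquet`) is never referenced by the log, so it is orphaned rather than duplicated. Output that becomes visible as soon as a task commits has no such protection, which is the kind of case the coordinator guards against.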




[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r892518640


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2588,6 +2588,14 @@ private[spark] class DAGScheduler(
     runningStages -= stage
   }
 
+  private[scheduler] def abortStage(stageId: Int, reason: String): Unit = {
+    if (!stageIdToStage.contains(stageId)) {
+      // Skip all the actions if the stage has been removed.
+      return
+    }
+    abortStage(stageIdToStage(stageId), reason, None)

Review Comment:
   This is not safe. I think we should post an event to the DAG scheduler event loop to abort a stage. See also `DAGScheduler.cancelStage`.
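The suggestion follows a common pattern: code running outside the scheduler thread (such as the commit coordinator's handling of task results) never touches scheduler state directly; it only enqueues an event, and a single event thread applies it, the way `DAGScheduler.cancelStage` posts an event instead of cancelling inline. Below is a minimal sketch of that pattern, assuming hypothetical names (`ToyEventLoopScheduler`, `ToyAbortStage` and the other `Toy*` events) rather than Spark's actual event classes.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

// Hypothetical events; Spark's real scheduler events are different classes.
sealed trait ToyEvent
final case class ToyStageSubmitted(stageId: Int) extends ToyEvent
final case class ToyAbortStage(stageId: Int, reason: String) extends ToyEvent
case object ToyStop extends ToyEvent

/** All scheduler state is owned by one event thread; other threads only post events. */
final class ToyEventLoopScheduler {
  private val queue = new LinkedBlockingQueue[ToyEvent]()
  private val runningStages = mutable.Set.empty[Int] // touched only by eventThread
  private val aborted = mutable.ListBuffer.empty[(Int, String)]

  private val eventThread = new Thread(new Runnable {
    override def run(): Unit = {
      var running = true
      while (running) {
        queue.take() match {
          case ToyStageSubmitted(id) => runningStages += id
          case ToyAbortStage(id, reason) =>
            // Same guard as in the diff: skip if the stage is no longer known.
            if (runningStages.remove(id)) aborted += ((id, reason))
          case ToyStop => running = false
        }
      }
    }
  }, "toy-dag-event-loop")
  eventThread.setDaemon(true)
  eventThread.start()

  /** Safe to call from any thread; it only enqueues. */
  def post(event: ToyEvent): Unit = queue.put(event)

  /** Stops the loop; join() makes the event thread's writes visible to the caller. */
  def stopAndCollectAborts(): List[(Int, String)] = {
    post(ToyStop)
    eventThread.join()
    aborted.toList
  }
}
```

Because `runningStages` is only read and written on the event thread, the "skip if the stage has been removed" check and the abort itself cannot race with other scheduler updates, which is the property a direct `abortStage(...)` call from another thread would lose.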





[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r896311302


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -270,6 +263,16 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
       .stageStart(meq(retriedStage.head), any())
     verify(sc.env.outputCommitCoordinator).stageEnd(meq(retriedStage.head))
   }
+
+  test("SPARK-39195: Spark OutputCommitCoordinator should abort stage " +

Review Comment:
   Removed





[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r897951308


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -143,13 +144,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(tempDir.list().size === 1)
   }
 
-  test("If commit fails, if task is retried it should not be locked, and will succeed.") {

Review Comment:
   SGTM





[GitHub] [spark] HyukjinKwon commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1161546411

   I see the same test failed above https://github.com/AngersZhuuuu/spark/runs/6893999574?check_suite_focus=true#step:9:15747




[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r894635076


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -235,7 +226,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))

Review Comment:
   we need to update the code comment before this code chunk.





[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r899701507


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -235,7 +226,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))

Review Comment:
   Updated





[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r899208451


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -235,7 +226,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))

Review Comment:
   It has not been updated yet
   ```
       // Commit the 1st attempt, fail the 2nd attempt, make sure 3rd attempt cannot commit,
       // then fail the 1st attempt and make sure the 4th one can commit again.
   ```





Re: [PR] [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status [spark]

Posted by "mstebelev (via GitHub)" <gi...@apache.org>.
mstebelev commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1774585834

   🤔 So the option `useCommitCoordinator` is for appending to the same destination, like inserting rows into a remote database, and we don't need it in Iceberg?




Re: [PR] [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status [spark]

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1786467568

   Thanks for confirming, @cloud-fan!




[GitHub] [spark] AngersZhuuuu commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1161591820

   > @AngersZhuuuu Mind taking a look please?
   
   Sure




[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r894985147


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -289,6 +289,7 @@ private case class OutputCommitFunctions(tempDirPath: String) {
   // Mock output committer that simulates a failed commit (after commit is authorized)
   private def failingOutputCommitter = new FakeOutputCommitter {
     override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = {
+      super.commitTask(taskAttemptContext)

Review Comment:
   Reverted this change; it is not needed.



##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala:
##########
@@ -46,21 +45,21 @@ class OutputCommitCoordinatorIntegrationSuite
   }
 
   test("exception thrown in OutputCommitter.commitTask()") {
-    // Regression test for SPARK-10381
-    failAfter(Span(60, Seconds)) {
+    val e = intercept[SparkException] {
       withTempDir { tempDir =>
         sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
       }
-    }
+    }.getCause.getMessage
+    assert(e.endsWith("failed; but task commit success, data duplication may happen."))
   }
 }
 
 private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
   override def commitTask(context: TaskAttemptContext): Unit = {
     val ctx = TaskContext.get()
+    super.commitTask(context)

Review Comment:
   Reverted now.




[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r894985431


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -235,7 +226,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(!outputCommitCoordinator.canCommit(stage, 3, partition, taskAttempt))
     outputCommitCoordinator.taskCompleted(stage, 1, partition, taskAttempt,
       ExecutorLostFailure("0", exitCausedByApp = true, None))
-    assert(outputCommitCoordinator.canCommit(stage, 4, partition, taskAttempt))

Review Comment:
   How about the current version?




[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r894636200


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -289,6 +289,7 @@ private case class OutputCommitFunctions(tempDirPath: String) {
   // Mock output committer that simulates a failed commit (after commit is authorized)
   private def failingOutputCommitter = new FakeOutputCommitter {
     override def commitTask(taskAttemptContext: TaskAttemptContext): Unit = {
+      super.commitTask(taskAttemptContext)

Review Comment:
   I'm a bit worried about changing the behavior for the existing testing output committer. Can we create a new one for the new test case?




Re: [PR] [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status [spark]

Posted by "aokolnychyi (via GitHub)" <gi...@apache.org>.
aokolnychyi commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1786251500

   @huaxingao @cloud-fan, could you confirm only a single `WriterCommitMessage` will be passed in case of speculative execution even without the commit coordinator? Based on what I see in `V2TableWriteExec`, if multiple task attempts succeed, the last one will win by replacing the commit message from the first one. Am I right?



Re: [PR] [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1786265601

   @aokolnychyi In most cases, yes. However, we have `BatchWrite#onDataWriterCommit`, so implementations can decide how to deal with conflicting commit messages, for example first-wins.
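To make the two outcomes concrete, here is a minimal Scala sketch of a driver-side collector that keeps one commit message per partition under either a last-wins policy (the behavior described for `V2TableWriteExec` above) or a first-wins policy (one option an implementation could pick in `BatchWrite#onDataWriterCommit`). `CommitMessage`, `CommitMessageCollector`, `ConflictPolicy` and `record` are hypothetical names; this is not Spark's code.

```scala
// Hypothetical model of per-partition commit messages; not Spark's V2 write path.
final case class CommitMessage(partition: Int, attempt: Int, files: Seq[String])

sealed trait ConflictPolicy
case object LastWins extends ConflictPolicy
case object FirstWins extends ConflictPolicy

final class CommitMessageCollector(numPartitions: Int, policy: ConflictPolicy) {
  // One slot per partition; None until some attempt reports a commit message.
  private val messages = Array.fill[Option[CommitMessage]](numPartitions)(None)

  // Called once per successful task attempt, in arrival order.
  def record(msg: CommitMessage): Unit = synchronized {
    policy match {
      case LastWins =>
        // A later (e.g. speculative) attempt replaces the earlier message.
        messages(msg.partition) = Some(msg)
      case FirstWins =>
        // Keep the first message; later ones for the same partition are ignored.
        if (messages(msg.partition).isEmpty) messages(msg.partition) = Some(msg)
    }
  }

  def collected: List[Option[CommitMessage]] = synchronized(messages.toList)
}
```

In both policies exactly one message per partition survives; the difference is only which attempt's files the final commit would see.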



[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r893074610


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -187,12 +181,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should now be allowed to become the authorized committer
-    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,

Review Comment:
   > We need this level of this for the new behavior
   
   How about the current version?




[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r894633551


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala:
##########
@@ -46,21 +45,21 @@ class OutputCommitCoordinatorIntegrationSuite
   }
 
   test("exception thrown in OutputCommitter.commitTask()") {
-    // Regression test for SPARK-10381
-    failAfter(Span(60, Seconds)) {
+    val e = intercept[SparkException] {
       withTempDir { tempDir =>
         sc.parallelize(1 to 4, 2).map(_.toString).saveAsTextFile(tempDir.getAbsolutePath + "/out")
       }
-    }
+    }.getCause.getMessage
+    assert(e.endsWith("failed; but task commit success, data duplication may happen."))
   }
 }
 
 private class ThrowExceptionOnFirstAttemptOutputCommitter extends FileOutputCommitter {
   override def commitTask(context: TaskAttemptContext): Unit = {
     val ctx = TaskContext.get()
+    super.commitTask(context)

Review Comment:
   Why this change? I think the original test was meant to cover the case where the first task attempt fails before committing.
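A toy Scala model of why the failure point matters, assuming no commit coordination at all (every retry is allowed to commit): if the first attempt fails before it commits, the retry produces a single output file; if it fails after committing, the retry adds a second file and the data is duplicated. `FailurePoint`, `RetrySimulation` and the file names are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical: where the first attempt fails relative to its own commit.
sealed trait FailurePoint
case object BeforeCommit extends FailurePoint
case object AfterCommit extends FailurePoint

object RetrySimulation {
  // Runs attempts for one partition with no coordinator; returns the files
  // that ended up visible in the output directory.
  def run(firstAttemptFails: FailurePoint, maxAttempts: Int = 2): List[String] = {
    val committed = mutable.ListBuffer.empty[String]
    var attempt = 0
    var succeeded = false
    while (!succeeded && attempt < maxAttempts) {
      try {
        if (attempt == 0 && firstAttemptFails == BeforeCommit) {
          throw new RuntimeException("attempt 0 failed before committing")
        }
        // The "commit": this attempt's file becomes visible.
        committed += s"part-00000-attempt-$attempt"
        if (attempt == 0 && firstAttemptFails == AfterCommit) {
          throw new RuntimeException("attempt 0 failed after committing")
        }
        succeeded = true
      } catch {
        case _: RuntimeException => attempt += 1 // retry the partition
      }
    }
    committed.toList
  }
}
```

Only the after-commit case leaves two files, which is the situation the new coordinator behavior is meant to catch.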




[GitHub] [spark] AngersZhuuuu commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1154805408

   The latest GA (GitHub Actions) failure also seems unrelated.



[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r897024156


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala:
##########
@@ -47,11 +46,12 @@ class OutputCommitCoordinatorIntegrationSuite
 
   test("exception thrown in OutputCommitter.commitTask()") {

Review Comment:
   > since we changed this test, let's put the JIRA ID in the test name
   
   Done




[GitHub] [spark] AngersZhuuuu commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1161382988

   ping @cloud-fan 



[GitHub] [spark] AngersZhuuuu commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should help keep file consistent with task status.

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1141543181

   ping @cloud-fan Could you take a look?



[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r892511172


##########
core/src/main/scala/org/apache/spark/mapred/SparkHadoopMapRedUtil.scala:
##########
@@ -76,6 +76,8 @@ object SparkHadoopMapRedUtil extends Logging {
 
         if (canCommit) {
           performCommit()
+          outputCommitCoordinator.commitSuccess(ctx.stageId(),

Review Comment:
   It's possible that the executor gets killed right after `performCommit()` and we still have the data duplication issue. I don't think we need this `commit success` event. We should mark the partition as committed once `canCommit` returns true.
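A minimal Scala sketch of that suggestion, as a hypothetical partition-only coordinator (Spark's `OutputCommitCoordinator` also tracks stages and stage attempts, and this is not its code): the partition is recorded as committed at the moment permission is granted, so no separate commit-success message from the executor is needed.

```scala
import scala.collection.mutable

// Hypothetical, simplified coordinator keyed by partition only.
final class SimpleCommitCoordinator {
  // partition -> attempt that was authorized. Once set, it is treated as
  // committed and is never cleared.
  private val committedBy = mutable.Map.empty[Int, Int]

  def canCommit(partition: Int, attemptNumber: Int): Boolean = synchronized {
    if (committedBy.contains(partition)) {
      false // another attempt already holds (and may already have used) the permission
    } else {
      committedBy(partition) = attemptNumber // mark committed at authorization time
      true
    }
  }

  def committer(partition: Int): Option[Int] = synchronized(committedBy.get(partition))
}
```

Because the state changes inside `canCommit`, an executor dying between `performCommit()` and any follow-up message cannot leave the driver believing the partition is still free.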




[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r892558108


##########
core/src/main/scala/org/apache/spark/scheduler/OutputCommitCoordinator.scala:
##########
@@ -155,9 +159,9 @@ private[spark] class OutputCommitCoordinator(conf: SparkConf, isDriver: Boolean)
         val taskId = TaskIdentifier(stageAttempt, attemptNumber)
         stageState.failures.getOrElseUpdate(partition, mutable.Set()) += taskId
         if (stageState.authorizedCommitters(partition) == taskId) {
-          logDebug(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
-            s"partition=$partition) failed; clearing lock")
-          stageState.authorizedCommitters(partition) = null
+          sc.foreach(_.dagScheduler.abortStage(stage, s"Authorized committer " +
+            s"(attemptNumber=$attemptNumber, stage=$stage, partition=$partition) failed; " +
+            s"but task commit success, should fail the job"))

Review Comment:
   ```suggestion
               s"but task commit success, data duplication may happen."))
   ```
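A hedged Scala sketch contrasting the old and new handling of a failed authorized committer. `FailureHandlingSketch` and `FailureAction` are hypothetical; the real change calls `dagScheduler.abortStage` rather than returning a value. With `abortOnAuthorizedFailure = false` the lock is cleared so another attempt can commit (the removed behavior); with `true` the stage is aborted using the suggested message.

```scala
import scala.collection.mutable

// Hypothetical outcome of a failed task attempt, as seen by the coordinator.
sealed trait FailureAction
case object Ignore extends FailureAction
case object ClearLock extends FailureAction
final case class AbortStage(reason: String) extends FailureAction

final class FailureHandlingSketch(abortOnAuthorizedFailure: Boolean) {
  // partition -> authorized attempt number
  val authorized: mutable.Map[Int, Int] = mutable.Map.empty

  def onTaskFailed(stage: Int, partition: Int, attemptNumber: Int): FailureAction =
    if (!authorized.get(partition).contains(attemptNumber)) {
      Ignore // not the authorized committer: nothing was committed by it
    } else if (abortOnAuthorizedFailure) {
      // New behavior: the attempt may already have committed, so fail the stage.
      AbortStage(s"Authorized committer (attemptNumber=$attemptNumber, stage=$stage, " +
        s"partition=$partition) failed; but task commit success, data duplication may happen.")
    } else {
      // Old behavior: release the lock so a retry can become the committer.
      authorized -= partition
      ClearLock
    }
}
```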




[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r893072393


##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -461,7 +467,8 @@ class SparkContext(config: SparkConf) extends Logging {
     listenerBus.addToStatusQueue(_statusStore.listener.get)
 
     // Create the Spark execution environment (cache, map output tracker, etc)
-    _env = createSparkEnv(_conf, isLocal, listenerBus)
+    _env = createSparkEnv(_conf, isLocal, listenerBus,
+        new OutputCommitCoordinator(conf, true, Option(this)))

Review Comment:
   Done




[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r894985125


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala:
##########
@@ -46,21 +45,21 @@ class OutputCommitCoordinatorIntegrationSuite
   }
 
   test("exception thrown in OutputCommitter.commitTask()") {
-    // Regression test for SPARK-10381

Review Comment:
   Added it back.




[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r897011726


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorIntegrationSuite.scala:
##########
@@ -47,11 +46,12 @@ class OutputCommitCoordinatorIntegrationSuite
 
   test("exception thrown in OutputCommitter.commitTask()") {

Review Comment:
   Since we changed this test, let's put the JIRA ID in the test name.




[GitHub] [spark] HyukjinKwon commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1161545527

   Hm, the test here seems flaky:
   
   ```
   [info] OutputCommitCoordinatorSuite:
   [info] - Only one of two duplicate commit tasks should commit (80 milliseconds)
   [info] - If commit fails, if task is retried it should not be locked, and will succeed. *** FAILED *** (69 milliseconds)
   [info]   org.apache.spark.SparkException: Job aborted due to stage failure: Authorized committer (attemptNumber=0, stage=0, partition=0) failed; but task commit success, data duplication may happen.
   [info]   at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2706)
   [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2642)
   [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2641)
   [info]   at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
   [info]   at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
   [info]   at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
   [info]   at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2641)
   [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleStageFailed$1(DAGScheduler.scala:1182)
   [info]   at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleStageFailed$1$adapted(DAGScheduler.scala:1182)
   [info]   at scala.Option.foreach(Option.scala:407)
   [info]   at org.apache.spark.scheduler.DAGScheduler.handleStageFailed(DAGScheduler.scala:1182)
   [info]   at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2894)
   ```



[GitHub] [spark] HyukjinKwon commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1161546724

   @AngersZhuuuu Mind taking a look please?



[GitHub] [spark] cloud-fan closed pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status
URL: https://github.com/apache/spark/pull/36564



[GitHub] [spark] cloud-fan commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1161437174

   thanks, merging to master!



[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r893037549


##########
core/src/main/scala/org/apache/spark/SparkEnv.scala:
##########
@@ -423,6 +432,7 @@ object SparkEnv extends Logging {
 
     envInstance
   }
+  // scalastyle:on argcount

Review Comment:
   This can be placed right after the parameter list.
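For illustration, the suggested placement looks roughly like this: `// scalastyle:off argcount` before the method and `// scalastyle:on argcount` as the first line after the parameter list, so the suppression covers only the signature. `ArgCountSketch.sumAll` is a hypothetical method; the comments affect only Scalastyle, not compilation.

```scala
object ArgCountSketch {
  // scalastyle:off argcount
  def sumAll(
      a: Int, b: Int, c: Int, d: Int, e: Int, f: Int,
      g: Int, h: Int, i: Int, j: Int, k: Int): Int = {
    // scalastyle:on argcount
    // The check is re-enabled here, so the method body is still linted normally.
    a + b + c + d + e + f + g + h + i + j + k
  }
}
```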




[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r897810583


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -143,13 +144,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(tempDir.list().size === 1)
   }
 
-  test("If commit fails, if task is retried it should not be locked, and will succeed.") {

Review Comment:
   @cloud-fan This behavior changed too, so maybe we should remove this?




[GitHub] [spark] HyukjinKwon commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
HyukjinKwon commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1174576513

   @AngersZhuuuu mind taking a look at https://issues.apache.org/jira/browse/SPARK-39622? I don't think there are any other committer-related changes except this one. cc @HeartSaVioR FYI


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AngersZhuuuu commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r893017815


##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -461,7 +467,8 @@ class SparkContext(config: SparkConf) extends Logging {
     listenerBus.addToStatusQueue(_statusStore.listener.get)
 
     // Create the Spark execution environment (cache, map output tracker, etc)
-    _env = createSparkEnv(_conf, isLocal, listenerBus)
+    _env = createSparkEnv(_conf, isLocal, listenerBus,
+        new OutputCommitCoordinator(conf, true, Option(this)))

Review Comment:
   > this looks weird, I think we just need to pass `this` to `createSparkEnv`, which creates driver-side `SparkEnv` that contains `OutputCommitCoordinator`
   
   How about the current version?




[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r892521784


##########
core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala:
##########
@@ -2588,6 +2588,14 @@ private[spark] class DAGScheduler(
     runningStages -= stage
   }
 
+  private[scheduler] def abortStage(stageId: Int, reason: String): Unit = {
+    if (!stageIdToStage.contains(stageId)) {
+      // Skip all the actions if the stage has been removed.
+      return
+    }
+    abortStage(stageIdToStage(stageId), reason, None)

Review Comment:
   Actually, we can generalize `taskSetFailed` a little, since all it does is abort a stage.
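
A minimal sketch of the guarded helper and of the reviewer's suggestion, written in Java with a hypothetical `StageRegistrySketch` standing in for the scheduler's `stageIdToStage` bookkeeping (it is not Spark's `DAGScheduler`). `abortStage` skips stages that have already been removed, and the hypothetical `taskSetFailed` simply delegates to it, which is the kind of reuse the comment suggests.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the scheduler's stage bookkeeping; not Spark's DAGScheduler.
class StageRegistrySketch {
    // Plays the role of stageIdToStage: stage id -> stage description.
    private final Map<Integer, String> stageIdToStage = new HashMap<>();
    // Reason recorded for each aborted stage, so the effect can be observed.
    private final Map<Integer, String> abortReasons = new HashMap<>();

    void registerStage(int stageId, String description) {
        stageIdToStage.put(stageId, description);
    }

    // Same shape as the proposed abortStage(stageId, reason) helper.
    // Returns false and does nothing if the stage has already been removed.
    boolean abortStage(int stageId, String reason) {
        if (!stageIdToStage.containsKey(stageId)) {
            // Skip all the actions if the stage has been removed.
            return false;
        }
        abortReasons.put(stageId, reason);
        // An aborted stage is no longer tracked.
        stageIdToStage.remove(stageId);
        return true;
    }

    // Hypothetical generalized failure hook: a failed task set just aborts its stage.
    boolean taskSetFailed(int stageId, String reason) {
        return abortStage(stageId, reason);
    }

    String abortReason(int stageId) {
        return abortReasons.get(stageId);
    }
}
```

Because both entry points go through the same guard, a late failure report for a stage that was already cleaned up becomes a no-op instead of a lookup error.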



[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r892566204


##########
core/src/main/scala/org/apache/spark/SparkContext.scala:
##########
@@ -461,7 +467,8 @@ class SparkContext(config: SparkConf) extends Logging {
     listenerBus.addToStatusQueue(_statusStore.listener.get)
 
     // Create the Spark execution environment (cache, map output tracker, etc)
-    _env = createSparkEnv(_conf, isLocal, listenerBus)
+    _env = createSparkEnv(_conf, isLocal, listenerBus,
+        new OutputCommitCoordinator(conf, true, Option(this)))

Review Comment:
   This looks weird. I think we just need to pass `this` to `createSparkEnv`, which creates the driver-side `SparkEnv` that contains the `OutputCommitCoordinator`.
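
A rough sketch of the two wirings under discussion, using hypothetical Java types (`StageAborter`, `CoordinatorSketch`, `EnvSketch`) rather than Spark's real `SparkContext`, `SparkEnv`, and `OutputCommitCoordinator`. In the diff, the caller builds the coordinator itself and passes it in. In the suggested shape, the caller passes only itself, and the driver-side environment factory builds the coordinator with a back-reference, while executors get a coordinator without one.

```java
import java.util.Optional;

// Hypothetical callback the driver exposes so the coordinator can abort a stage.
interface StageAborter {
    void abortStage(int stageId, String reason);
}

// Hypothetical coordinator: holds a back-reference to the driver only on the driver.
class CoordinatorSketch {
    final boolean isDriver;
    private final Optional<StageAborter> driverContext;

    CoordinatorSketch(boolean isDriver, Optional<StageAborter> driverContext) {
        this.isDriver = isDriver;
        this.driverContext = driverContext;
    }

    // Asks the driver to abort the stage; returns false when no back-reference exists.
    boolean requestAbort(int stageId, String reason) {
        driverContext.ifPresent(ctx -> ctx.abortStage(stageId, reason));
        return driverContext.isPresent();
    }
}

// Hypothetical environment factory standing in for the createSparkEnv path.
class EnvSketch {
    final CoordinatorSketch coordinator;

    private EnvSketch(CoordinatorSketch coordinator) {
        this.coordinator = coordinator;
    }

    // Suggested shape: the caller passes only itself; the factory builds the coordinator.
    static EnvSketch createDriverEnv(StageAborter context) {
        return new EnvSketch(new CoordinatorSketch(true, Optional.of(context)));
    }

    // Executors never receive a driver back-reference.
    static EnvSketch createExecutorEnv() {
        return new EnvSketch(new CoordinatorSketch(false, Optional.empty()));
    }
}
```

Keeping construction inside the factory means the call site only hands over `this`, and the executor path has no way to be given a driver reference by mistake.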



[GitHub] [spark] cloud-fan commented on a diff in pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on code in PR #36564:
URL: https://github.com/apache/spark/pull/36564#discussion_r893061431


##########
core/src/test/scala/org/apache/spark/scheduler/OutputCommitCoordinatorSuite.scala:
##########
@@ -187,12 +181,6 @@ class OutputCommitCoordinatorSuite extends SparkFunSuite with BeforeAndAfter {
     // The authorized committer now fails, clearing the lock
     outputCommitCoordinator.taskCompleted(stage, stageAttempt, partition,
       attemptNumber = authorizedCommitter, reason = TaskKilled("test"))
-    // A new task should now be allowed to become the authorized committer
-    assert(outputCommitCoordinator.canCommit(stage, stageAttempt, partition,

Review Comment:
   We need this level of testing for the new behavior.
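
A simplified Java model of the behavior the updated test would need to pin down, inferred from the removed assertion and from the later report in this thread that a failed task stays the assigned committer: once the authorized attempt for a partition fails, the lock is not released and the stage is marked for abort, so later attempts are denied. `CommitCoordinatorSketch` and its methods are hypothetical and track a single stage only.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-stage model of the new commit-coordinator behavior.
class CommitCoordinatorSketch {
    // partition -> attempt number currently authorized to commit
    private final Map<Integer, Integer> authorized = new HashMap<>();
    private boolean stageAborted = false;

    // The first attempt to ask wins; later attempts are denied.
    // Once the stage is marked for abort, every request is denied.
    boolean canCommit(int partition, int attempt) {
        if (stageAborted) {
            return false;
        }
        Integer holder = authorized.putIfAbsent(partition, attempt);
        return holder == null || holder == attempt;
    }

    // Called when an attempt fails. If it was the authorized committer, its output
    // may be partially committed, so the lock is kept and the stage is marked for
    // abort instead of letting another attempt commit.
    void taskFailed(int partition, int attempt) {
        Integer holder = authorized.get(partition);
        if (holder != null && holder == attempt) {
            stageAborted = true;
        }
    }

    boolean isStageAborted() {
        return stageAborted;
    }
}
```

In this model a failure of a non-authorized attempt changes nothing, while a failure of the authorized one aborts the stage, which is the distinction a replacement assertion would check.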



[GitHub] [spark] AngersZhuuuu commented on pull request #36564: [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status

Posted by GitBox <gi...@apache.org>.
AngersZhuuuu commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1175851865

   Yea


Re: [PR] [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status [spark]

Posted by "mstebelev (via GitHub)" <gi...@apache.org>.
mstebelev commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1774214202

   Hi @AngersZhuuuu. I ran into problems with these changes after upgrading to Spark 3.4.
   I write data to an Iceberg table with an S3 backend, and the data upload happens in `dataWriter.commit()`, after `coordinator.canCommit` has been called. So if the upload to S3 fails for some reason, the task fails, and the partition data can no longer be uploaded, even by the task's retries, because the failed task remains the authorized committer.
   It looks like data writing is usually idempotent: each partition is written into a separate file, so a retried task can always write it again without duplicating data.
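
The failure mode described above can be reproduced with a toy Java model (hypothetical classes, not Iceberg or Spark code): the commit lock goes to the first authorized attempt and is never released, and the writer's upload only happens after authorization, so an upload failure leaves every retry denied.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical commit lock: the first authorized attempt keeps it, even after failing.
class StickyCommitLock {
    private final Map<Integer, Integer> committer = new HashMap<>();

    boolean canCommit(int partition, int attempt) {
        Integer holder = committer.putIfAbsent(partition, attempt);
        return holder == null || holder == attempt;
    }
}

// Hypothetical writer whose upload to object storage happens inside commit(),
// i.e. only after the coordinator has authorized the attempt.
class UploadInCommitWriter {
    static String runAttempt(StickyCommitLock lock, int partition, int attempt,
                             boolean uploadFails) {
        if (!lock.canCommit(partition, attempt)) {
            return "commit denied";
        }
        if (uploadFails) {
            // The task fails here, but this attempt still holds the lock.
            return "upload failed";
        }
        return "committed";
    }
}
```

Running attempts 0, 1 and 2 on the same partition, with only attempt 0's upload failing, yields `upload failed`, `commit denied`, `commit denied`: no retry can recover the partition, while other partitions are unaffected.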


Re: [PR] [SPARK-39195][SQL] Spark OutputCommitCoordinator should abort stage when committed file not consistent with task status [spark]

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #36564:
URL: https://github.com/apache/spark/pull/36564#issuecomment-1774351335

   I'm surprised that Iceberg does not override https://github.com/apache/spark/blob/master/sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/BatchWrite.java#L63
   
   cc @huaxingao @dongjoon-hyun 
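
The linked line is presumably `BatchWrite.useCommitCoordinator()`, which defaults to `true` and, per its Javadoc, controls whether Spark uses the commit coordinator to ensure that at most one task per partition commits. A connector whose task writes are idempotent could override it to return `false`. The sketch mirrors only that one method in a hypothetical `BatchWriteLike` interface so it compiles without Spark on the classpath; `CommitPathSketch` is a rough, hypothetical model of the task-side decision, not Spark's actual task code.

```java
// Hypothetical mirror of the one BatchWrite method discussed here,
// so the sketch compiles without Spark on the classpath.
interface BatchWriteLike {
    // Mirrors the default: use the commit coordinator unless a connector opts out.
    default boolean useCommitCoordinator() {
        return true;
    }
}

// A connector that keeps the default.
class CoordinatedWrite implements BatchWriteLike {
}

// A connector whose task writes are idempotent and opts out of coordination.
class IdempotentWrite implements BatchWriteLike {
    @Override
    public boolean useCommitCoordinator() {
        return false;
    }
}

// Hypothetical model of the task-side decision before committing a data writer.
class CommitPathSketch {
    static String commitPath(BatchWriteLike write, boolean coordinatorAuthorizes) {
        if (write.useCommitCoordinator()) {
            return coordinatorAuthorizes ? "commit" : "commit denied";
        }
        // The coordinator is not consulted, so a retried task can always commit.
        return "commit";
    }
}
```

Under this model, the opt-out removes the dependency on the coordinator's lock entirely, which is why the retry problem reported above would not arise for a writer that overrides the method.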

