Posted to reviews@spark.apache.org by rdblue <gi...@git.apache.org> on 2018/02/02 22:30:11 UTC

[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Add support for commit coordi...

GitHub user rdblue opened a pull request:

    https://github.com/apache/spark/pull/20490

    [SPARK-23323][SQL]: Add support for commit coordinator for DataSourceV2 writes

    ## What changes were proposed in this pull request?
    
    DataSourceV2 batch writes should use the output commit coordinator if it is required by the data source. This adds a new method, `DataWriterFactory#useCommitCoordinator`, that determines whether the coordinator will be used. If the write factory returns true, `WriteToDataSourceV2` will use the coordinator for batch writes.
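
The opt-out flag described above can be sketched as a default interface method. This is a minimal illustration, not the PR's code: `SketchWriterFactory`, `SelfCoordinatingFactory`, and `CoordinatorFlagSketch` are hypothetical names, and the returned strings only label which commit path a task would take.

```java
// Hypothetical stand-in for a writer factory; this is not Spark's interface.
interface SketchWriterFactory {
    // Defaults to true so a source gets coordinated commits unless it opts out.
    default boolean useCommitCoordinator() {
        return true;
    }
}

// A hypothetical source that handles duplicate commits itself and opts out.
class SelfCoordinatingFactory implements SketchWriterFactory {
    @Override
    public boolean useCommitCoordinator() {
        return false;
    }
}

class CoordinatorFlagSketch {
    // Labels the commit path a write task would take for the given factory.
    static String commitPath(SketchWriterFactory factory) {
        return factory.useCommitCoordinator()
            ? "ask-coordinator-then-commit"
            : "commit-directly";
    }

    public static void main(String[] args) {
        System.out.println(commitPath(new SketchWriterFactory() {}));
        System.out.println(commitPath(new SelfCoordinatingFactory()));
    }
}
```

A default method keeps existing implementations source-compatible: only sources that want to skip coordination need to override it.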
    
    This relies on the commits in #20387. Once that is committed, this will be rebased. Only the last commit is part of this PR.
    
    ## How was this patch tested?
    
    This relies on existing write tests, which now use the commit coordinator.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rdblue/spark SPARK-23323-add-commit-coordinator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20490
    
----
commit 62c569672083c0fa633da1d6edaba40d0bb05819
Author: Ryan Blue <bl...@...>
Date:   2018-01-17T21:58:12Z

    SPARK-22386: DataSourceV2: Use immutable logical plans.

commit f0bd45d3c931941b8092cdac738cb29954e0acdd
Author: Ryan Blue <bl...@...>
Date:   2018-01-24T19:34:42Z

    SPARK-23203: Fix scala style check.

commit 2fdeb4556cd22a092630b341a22a16a59e377183
Author: Ryan Blue <bl...@...>
Date:   2018-01-24T19:54:10Z

    SPARK-23203: Fix Kafka tests, use StreamingDataSourceV2Relation.
    
    This also removes unused imports.

commit ab945a19efe666c41deae9c044002f3455220c1d
Author: Ryan Blue <bl...@...>
Date:   2018-02-02T20:30:33Z

    SPARK-23204: DataFrameReader: Remove v2 table identifier parsing.

commit f1d9872a2699cdbd5c87b02e702dc8103335131d
Author: Ryan Blue <bl...@...>
Date:   2018-02-02T21:48:29Z

    SPARK-23203: Remove import changes from DataSourceV2Utils.

commit 288af6a2729c769e0d4075a8f9190958ab5a211c
Author: Ryan Blue <bl...@...>
Date:   2018-02-02T22:21:48Z

    SPARK-23323: DataSourceV2: support commit coordinator.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167386125
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
        * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
        * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
    +   * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
    +   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
    +   * attempts may have committed successfully and all successful commit messages are passed to this
    --- End diff --
    
    `... committed successfully, and Spark will pick the commit message that arrives at the driver first.`


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by tedyu <gi...@git.apache.org>.
Github user tedyu commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r184736836
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -116,21 +118,44 @@ object DataWritingSparkTask extends Logging {
       def run(
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
    -      iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +      iter: Iterator[InternalRow],
    +      useCommitCoordinator: Boolean): WriterCommitMessage = {
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    +        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
    +        if (commitAuthorized) {
    +          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
    +          dataWriter.commit()
    +        } else {
    +          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
    +          logInfo(message)
    --- End diff --
    
    This should be WARN or ERROR since an exception is thrown below.
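
A rough sketch of the suggestion, assuming `java.util.logging` rather than Spark's own logging trait: the denied path logs at warning level before throwing. `CommitDeniedSketch` and `CommitDeniedException` are hypothetical names used only for this illustration.

```java
import java.util.logging.Logger;

class CommitDeniedSketch {
    private static final Logger LOG = Logger.getLogger("CommitDeniedSketch");

    // Hypothetical exception type used only in this sketch.
    static class CommitDeniedException extends RuntimeException {
        CommitDeniedException(String message) {
            super(message);
        }
    }

    // Returns "committed" when authorized; otherwise warns and throws.
    static String commitIfAuthorized(boolean authorized, int stageId, int partId, int attemptId) {
        String task = "stage " + stageId + ", task " + partId + "." + attemptId;
        if (authorized) {
            LOG.info("Writer for " + task + " is authorized to commit.");
            return "committed";
        }
        String message = "Writer for " + task + ": driver did not authorize commit";
        // The denial ends in an exception, so it is logged above INFO level.
        LOG.warning(message);
        throw new CommitDeniedException(message);
    }
}
```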


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87126 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87126/testReport)** for PR 20490 at commit [`2ac1fa2`](https://github.com/apache/spark/commit/2ac1fa23781b04172b9bf33380656a5e9c885db7).


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87169 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87169/testReport)** for PR 20490 at commit [`9353074`](https://github.com/apache/spark/commit/9353074ae18da971ebb0fadc2a986933442b46f1).


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167011250
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
        * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
        * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
    +   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
    --- End diff --
    
    Fixed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    @cloud-fan, I don't think we can rely on user code propagating `InterruptedException`. How to handle that exception in particular is one of the least understood parts of the JVM.
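
As an illustration of one common way user code can fail to propagate an interrupt (not code from this PR; `InterruptHandlingSketch` is a hypothetical name): catching `InterruptedException` and ignoring it leaves the thread's interrupt flag cleared, so callers never learn that an interrupt was requested.

```java
class InterruptHandlingSketch {
    // Swallows the exception: the interrupt request is lost to callers.
    static boolean sleepSwallowing() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            // Ignored. Throwing the exception already cleared the interrupt flag.
        }
        return Thread.currentThread().isInterrupted();
    }

    // Restores the flag so code further up the stack can still react.
    static boolean sleepRestoring() {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return Thread.currentThread().isInterrupted();
    }
}
```

If the calling thread is interrupted before either method runs, `sleepSwallowing` returns with the flag cleared, while `sleepRestoring` returns with it set.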


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/644/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    The test failure doesn't look related to these changes to me. How can I get on the list to ask Jenkins to retest a PR?


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166463921
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    > As we can see, the number of people who can correctly implement a committer is << than those who have shipped one
    
    Totally agree. Great quote.
    
    From Wenchen's comments, I think we're in agreement that the default should be to use the commit coordinator. We just need to figure out how to get it in and how to document what it guarantees.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87269 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87269/testReport)** for PR 20490 at commit [`ec96856`](https://github.com/apache/spark/commit/ec968563605f961d3d874913de51265683a8c132).


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167011220
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
        * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
        * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
    +   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
    +   * {@link DataWriterFactory} implementations can disable this behavior. If disabled, multiple
    --- End diff --
    
    I clarified this and added a note about how to do it.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166398432
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    Let me know if you want me to change this PR.
    
    I'd like to see this go into 2.3.0 if there's still time. Just because it is documented doesn't mean it isn't a choice that severely limits the utility of DataSourceV2. I'd rather not support work-arounds for the life of 2.3.0.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/600/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166898992
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
    @@ -32,6 +32,16 @@
     @InterfaceStability.Evolving
     public interface DataWriterFactory<T> extends Serializable {
     
    +  /**
    +   * Returns whether Spark should use the OutputCommitCoordinator to ensure that only one attempt
    +   * for each task commits.
    +   *
    +   * @return true if commit coordinator should be used, false otherwise.
    +   */
    +  default boolean useCommitCoordinator() {
    --- End diff --
    
    It's odd to put this method in `DataWriterFactory`, since it isn't related to the factory. How about putting it in `DataSourceWriter`?


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Add support for commit coordinator f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/545/
    Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166684043
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
    @@ -20,6 +20,7 @@
     import java.io.Serializable;
     
     import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.scheduler.OutputCommitCoordinator;
    --- End diff --
    
    Fixed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Just type "retest this please".


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167386169
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
       def run(
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
    -      iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +      iter: Iterator[InternalRow],
    +      useCommitCoordinator: Boolean): WriterCommitMessage = {
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    +        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
    +        if (commitAuthorized) {
    +          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
    +          dataWriter.commit()
    +
    --- End diff --
    
    nit: remove unnecessary blank line


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167386061
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
        * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
        * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
    +   * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
    --- End diff --
    
    nit: `only one` -> `at most one`
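
The distinction can be seen in a simplified first-wins coordinator. This is a sketch under assumptions, not Spark's `OutputCommitCoordinator`: `FirstWinsCoordinatorSketch` is a hypothetical class that authorizes the first attempt that asks for each stage and partition and denies every other attempt. If the authorized attempt then fails before committing, no attempt has committed, which is why "at most one" is the accurate guarantee. The sketch does not model re-authorization after such a failure.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class FirstWinsCoordinatorSketch {
    // Maps "stage:partition" to the attempt number that was authorized first.
    private final ConcurrentMap<String, Integer> authorized = new ConcurrentHashMap<>();

    // Authorizes the first attempt to ask; later attempts for the same task are denied.
    boolean canCommit(int stageId, int partId, int attemptId) {
        Integer winner = authorized.putIfAbsent(stageId + ":" + partId, attemptId);
        // A null result means this call just registered the attempt as the winner.
        return winner == null || winner == attemptId;
    }
}
```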


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87229 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87229/testReport)** for PR 20490 at commit [`e9964ca`](https://github.com/apache/spark/commit/e9964ca2fc831819662056210db594f613bce5d0).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/737/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/640/
    Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166374405
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    Yes, it makes sense to use a commit coordinator by default, but I think we need to design the API carefully to introduce the concept of a commit coordinator; a `boolean useCommitCoordinator()` alone doesn't seem to be enough. We also need to update the documentation of the write APIs to clearly specify in which phase the commit coordinator is involved and how it works.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167645370
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
        * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
        * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
    +   * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
    +   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
    +   * attempts may have committed successfully and all successful commit messages are passed to this
    --- End diff --
    
    I'm changing the wording to this to capture the behavior:
    
    > If disabled, multiple attempts may have committed successfully and one successful commit message per task will be passed to this commit method. The remaining commit messages are ignored by Spark.
    
    I think we should fix this for non-coordinated commits, but it doesn't need to block the commit to get coordinator support in.
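
The concern in the comment above can be illustrated with a small sketch. It assumes that the first commit message to arrive for a partition is the one kept (the Javadoc text removed in this diff said Spark picks the first successful one). The class `FirstCommitWinsSketch` and its methods are hypothetical and only model the bookkeeping; they are not Spark code.

```java
import java.util.Arrays;

// Hypothetical driver-side collector; the class and method names are illustrative.
final class FirstCommitWinsSketch {
  private final String[] messages;

  FirstCommitWinsSketch(int numPartitions) {
    this.messages = new String[numPartitions];
  }

  // Keeps the message only if this partition has none yet; later ones are ignored.
  synchronized boolean onTaskCommit(int partitionId, String message) {
    if (messages[partitionId] != null) {
      return false;
    }
    messages[partitionId] = message;
    return true;
  }

  // The array that would be handed to the job-level commit: one entry per partition.
  synchronized String[] messagesForJobCommit() {
    return Arrays.copyOf(messages, messages.length);
  }

  public static void main(String[] args) {
    FirstCommitWinsSketch driver = new FirstCommitWinsSketch(2);
    driver.onTaskCommit(0, "p0-attempt0");
    driver.onTaskCommit(0, "p0-attempt1"); // ignored, although attempt 1 also wrote output
    driver.onTaskCommit(1, "p1-attempt0");
    System.out.println(Arrays.toString(driver.messagesForJobCommit()));
  }
}
```

In this model the ignored attempt has still written its output to the destination, which is why dropping its message matters for writers that do not use the coordinator.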


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166448459
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    +        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
    +        if (commitAuthorized) {
    +          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
    +          dataWriter.commit()
    +
    +        } else {
    +          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
    +          logInfo(message)
    +          // throwing CommitDeniedException will trigger the catch block for abort
    +          throw new CommitDeniedException(message, stageId, partId, attemptId)
    +        }
    +
    +      } else {
    +        logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    +        dataWriter.commit()
    +      }
    +
    +      logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.")
    --- End diff --
    
    It's implicitly done in the logs anyway, but I've found it useful to track the duration of these operations.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87125/
    Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87338 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87338/testReport)** for PR 20490 at commit [`538bc86`](https://github.com/apache/spark/commit/538bc864f8ebb8d1b7e63c26806f209f2c3b0fc4).


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    @cloud-fan, please have another look. I fixed the problems you spotted.
    
    I haven't added support for the streaming side. It is different enough that I think we should do it in a follow-up. More details are on the thread.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    LGTM


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87169 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87169/testReport)** for PR 20490 at commit [`9353074`](https://github.com/apache/spark/commit/9353074ae18da971ebb0fadc2a986933442b46f1).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87132/
    Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Add support for commit coordinator f...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    @dongjoon-hyun, @cloud-fan, @gatorsmile. Once the immutable plan PR is in, this can be reviewed.
    
    @steveloughran, I think this is what you were asking for.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87229 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87229/testReport)** for PR 20490 at commit [`e9964ca`](https://github.com/apache/spark/commit/e9964ca2fc831819662056210db594f613bce5d0).


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Accidentally closed & reopened. Oops.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87269 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87269/testReport)** for PR 20490 at commit [`ec96856`](https://github.com/apache/spark/commit/ec968563605f961d3d874913de51265683a8c132).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166514084
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
    @@ -20,6 +20,7 @@
     import java.io.Serializable;
     
     import org.apache.spark.annotation.InterfaceStability;
    +import org.apache.spark.scheduler.OutputCommitCoordinator;
    --- End diff --
    
    Unused import?


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87244 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87244/testReport)** for PR 20490 at commit [`e9964ca`](https://github.com/apache/spark/commit/e9964ca2fc831819662056210db594f613bce5d0).


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87229/
    Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Add support for commit coordinator f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87009 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87009/testReport)** for PR 20490 at commit [`288af6a`](https://github.com/apache/spark/commit/288af6a2729c769e0d4075a8f9190958ab5a211c).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166898718
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
        * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
        * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
    +   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
    --- End diff --
    
    Don't say `OutputCommitCoordinator`, as it's an internal class. We can just say `a commit coordinator`.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87244 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87244/testReport)** for PR 20490 at commit [`e9964ca`](https://github.com/apache/spark/commit/e9964ca2fc831819662056210db594f613bce5d0).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87078 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87078/testReport)** for PR 20490 at commit [`ebe9d56`](https://github.com/apache/spark/commit/ebe9d56094e53d1a8f7083eae781aa490d96d80b).


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87244/
    Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/20490


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87125 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87125/testReport)** for PR 20490 at commit [`14b4a95`](https://github.com/apache/spark/commit/14b4a95b9c0ce0024e304d3cd48880a260df0f81).


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167280511
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -62,6 +62,16 @@
        */
       DataWriterFactory<Row> createWriterFactory();
     
    +  /**
    +   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
    --- End diff --
    
    Currently, the commit coordinator will only authorize one attempt and only authorize another if the authorized attempt fails, so it does ensure that only one attempt commits. Do you think the wording here needs to change?
    
    Instead of documenting the behavior of the commit coordinator here, I'd rather point to its docs. Are those written, or is the coordinator an internal class?
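
The rule described in the comment above (authorize one attempt per partition, and authorize another only if the authorized attempt fails) can be sketched as follows. This is a simplified, hypothetical model and not Spark's internal coordinator: the class name `SimpleCommitCoordinator`, the `attemptFailed` method, and the choice to grant repeated requests from the same attempt are all assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of a commit coordinator (not Spark's implementation).
final class SimpleCommitCoordinator {
  // Maps "stage/partition" to the attempt number currently authorized to commit.
  private final Map<String, Integer> authorized = new HashMap<>();

  private static String key(int stageId, int partitionId) {
    return stageId + "/" + partitionId;
  }

  // Grants the commit to the first attempt that asks. A repeated request from the
  // same attempt is granted again (an assumption); all other attempts are denied.
  synchronized boolean canCommit(int stageId, int partitionId, int attemptNumber) {
    Integer holder = authorized.putIfAbsent(key(stageId, partitionId), attemptNumber);
    return holder == null || holder == attemptNumber;
  }

  // Releases the authorization only if the failed attempt was the one holding it,
  // so that a later attempt can be authorized.
  synchronized void attemptFailed(int stageId, int partitionId, int attemptNumber) {
    authorized.remove(key(stageId, partitionId), attemptNumber);
  }

  public static void main(String[] args) {
    SimpleCommitCoordinator coordinator = new SimpleCommitCoordinator();
    System.out.println(coordinator.canCommit(0, 3, 0)); // attempt 0 asks first
    System.out.println(coordinator.canCommit(0, 3, 1)); // speculative attempt 1 asks
    coordinator.attemptFailed(0, 3, 0);                 // authorized attempt fails
    System.out.println(coordinator.canCommit(0, 3, 1)); // attempt 1 asks again
  }
}
```

Under this model, a second attempt can commit only after the first authorization has been released, which matches the "at most one" wording adopted later in the thread.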


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167644516
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -78,10 +88,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
        * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
        * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
    +   * Spark uses the commit coordinator to allow only one attempt to commit. Implementations can
    +   * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
    +   * attempts may have committed successfully and all successful commit messages are passed to this
    --- End diff --
    
    I think we need to address this guarantee. Spark will just drop commit messages? That seems like a huge problem to me.
    
    cc @steveloughran 


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    I've been having talks with colleagues over the last week and want to check something.
    
    How exactly do Spark executors abort speculative jobs without waiting for them to get into the ready-to-commit phase? As I was told: by interrupting the thread. If so, can someone point me to where this happens?
    
    Assuming this really is the case, there should be a callback on task committers to tell them they've just been interrupted so they can react accordingly, or maybe the task cleanup callback should be told that the cleanup is due to an interruption. For the mapreduce commit protocol, normal task cleanup should be called after the interrupt; other committers may do more.
    
    Rationale
    * if temp space is used on the host machines, job cleanup cannot reach it, so the space will only get cleaned up as cron jobs purge old content. Ryan's staging committer is the reference example here.
    * if the task consumes expensive remote resources (DB connections, etc), then releasing them early, i.e. before the job eventually completes, could free them up for others. Or indeed, expensive in-VM resources, like a thread pool of http clients.
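
The callback proposed above might look roughly like the following sketch. Everything here is hypothetical: no `onInterrupted` callback is confirmed to exist in Spark's committer API by this thread, and `InterruptCleanupSketch`, `InterruptibleWork`, and `InterruptAwareCommitter` are names invented for illustration. The sketch only shows the control flow of reacting to an interrupt and then preserving the interrupt flag.

```java
// Hypothetical sketch of an interrupt-aware task attempt; not a Spark API.
final class InterruptCleanupSketch {
  // Work that may block and therefore may be interrupted.
  interface InterruptibleWork {
    void run() throws InterruptedException;
  }

  // Hypothetical committer hook that is told the attempt was interrupted.
  interface InterruptAwareCommitter {
    void onInterrupted();
  }

  // Returns true if the work finished, false if it was interrupted and cleaned up.
  static boolean runAttempt(InterruptibleWork work, InterruptAwareCommitter committer) {
    try {
      work.run();
      return true;
    } catch (InterruptedException e) {
      committer.onInterrupted();          // e.g. delete local temp files, close connections
      Thread.currentThread().interrupt(); // restore the flag for code further up the stack
      return false;
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Thread attempt = new Thread(() -> {
      boolean finished = runAttempt(
          () -> Thread.sleep(60_000),
          () -> System.out.println("cleaning up after interrupt"));
      System.out.println("finished = " + finished);
    });
    attempt.start();
    attempt.interrupt(); // models the executor killing a speculative attempt
    attempt.join();
  }
}
```

Running the cleanup in the interrupted thread itself is what lets it reach host-local temp space and in-VM resources, the two cases listed in the rationale.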



---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Overall LGTM; I don't have a better idea. Also cc @rxin


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166381800
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    What do you have in mind to "introduce the concept"?
    
    I'm happy to add more docs. Do you want me to add them to this PR or in a follow-up? Are you targeting this for 2.3.0?


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166898212
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
        * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
        * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
    +   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
    +   * {@link DataWriterFactory} implementations can disable this behavior. If disabled, multiple
    --- End diff --
    
    We should mention that users can disable this and use their own custom commit coordinator.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167011291
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java ---
    @@ -32,6 +32,16 @@
     @InterfaceStability.Evolving
     public interface DataWriterFactory<T> extends Serializable {
     
    +  /**
    +   * Returns whether Spark should use the OutputCommitCoordinator to ensure that only one attempt
    +   * for each task commits.
    +   *
    +   * @return true if commit coordinator should be used, false otherwise.
    +   */
    +  default boolean useCommitCoordinator() {
    --- End diff --
    
    I moved it. Good idea.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
GitHub user rdblue reopened a pull request:

    https://github.com/apache/spark/pull/20490

    [SPARK-23323][SQL]: Support commit coordinator for DataSourceV2 writes

    ## What changes were proposed in this pull request?
    
    DataSourceV2 batch writes should use the output commit coordinator if it is required by the data source. This adds a new method, `DataWriterFactory#useCommitCoordinator`, that determines whether the coordinator will be used. If the write factory returns true, `WriteToDataSourceV2` will use the coordinator for batch writes.
    
    ## How was this patch tested?
    
    This relies on existing write tests, which now use the commit coordinator.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rdblue/spark SPARK-23323-add-commit-coordinator

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20490.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20490
    
----
commit ebe9d56094e53d1a8f7083eae781aa490d96d80b
Author: Ryan Blue <bl...@...>
Date:   2018-02-02T22:21:48Z

    SPARK-23323: DataSourceV2: support commit coordinator.

commit 14b4a95b9c0ce0024e304d3cd48880a260df0f81
Author: Ryan Blue <bl...@...>
Date:   2018-02-06T19:30:51Z

    Update documentation in DataSourceWriter for commit coordination.

commit 2ac1fa23781b04172b9bf33380656a5e9c885db7
Author: Ryan Blue <bl...@...>
Date:   2018-02-06T20:04:35Z

    Fix javadoc for DataWriterFactory.

commit c982d3a5d0a895ad33a696a7b0fbd9453724fdb4
Author: Ryan Blue <bl...@...>
Date:   2018-02-06T22:28:55Z

    Remove link to OutputCommitCoordinator because Javadoc can't find it.

commit 9353074ae18da971ebb0fadc2a986933442b46f1
Author: Ryan Blue <bl...@...>
Date:   2018-02-07T16:59:21Z

    Remove unused import.

commit a2a0ec8b440152be0f643fd89dcce2c0f51612c1
Author: Ryan Blue <bl...@...>
Date:   2018-02-08T17:32:34Z

    Move useCommitCoordinator to DataSourceWriter.
    
    This should be configured by the writer, not the factory that creates
    DataWriters.

commit e9964ca2fc831819662056210db594f613bce5d0
Author: Ryan Blue <bl...@...>
Date:   2018-02-08T20:13:31Z

    Avoid catching writer in Java serialization.

commit ec968563605f961d3d874913de51265683a8c132
Author: Ryan Blue <bl...@...>
Date:   2018-02-09T19:20:25Z

    Only one => at most one.

commit 538bc864f8ebb8d1b7e63c26806f209f2c3b0fc4
Author: Ryan Blue <bl...@...>
Date:   2018-02-12T18:32:13Z

    Fix docs and style nit.

----


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/673/
    Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166360278
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    The API is flexible. The problem is that it defaults to no coordination, which causes correctness bugs.
    
    The safe option is to coordinate commits by default. If an implementation doesn't change the default, then it at least won't duplicate task outputs in job commit. Worst case is that it takes a little longer for committers that don't need coordination. On the other hand, not making this the default will cause some writers to work most of the time, but duplicate data in some cases.
    
    What do you think is the down side to adding this?
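
A minimal sketch of the coordinated path being argued for here, assuming nothing beyond what the diff shows. `SketchCoordinator` and `SketchWriter` are hypothetical stand-ins, not Spark's `OutputCommitCoordinator` or `DataWriter`; the sketch only illustrates how gating `commit()` on a `canCommit(stage, partition, attempt)` check lets at most one attempt per partition commit.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a commit coordinator: the first attempt to ask
// for a (stage, partition) pair is authorized; later attempts are denied.
class SketchCoordinator {
  private val authorized = mutable.Map.empty[(Int, Int), Int]

  def canCommit(stageId: Int, partId: Int, attemptId: Int): Boolean = synchronized {
    authorized.getOrElseUpdate((stageId, partId), attemptId) == attemptId
  }
}

// Hypothetical stand-in for a data writer that records whether it committed.
class SketchWriter(val name: String) {
  var committed = false
  def commit(): String = { committed = true; s"$name committed" }
}

object CoordinatedCommitSketch {
  // Commit only if coordination is disabled or the coordinator authorizes this attempt.
  def commitTask(
      coordinator: SketchCoordinator,
      useCommitCoordinator: Boolean,
      stageId: Int,
      partId: Int,
      attemptId: Int,
      writer: SketchWriter): Option[String] = {
    val authorizedToCommit =
      !useCommitCoordinator || coordinator.canCommit(stageId, partId, attemptId)
    if (authorizedToCommit) Some(writer.commit()) else None
  }
}
```

With `useCommitCoordinator = false`, two speculative attempts for the same partition would both reach `commit()`, which is the duplicate-output case the comment describes.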


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87338/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    This is the writing code I was talking about:
    ```
        // write the data and commit this writer.
        Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
          iter.foreach(dataWriter.write)
          logInfo(s"Writer for partition ${context.partitionId()} is committing.")
          val msg = dataWriter.commit()
          logInfo(s"Writer for partition ${context.partitionId()} committed.")
          msg
        })(catchBlock = {
          // If there is an error, abort this writer
          logError(s"Writer for partition ${context.partitionId()} is aborting.")
          dataWriter.abort()
          logError(s"Writer for partition ${context.partitionId()} aborted.")
        })
    ```
    What we can probably do is check for job cancellation periodically during `iter.foreach(dataWriter.write)`, e.g. once every 1k writes.
    
    Anyway, let's merge this PR first. I'm only merging to master; let's backport it to 2.3 if RC3 fails (very likely, as several regressions have already shown up).
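
A rough sketch of the periodic check suggested above, not code from the PR. The `isCancelled` callback, the `WriteCancelled` exception, and the `checkInterval` parameter are all hypothetical names introduced for illustration; in Spark the cancellation signal would have to come from the task context.

```scala
object PeriodicCancelCheckSketch {
  // Hypothetical exception used to leave the write loop; not a Spark class.
  final class WriteCancelled(written: Long)
    extends RuntimeException(s"cancelled after $written writes")

  // Writes every row, polling `isCancelled` once per `checkInterval` rows.
  // Returns the number of rows written.
  def writeAll[T](
      rows: Iterator[T],
      write: T => Unit,
      isCancelled: () => Boolean,
      checkInterval: Int = 1000): Long = {
    require(checkInterval > 0, "checkInterval must be positive")
    var written = 0L
    rows.foreach { row =>
      // Only pay for the cancellation check on every checkInterval-th row.
      if (written % checkInterval == 0 && isCancelled()) {
        throw new WriteCancelled(written)
      }
      write(row)
      written += 1
    }
    written
  }
}
```

Throwing from inside the loop means an enclosing failure handler, such as the `catchBlock` in the snippet above, would still get the chance to call `dataWriter.abort()`.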


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166995080
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -78,10 +78,11 @@ default void onDataWriterCommit(WriterCommitMessage message) {}
        * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
        * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * Note that speculative execution may cause multiple tasks to run for a partition. By default,
    +   * Spark uses the OutputCommitCoordinator to allow only one attempt to commit.
    +   * {@link DataWriterFactory} implementations can disable this behavior. If disabled, multiple
    --- End diff --
    
    It says that already: "DataWriterFactory implementations can disable this behavior."


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87224/
    Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    retest this please


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167009143
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    --- End diff --
    
    Is it a good idea to add streaming to this commit?
    
    The changes differ significantly. It isn't clear how commit coordination happens for streaming writes. The OutputCommitCoordinator's `canCommit` method takes stage, partition, and attempt ids, not epochs. Either the other components aren't ready to have commit coordination, or I'm not familiar enough with how it is done for streaming.
    
    I think we can keep the two separate, and I'm happy to open a follow-up issue for the streaming side.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Add support for commit coordinator f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87009/
    Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87224 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87224/testReport)** for PR 20490 at commit [`a2a0ec8`](https://github.com/apache/spark/commit/a2a0ec8b440152be0f643fd89dcce2c0f51612c1).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Add support for commit coordinator f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166447570
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    > We never guarantee that for an RDD partition, only one task can commit successfully
    
    There's at-least-once though, right? And then the Job Commit (which is implicitly at-most-once) is expected to handle the situation wherein 1+ tasks may have committed, and should resolve it so that the output of only one task is added.
    
    One thing which I think would be good is for the Spark docs to precisely write down, somewhere (scaladoc? markdown?), the requirements Spark places on a committer. For the WiP paper on the new S3A committers, [I've tried to do this across MR & Spark](https://github.com/steveloughran/zero-rename-committer/blob/master/tex/a_zero_rename_committer.tex#L1993)
    
    1. Complete: you get the output of all committed tasks
    2. Exclusive: you only get the output of committed tasks
    3. (Consistent: produces right output even if store is inconsistent)
    4. Concurrent: >1 task may commit simultaneously
    5. Abortable: if you abort a task, no output is visible
    6. Continuity of correctness: after a job is committed,  no partitioned task may suddenly add its work to the output.
    
    Not required: if there's a partition and a 2nd task attempt is committed, the output of either one of those attempts must be committed, but the specifics of which one is left open.
    
    * Hadoop MR v1 meets 1-6 on HDFS, fails on 3 against raw S3
    * The Direct Parquet committer fails to meet requirements (2, 5 & probably 6)
    * The Hadoop MR v2 committer fails on 2, because if a task attempt commit fails partway through, some of its output may be in the dest dir. Both Spark and MR assume that this situation never occurs. Really, committers should be able to say "Doesn't support retry on task commit failure", or better. 
    
    Regarding this patch,
    
    1. How often do you actually expect people to be writing their own commit coordinator?
    2. What's the likelihood that they will get it right?
    
    As we can see, the number of people who can correctly implement a committer is far smaller than the number who have shipped one; I don't see a commit coordinator being any different. It's good to offer the flexibility, but important for the default to be the one which everyone else uses and which is generally trusted.
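
To make the "job commit resolves multiple committed attempts" point concrete, here is a small sketch of an at-most-once resolution step. `TaskCommit` and `resolve` are hypothetical names, and picking the lowest attempt id is an arbitrary choice for this sketch, since the comment leaves open which attempt wins.

```scala
object JobCommitDedupSketch {
  // Hypothetical commit message: which attempt of which partition produced which file.
  final case class TaskCommit(partId: Int, attemptId: Int, file: String)

  // Keep exactly one committed attempt per partition so that no partition's
  // output is added twice. The winner here is the lowest attempt id.
  def resolve(commits: Seq[TaskCommit]): Seq[TaskCommit] =
    commits
      .groupBy(_.partId)
      .values
      .map(_.minBy(_.attemptId))
      .toSeq
      .sortBy(_.partId)
}
```

This only covers the "Exclusive" requirement at job-commit time; it does nothing about output that a task attempt has already made visible, which is the failure mode described for the MR v2 committer.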



---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87126/
    Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166174605
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    I'm not sure why we need this. In the implementation of `DataWriter.commit`, users can still call `SparkEnv.get.outputCommitCoordinator`. Users can even use their own commit coordinator, based on ZooKeeper or something similar.
    
    I think the current API is flexible enough to: 1) not use a commit coordinator, 2) use Spark's built-in commit coordinator, or 3) use a custom commit coordinator.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87132 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87132/testReport)** for PR 20490 at commit [`c982d3a`](https://github.com/apache/spark/commit/c982d3a5d0a895ad33a696a7b0fbd9453724fdb4).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    AFAIK, there is no KILL mechanism for speculative tasks, which means, if multiple task attempts are running at the same time and one of them finishes earlier, the others will keep running until finished.
    
    But your concern is valid: users can manually cancel a job/stage, and then the task thread will be interrupted.
    
    Fortunately, this case is handled correctly. The writing logic is wrapped in a `try-catch`. If the task is interrupted, `InterruptedException` will be thrown and caught, and `DataWriter.abort` will be called.
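
The abort-on-interrupt behaviour described here can be illustrated with a simplified stand-in. `SketchDataWriter` is a hypothetical trait loosely modeled on the writer calls quoted in this thread, and a plain `try`/`catch` is used in place of Spark's `Utils.tryWithSafeFinallyAndFailureCallbacks`, so this is a sketch of the idea rather than the actual implementation.

```scala
object AbortOnFailureSketch {
  // Hypothetical minimal writer interface; not Spark's DataWriter.
  trait SketchDataWriter[T] {
    def write(row: T): Unit
    def commit(): String
    def abort(): Unit
  }

  // Write and commit; on any failure (including interruption),
  // abort the writer and rethrow the original error.
  def run[T](rows: Iterator[T], writer: SketchDataWriter[T]): String =
    try {
      rows.foreach(writer.write)
      writer.commit()
    } catch {
      case t: Throwable =>
        writer.abort()
        throw t
    }
}
```

Note that the interruption is only observed if something inside the write loop actually throws, for example a blocking call that reacts to the thread's interrupt flag.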


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/639/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/811/
    Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue closed the pull request at:

    https://github.com/apache/spark/pull/20490


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87269/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166395788
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    Since we have a workaround (calling the coordinator in `DataWriter.commit`), I don't think this should block the 2.3 release, but we can definitely get this into branch 2.3 if there is no breaking change to the public APIs.
    
    And I won't treat it as a correctness bug. The default no-coordinator behavior is well documented with the current APIs, see the classdoc of `DataWriter`. We never guarantee that for an RDD partition, only one task can commit successfully.
    
    > What do you have in mind to "introduce the concept"?
    
    I haven't thought about it before; I'll think about it over the next few days.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167306077
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -62,6 +62,16 @@
        */
       DataWriterFactory<Row> createWriterFactory();
     
    +  /**
    +   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
    --- End diff --
    
    `only one` -> `at most one`? BTW I think we should not link to an internal class.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87125 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87125/testReport)** for PR 20490 at commit [`14b4a95`](https://github.com/apache/spark/commit/14b4a95b9c0ce0024e304d3cd48880a260df0f81).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87224 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87224/testReport)** for PR 20490 at commit [`a2a0ec8`](https://github.com/apache/spark/commit/a2a0ec8b440152be0f643fd89dcce2c0f51612c1).


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Add support for commit coordinator f...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/722/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87078 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87078/testReport)** for PR 20490 at commit [`ebe9d56`](https://github.com/apache/spark/commit/ebe9d56094e53d1a8f7083eae781aa490d96d80b).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166899259
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    --- End diff --
    
    I think we also need to handle it at the streaming side. Please check all the callers of `DataWriter.commit`.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87126 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87126/testReport)** for PR 20490 at commit [`2ac1fa2`](https://github.com/apache/spark/commit/2ac1fa23781b04172b9bf33380656a5e9c885db7).
     * This patch **fails to generate documentation**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167321301
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -62,6 +62,16 @@
        */
       DataWriterFactory<Row> createWriterFactory();
     
    +  /**
    +   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
    --- End diff --
    
    I agree we shouldn't link to an internal class, but I don't think this is the place to document the built-in coordinator's behavior. Is there already a doc for that elsewhere that is public?


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87132 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87132/testReport)** for PR 20490 at commit [`c982d3a`](https://github.com/apache/spark/commit/c982d3a5d0a895ad33a696a7b0fbd9453724fdb4).


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r166418424
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -117,20 +118,43 @@ object DataWritingSparkTask extends Logging {
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
           iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (writeTask.useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    --- End diff --
    
    I updated the `DataSourceWriter` docs for this change.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Add support for commit coordinator f...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87009 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87009/testReport)** for PR 20490 at commit [`288af6a`](https://github.com/apache/spark/commit/288af6a2729c769e0d4075a8f9190958ab5a211c).


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/727/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/756/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    **[Test build #87338 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87338/testReport)** for PR 20490 at commit [`538bc86`](https://github.com/apache/spark/commit/538bc864f8ebb8d1b7e63c26806f209f2c3b0fc4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87169/
    Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by steveloughran <gi...@git.apache.org>.
Github user steveloughran commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    @rdblue thanks. That was what I thought (the output coordinator doesn't tell incoming speculative work to abort until any actively committing task attempt has returned); I was just worried after the conversation.
    
    In a lot of the Hadoop FS code, `InterruptedException` is converted to `InterruptedIOException` to allow it to trickle up, but as other IOEs subclass that (socket/connect timeouts), you can't assume that an `InterruptedIOException` implies the job was interrupted, only that some IO went wrong.
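
The subclassing point above can be checked with JDK classes alone: `java.net.SocketTimeoutException` extends `java.io.InterruptedIOException`, so a `catch (InterruptedIOException e)` also sees plain timeouts. The sketch below is illustrative only. `InterruptClassifier`, `wrap`, and `classify` are hypothetical names, not Hadoop or Spark APIs, and the assumption that a wrapped interrupt keeps the original `InterruptedException` as its cause is ours, not something the thread states.

```java
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.SocketTimeoutException;

// Hypothetical helper, not part of Hadoop or Spark.
class InterruptClassifier {

  // Assumed wrapping style: keep the original interrupt as the cause.
  static InterruptedIOException wrap(InterruptedException e) {
    InterruptedIOException wrapped =
        new InterruptedIOException("interrupted: " + e.getMessage());
    wrapped.initCause(e);
    return wrapped;
  }

  // Classify by type and cause instead of treating every
  // InterruptedIOException as a thread interrupt.
  static String classify(IOException e) {
    if (e instanceof SocketTimeoutException) {
      return "timeout"; // a subclass of InterruptedIOException, but not an interrupt
    }
    if (e instanceof InterruptedIOException) {
      return (e.getCause() instanceof InterruptedException) ? "interrupted" : "ambiguous";
    }
    return "other";
  }

  public static void main(String[] args) {
    System.out.println(classify(new SocketTimeoutException("read timed out")));
    System.out.println(classify(wrap(new InterruptedException("stop"))));
    System.out.println(classify(new InterruptedIOException("no cause attached")));
    System.out.println(classify(new IOException("disk error")));
  }
}
```

The "ambiguous" branch is the case the comment warns about: without a preserved cause, the exception type alone does not say whether the task was interrupted.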


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87078/
    Test FAILed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20490: [SPARK-23323][SQL]: Support commit coordinator for DataS...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/20490
  
    I've updated this to no longer require #20387. It wasn't relying on those changes at all. @gatorsmile, @cloud-fan, what do you think about getting this into 2.3.0?


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167386197
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -116,21 +118,45 @@ object DataWritingSparkTask extends Logging {
       def run(
           writeTask: DataWriterFactory[InternalRow],
           context: TaskContext,
    -      iter: Iterator[InternalRow]): WriterCommitMessage = {
    -    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
    +      iter: Iterator[InternalRow],
    +      useCommitCoordinator: Boolean): WriterCommitMessage = {
    +    val stageId = context.stageId()
    +    val partId = context.partitionId()
    +    val attemptId = context.attemptNumber()
    +    val dataWriter = writeTask.createDataWriter(partId, attemptId)
     
         // write the data and commit this writer.
         Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
           iter.foreach(dataWriter.write)
    -      logInfo(s"Writer for partition ${context.partitionId()} is committing.")
    -      val msg = dataWriter.commit()
    -      logInfo(s"Writer for partition ${context.partitionId()} committed.")
    +
    +      val msg = if (useCommitCoordinator) {
    +        val coordinator = SparkEnv.get.outputCommitCoordinator
    +        val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
    +        if (commitAuthorized) {
    +          logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
    +          dataWriter.commit()
    +
    +        } else {
    +          val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
    --- End diff --
    
    `authorize to commit`
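
For readers following the diff above, the authorize-then-commit flow can be sketched without Spark. This is a toy model under stated assumptions, written in Java rather than the Scala of the patch. `CommitCoordinatorSketch` and `runTask` are hypothetical names. Only `canCommit(stageId, partId, attemptId)` mirrors the call shown in the diff, and its body here is our own simplification: the first attempt to ask for a partition wins. Spark's real coordinator runs on the driver and also handles failed attempts, which this sketch does not model.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Toy stand-in for a commit coordinator; not Spark's implementation.
class CommitCoordinatorSketch {

  // "stage:partition" -> attempt number that was authorized to commit
  private final Map<String, Integer> authorized = new ConcurrentHashMap<>();

  // The first attempt to ask for a given partition is authorized; later
  // attempts for the same partition are refused. Repeated calls by the
  // winning attempt keep returning true.
  boolean canCommit(int stageId, int partId, int attemptId) {
    Integer winner = authorized.putIfAbsent(stageId + ":" + partId, attemptId);
    return winner == null || winner == attemptId;
  }

  // Mirrors the branch structure in the diff: consult the coordinator only
  // when the writer asks for it, otherwise commit unconditionally.
  String runTask(int stageId, int partId, int attemptId, boolean useCommitCoordinator) {
    if (!useCommitCoordinator || canCommit(stageId, partId, attemptId)) {
      return "committed " + partId + "." + attemptId;
    }
    return "denied " + partId + "." + attemptId;
  }

  public static void main(String[] args) {
    CommitCoordinatorSketch coordinator = new CommitCoordinatorSketch();
    System.out.println(coordinator.runTask(0, 3, 0, true));  // first attempt for partition 3
    System.out.println(coordinator.runTask(0, 3, 1, true));  // speculative attempt, same partition
    System.out.println(coordinator.runTask(0, 4, 1, false)); // coordinator not consulted
  }
}
```

In the patch itself, the denied branch builds the "driver did not authorize commit" message. What follows that line is cut off in the quoted diff, so the sketch simply returns a marker string instead.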


---



[GitHub] spark pull request #20490: [SPARK-23323][SQL]: Support commit coordinator fo...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20490#discussion_r167137165
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -62,6 +62,16 @@
        */
       DataWriterFactory<Row> createWriterFactory();
     
    +  /**
    +   * Returns whether Spark should use the commit coordinator to ensure that only one attempt for
    --- End diff --
    
    This is actually not a guarantee, is it?



---
