Posted to reviews@spark.apache.org by gengliangwang <gi...@git.apache.org> on 2018/01/24 17:27:05 UTC

[GitHub] spark pull request #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Wr...

GitHub user gengliangwang opened a pull request:

    https://github.com/apache/spark/pull/20386

    [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.commit into two phase

    ## What changes were proposed in this pull request?
    
    Currently, the API `DataSourceV2Writer#commit(WriterCommitMessage[])` commits a writing job with a list of commit messages.
    
    This makes sense in some scenarios, e.g. MicroBatchExecution.
    
    However, the driver could start processing each commit message as it is received (e.g. persisting messages into files) before all the messages are collected.
    
    The proposal is to break down `DataSourceV2Writer.commit` into two phases:
    
    1. `add(WriterCommitMessage message)`: Handles a commit message produced by {@link DataWriter#commit()}.
    2. `commit()`: Commits the writing job.
    
    This should make the API more flexible, and more reasonable for implementing some data sources.
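
The two phases described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `TwoPhaseWriter`, `FileCommitMessage` and `ManifestWriter` are hypothetical names, and the interfaces are local stand-ins rather than the real Spark types.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the Spark types named in the proposal; not the real Spark API.
interface WriterCommitMessage {}

interface TwoPhaseWriter {
  void add(WriterCommitMessage message); // called on the driver once per received message
  void commit();                         // called after all messages have been added
  void abort();
}

// Hypothetical message: a task reports the file it wrote.
final class FileCommitMessage implements WriterCommitMessage {
  final String path;

  FileCommitMessage(String path) {
    this.path = path;
  }
}

// Records each path as soon as its message arrives and publishes the paths on commit().
final class ManifestWriter implements TwoPhaseWriter {
  private final List<String> pending = new ArrayList<>();
  private final List<String> published = new ArrayList<>();

  @Override
  public void add(WriterCommitMessage message) {
    // Work can start here, before the remaining messages are collected.
    pending.add(((FileCommitMessage) message).path);
  }

  @Override
  public void commit() {
    published.addAll(pending);
    pending.clear();
  }

  @Override
  public void abort() {
    pending.clear();
  }

  List<String> published() {
    return new ArrayList<>(published);
  }
}

class TwoPhaseDemo {
  public static void main(String[] args) {
    ManifestWriter writer = new ManifestWriter();
    writer.add(new FileCommitMessage("part-0"));
    writer.add(new FileCommitMessage("part-1"));
    writer.commit();
    System.out.println(writer.published());
  }
}
```

Nothing becomes visible through `published()` until `commit()` runs, so per-message work in `add` does not change when the job as a whole is committed.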
    
    ## How was this patch tested?
    
    Unit test


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/gengliangwang/spark DSV2_Writer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/20386.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #20386
    
----
commit 11711a43eb4a327af30aa3354cf81366616739e4
Author: Wang Gengliang <lt...@...>
Date:   2018-01-24T09:15:38Z

    add api 'add' in DataSourceV2Writer

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang closed the pull request at:

    https://github.com/apache/spark/pull/20386


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r165124614
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
    --- End diff --
    
    What are the exact guarantees you're looking for when calling a system "exactly-once"? I worry you're looking for something that isn't possible. In particular, I don't know of any additional guarantee that check would allow us to make.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164737290
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,33 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer in the executor side.
    +   *
    +   * Note that, implementations might need to cache all commit messages before calling
    +   * {@link #commit()} or {@link #abort()}.
        *
        * If this method fails (by throwing an exception), this writing job is considered to to have been
    -   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    -   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
    +   */
    +  void add(WriterCommitMessage message);
    +
    +  /**
    +   * Commits this writing job.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        */
    -  void commit(WriterCommitMessage[] messages);
    +  void commit();
     
       /**
    -   * Aborts this writing job because some data writers are failed and keep failing when retry, or
    -   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
    +   * Aborts this writing job because some data writers are failed and keep failing when retry,
    +   * or the Spark job fails with some unknown reasons,
    +   * or {@link #commit()} /{@link #add(WriterCommitMessage)} fails.
        *
        * If this method fails (by throwing an exception), the underlying data source may require manual
        * cleanup.
    -   *
    -   * Unless the abort is triggered by the failure of commit, the given messages should have some
    --- End diff --
    
    Unless the abort is triggered by the failure of #commit, the number of commit messages added by #add should be smaller than the number of input data partitions, as there may be only a few data writers that are committed before the abort happens, or some data writers were committed but their commit messages haven't reached the driver when the abort is triggered. So this is just a "best effort" for data sources to clean up the data left by data writers.
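
A rough sketch of the "best effort" cleanup described in this comment, assuming a writer that caches the messages passed to `add`. `StagedFileMessage` and `BestEffortCleanupWriter` are hypothetical names, and the `Set<String>` merely stands in for the destination storage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical stand-in for Spark's WriterCommitMessage; not the real API.
interface WriterCommitMessage {}

// Hypothetical message: the temporary file a data writer committed.
final class StagedFileMessage implements WriterCommitMessage {
  final String stagedPath;

  StagedFileMessage(String stagedPath) {
    this.stagedPath = stagedPath;
  }
}

final class BestEffortCleanupWriter {
  private final Set<String> destination; // stands in for the remote storage
  private final List<String> received = new ArrayList<>();

  BestEffortCleanupWriter(Set<String> destination) {
    this.destination = destination;
  }

  void add(WriterCommitMessage message) {
    received.add(((StagedFileMessage) message).stagedPath);
  }

  // Removes only the files the driver heard about; anything else needs a separate sweep.
  void abort() {
    for (String path : received) {
      destination.remove(path);
    }
    received.clear();
  }
}

class BestEffortAbortDemo {
  public static void main(String[] args) {
    Set<String> destination = new TreeSet<>();
    destination.add("tmp/part-0");
    destination.add("tmp/part-1");
    destination.add("tmp/part-2"); // written, but its message never reaches the driver

    BestEffortCleanupWriter writer = new BestEffortCleanupWriter(destination);
    writer.add(new StagedFileMessage("tmp/part-0"));
    writer.add(new StagedFileMessage("tmp/part-1"));
    writer.abort();

    System.out.println(destination);
  }
}
```

The file whose message never arrived is left behind, which is why the cleanup can only be best effort.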


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164907397
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
    --- End diff --
    
    Nit: javadoc typo.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Build finished. Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164680877
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MemorySinkV2Suite.scala ---
    @@ -41,19 +41,22 @@ class MemorySinkV2Suite extends StreamTest with BeforeAndAfter {
       test("continuous writer") {
         val sink = new MemorySinkV2
         val writer = new MemoryStreamWriter(sink, OutputMode.Append())
    -    writer.commit(0,
    -      Array(
    -        MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
    -        MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
    -        MemoryWriterCommitMessage(2, Seq(Row(6), Row(7)))
    -      ))
    +    val messages = Seq(
    +      MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))),
    +      MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))),
    +      MemoryWriterCommitMessage(2, Seq(Row(6), Row(7)))
    +    )
    +    messages.foreach(writer.add(_))
    --- End diff --
    
    nit:
    ```
    writer.add(MemoryWriterCommitMessage(0, Seq(Row(1), Row(2))))
    writer.add(MemoryWriterCommitMessage(1, Seq(Row(3), Row(4))))
    ..
    ```


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86689 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86689/testReport)** for PR 20386 at commit [`377df4b`](https://github.com/apache/spark/commit/377df4b61335c5b0ce66c683f2fe67b6aa43a976).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86801 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86801/testReport)** for PR 20386 at commit [`42dc690`](https://github.com/apache/spark/commit/42dc69004ad37a5c4a5d8c96478a875ff4baed4e).


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86788 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86788/testReport)** for PR 20386 at commit [`7a677fd`](https://github.com/apache/spark/commit/7a677fd63338cdfca4f1406ee9a5a7c45df42521).


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/354/
    Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r165137514
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
    --- End diff --
    
    It's true that there's no exactly-once behavior with respect to StreamWriter.commit(). "Exactly-once processing" refers to the promise that the remote sink will contain exactly one committed copy of each processed record.
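
One way to read the idempotence requirement in the quoted javadoc is sketched below. `IdempotentEpochSink` is a hypothetical in-memory sink, not part of Spark, and it assumes a replayed epoch produces the same records as the original attempt.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory sink; the names are illustrative only.
final class IdempotentEpochSink {
  private final Map<Long, List<String>> committedByEpoch = new TreeMap<>();
  private final List<String> pending = new ArrayList<>();

  void add(String record) {
    pending.add(record);
  }

  // A repeated commit for an epoch that is already stored is ignored,
  // so replaying the epoch does not add a second copy of its records.
  void commit(long epochId) {
    committedByEpoch.putIfAbsent(epochId, new ArrayList<>(pending));
    pending.clear();
  }

  List<String> committedRecords() {
    List<String> all = new ArrayList<>();
    for (List<String> records : committedByEpoch.values()) {
      all.addAll(records);
    }
    return all;
  }
}

class IdempotentCommitDemo {
  public static void main(String[] args) {
    IdempotentEpochSink sink = new IdempotentEpochSink();
    sink.add("a");
    sink.add("b");
    sink.commit(0L);

    // The engine replays epoch 0, e.g. after a restart.
    sink.add("a");
    sink.add("b");
    sink.commit(0L);

    System.out.println(sink.committedRecords());
  }
}
```

Here `commit` itself may run more than once for an epoch, while the sink still ends up with a single copy of each record.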


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/383/
    Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86801/
    Test FAILed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/238/
    Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86698 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86698/testReport)** for PR 20386 at commit [`bdd9bd1`](https://github.com/apache/spark/commit/bdd9bd1ee7d5b4db6d108137eca3c25cf650ff72).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164649253
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala ---
    @@ -148,7 +148,8 @@ private[continuous] class EpochCoordinator(
           logDebug(s"Epoch $epoch has received commits from all partitions. Committing globally.")
           // Sequencing is important here. We must commit to the writer before recording the commit
           // in the query, or we will end up dropping the commit if we restart in the middle.
    -      writer.commit(epoch, thisEpochCommits.toArray)
    +      thisEpochCommits.foreach(writer.add(_))
    --- End diff --
    
    Is it possible to call `add` as soon as each commit message arrives?
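
What this question asks for might look roughly like the sketch below. `EagerEpochCoordinator`, `EpochWriter` and `RecordingWriter` are hypothetical names rather than the real `EpochCoordinator` code, and because `add` carries no epoch id the sketch assumes only one epoch is in flight at a time.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-ins; not the real Spark interfaces.
interface WriterCommitMessage {}

interface EpochWriter {
  void add(WriterCommitMessage message);
  void commit(long epochId);
}

final class EagerEpochCoordinator {
  private final int numPartitions;
  private final EpochWriter writer;
  private final Map<Long, Set<Integer>> reportedPartitions = new HashMap<>();

  EagerEpochCoordinator(int numPartitions, EpochWriter writer) {
    this.numPartitions = numPartitions;
    this.writer = writer;
  }

  // Forwards each message to the writer on arrival instead of buffering it until the epoch is complete.
  void onPartitionCommit(long epochId, int partitionId, WriterCommitMessage message) {
    Set<Integer> seen = reportedPartitions.computeIfAbsent(epochId, id -> new HashSet<>());
    if (!seen.add(partitionId)) {
      return; // ignore a duplicate report from the same partition
    }
    writer.add(message);
    if (seen.size() == numPartitions) {
      writer.commit(epochId);
    }
  }
}

// Records the calls it receives so the ordering can be inspected.
final class RecordingWriter implements EpochWriter {
  final List<String> calls = new ArrayList<>();

  @Override
  public void add(WriterCommitMessage message) {
    calls.add("add");
  }

  @Override
  public void commit(long epochId) {
    calls.add("commit:" + epochId);
  }
}

class EagerCoordinatorDemo {
  public static void main(String[] args) {
    RecordingWriter writer = new RecordingWriter();
    EagerEpochCoordinator coordinator = new EagerEpochCoordinator(2, writer);
    WriterCommitMessage message = new WriterCommitMessage() {};
    coordinator.onPartitionCommit(0L, 0, message);
    coordinator.onPartitionCommit(0L, 1, message);
    System.out.println(writer.calls);
  }
}
```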


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86829/
    Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r165138574
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
    --- End diff --
    
    If that's the case, then this interface should be clear about it instead of including wording about exactly-once. For this interface, there is no exactly-once guarantee.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    CC @rdblue @zsxwing @jose-torres @sameeragarwal 


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86775/
    Test FAILed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/373/
    Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164735787
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,33 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer in the executor side.
    --- End diff --
    
    Maybe remove `in the executor side.` People may mistakenly think this method is called on the executor side.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164736461
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,33 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer in the executor side.
    +   *
    +   * Note that, implementations might need to cache all commit messages before calling
    +   * {@link #commit()} or {@link #abort()}.
        *
        * If this method fails (by throwing an exception), this writing job is considered to to have been
    -   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    -   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
    +   */
    +  void add(WriterCommitMessage message);
    +
    +  /**
    +   * Commits this writing job.
    --- End diff --
    
    `... When this method is called, the number of commit messages added by #add is the same as the number of input data partitions.`


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164737921
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala ---
    @@ -34,9 +33,9 @@ class ConsoleWriterSuite extends StreamTest {
         Console.withOut(captured) {
           val query = input.toDF().writeStream.format("console").start()
           try {
    -        input.addData(1, 2, 3)
    +        input.addData(1, 1, 1)
    --- End diff --
    
    It's fixable if we attach the partition id to the commit message of ConsoleSink, but is it worth it? cc @zsxwing @jose-torres 
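
The thread does not say why the test data changed. Assuming the concern is that rows reach the sink in message-arrival order, the idea of attaching a partition id could be sketched as below. `PartitionedMessage` and `OrderedConsoleSink` are hypothetical names, not the actual ConsoleSink classes.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical commit message that carries the id of the partition that produced it.
final class PartitionedMessage {
  final int partitionId;
  final List<Integer> rows;

  PartitionedMessage(int partitionId, List<Integer> rows) {
    this.partitionId = partitionId;
    this.rows = rows;
  }
}

// Hypothetical sink that orders rows by partition id rather than by arrival order.
final class OrderedConsoleSink {
  private final List<PartitionedMessage> messages = new ArrayList<>();

  void add(PartitionedMessage message) {
    messages.add(message);
  }

  // Returns the rows in partition order and resets the sink for the next batch.
  List<Integer> commit() {
    messages.sort(Comparator.comparingInt((PartitionedMessage m) -> m.partitionId));
    List<Integer> ordered = new ArrayList<>();
    for (PartitionedMessage message : messages) {
      ordered.addAll(message.rows);
    }
    messages.clear();
    return ordered;
  }
}

class OrderedConsoleSinkDemo {
  public static void main(String[] args) {
    OrderedConsoleSink sink = new OrderedConsoleSink();
    List<Integer> third = new ArrayList<>();
    third.add(3);
    List<Integer> first = new ArrayList<>();
    first.add(1);
    sink.add(new PartitionedMessage(2, third)); // arrives first
    sink.add(new PartitionedMessage(0, first)); // arrives second
    System.out.println(sink.commit());
  }
}
```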


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86698 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86698/testReport)** for PR 20386 at commit [`bdd9bd1`](https://github.com/apache/spark/commit/bdd9bd1ee7d5b4db6d108137eca3c25cf650ff72).


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164907626
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    --- End diff --
    
    What does this mean? It isn't clear to me what "the number of input partitions" means, or why it isn't obvious that it is equal to the number of pending `WriterCommitMessage` instances passed to add.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    There is a lesson I learned from streaming data source v1: even if an API is totally internal, people end up using it and then ask us not to remove it.
    
    I think the same is true for the file-based data source. It's internal, but people may still use it. Although we haven't found any use case for `onTaskCommit` among the built-in data sources, external data sources may require it.
    
    One possible use case: the implementation needs a two-phase commit on the driver side, and it can use `onTaskCommit` to finish the first phase earlier. Another: someone wants to collect the commit messages received so far and report statistics regularly, which also requires `onTaskCommit`.
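
The two use cases above can be sketched as follows. This is only an illustration under stated assumptions: `StagedFileMessage` and `TwoPhaseWriter` are hypothetical names, and the in-memory lists stand in for whatever the first phase would really do (for example, persisting a manifest entry).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical commit message describing one task's staged output.
final class StagedFileMessage {
    final String stagedFile;
    final long rowCount;

    StagedFileMessage(String stagedFile, long rowCount) {
        this.stagedFile = stagedFile;
        this.rowCount = rowCount;
    }
}

// Hypothetical driver-side writer; the names are illustrative, not Spark's API.
final class TwoPhaseWriter {
    private final List<String> prepared = new ArrayList<>();
    private final List<String> published = new ArrayList<>();
    private long rowsSoFar = 0;

    // Phase one: runs as each message arrives, before all tasks have finished.
    void add(StagedFileMessage message) {
        prepared.add(message.stagedFile); // stand-in for persisting the message
        rowsSoFar += message.rowCount;    // running statistic, readable at any time
    }

    // Phase two: publishes everything prepared so far.
    void commit() {
        published.addAll(prepared);
        prepared.clear();
    }

    // Discards prepared but unpublished work.
    void abort() {
        prepared.clear();
    }

    long rowsSoFar() {
        return rowsSoFar;
    }

    List<String> published() {
        return published;
    }
}
```

With a per-message callback, `rowsSoFar()` is meaningful while the job is still running, which a single `commit(WriterCommitMessage[])` call cannot offer.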


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86826/
    Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86775 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86775/testReport)** for PR 20386 at commit [`e973187`](https://github.com/apache/spark/commit/e97318756f599cc19a21363afc27c95448233261).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Closing this PR now; the problem is being resolved in #20454.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r165119560
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,42 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer.
    +   *
    +   * Note that, implementations might need to cache all commit messages before calling
    +   * {@link #commit()} or {@link #abort()}.
        *
        * If this method fails (by throwing an exception), this writing job is considered to to have been
    -   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    -   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
    +   */
    +  void add(WriterCommitMessage message);
    --- End diff --
    
    +1 for separating and using another PR. Thanks.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/389/
    Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    @gengliangwang, what is the use case supported by this? In other words, how is `onTaskCommit(taskCommit: TaskCommitMessage)` currently used that requires this change?
    
    In general, I'm more concerned with the batch side and I don't have a huge problem with this change. I do want to make sure it is in support of a valid use case. I'd also rather separate the batch and streaming committer APIs because they have so little in common.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164933166
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
        * in some circumstances.
        */
    -  void commit(long epochId, WriterCommitMessage[] messages);
    +  void commit(long epochId);
     
       /**
    -   * Aborts this writing job because some data writers are failed and keep failing when retry, or
    -   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
    +   * Aborts this writing job because some data writers are failed and keep failing when retry,
    +   * or the Spark job fails with some unknown reasons,
    +   * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails
        *
        * If this method fails (by throwing an exception), the underlying data source may require manual
        * cleanup.
        *
    -   * Unless the abort is triggered by the failure of commit, the given messages should have some
    -   * null slots as there maybe only a few data writers that are committed before the abort
    -   * happens, or some data writers were committed but their commit messages haven't reached the
    -   * driver when the abort is triggered. So this is just a "best effort" for data sources to
    -   * clean up the data left by data writers.
    +   * Unless the abort is triggered by the failure of commit, the number of commit
    +   * messages added by {@link #add(WriterCommitMessage)} should be smaller than the number
    +   * of input data partitions, as there may be only a few data writers that are committed
    +   * before the abort happens, or some data writers were committed but their commit messages
    +   * haven't reached the driver when the abort is triggered. So this is just a "best effort"
    --- End diff --
    
    I think there is no difference between "the message is created, but a node fails before it is sent" and "the message is in flight". Implementations need to deal with the case when a writer finishes successfully but its message is not available in `abort` anyway.
    
    `best effort` might not be the right term; do you have a better suggestion?
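
One way an implementation might cope with a writer that finished but whose message never reaches `abort` is to make cleanup independent of the messages. The sketch below assumes a job-scoped staging directory; `StagingDirWriter` and its methods are hypothetical and only illustrate the idea.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical writer whose abort() does not depend on commit messages.
final class StagingDirWriter {
    private final Path stagingDir;

    StagingDirWriter(Path stagingDir) {
        this.stagingDir = stagingDir;
    }

    // Convenience for experiments: stage under a fresh temporary directory.
    static StagingDirWriter inTempDir() {
        try {
            return new StagingDirWriter(Files.createTempDirectory("staging-"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stand-in for a task writing its output under the staging directory.
    Path writeTaskOutput(int partitionId) {
        try {
            return Files.createFile(stagingDir.resolve("part-" + partitionId));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Removes everything under the staging directory, including files whose
    // commit messages never reached the driver. Calling it twice is harmless.
    void abort() {
        if (!Files.exists(stagingDir)) {
            return; // nothing left to clean up
        }
        try (Stream<Path> paths = Files.walk(stagingDir)) {
            // Reverse order deletes children before their parent directory.
            paths.sorted(Comparator.reverseOrder()).forEach(p -> {
                try {
                    Files.delete(p);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean stagingDirExists() {
        return Files.exists(stagingDir);
    }
}
```

Under this design the "message created but never delivered" and "message in flight" cases look the same to `abort`, which matches the point made in the comment.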


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164909225
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
        * in some circumstances.
        */
    -  void commit(long epochId, WriterCommitMessage[] messages);
    +  void commit(long epochId);
     
       /**
    -   * Aborts this writing job because some data writers are failed and keep failing when retry, or
    -   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
    +   * Aborts this writing job because some data writers are failed and keep failing when retry,
    +   * or the Spark job fails with some unknown reasons,
    +   * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails
        *
        * If this method fails (by throwing an exception), the underlying data source may require manual
        * cleanup.
        *
    -   * Unless the abort is triggered by the failure of commit, the given messages should have some
    -   * null slots as there maybe only a few data writers that are committed before the abort
    -   * happens, or some data writers were committed but their commit messages haven't reached the
    -   * driver when the abort is triggered. So this is just a "best effort" for data sources to
    -   * clean up the data left by data writers.
    +   * Unless the abort is triggered by the failure of commit, the number of commit
    +   * messages added by {@link #add(WriterCommitMessage)} should be smaller than the number
    +   * of input data partitions, as there may be only a few data writers that are committed
    +   * before the abort happens, or some data writers were committed but their commit messages
    +   * haven't reached the driver when the abort is triggered. So this is just a "best effort"
    --- End diff --
    
    Commit messages in flight should be handled and aborted. Otherwise, this isn't a "best effort". Best effort means that Spark does everything that is feasible to ensure that commit messages are added before aborting, and that should include race conditions from RPC.
    
    The case where "best effort" might miss a message is if the message is created, but a node fails before it is sent to the driver.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r165131292
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
    --- End diff --
    
    For a commit interface, I expect the guarantee to be that data is committed exactly once. If commits are idempotent, data may be reprocessed, and commits may happen more than once, then that is not an exactly-once commit: that is an at-least-once commit.
    
    I'm not trying to split hairs. My point is that if there's no difference in behavior between exactly-once and at-least-once because the commit must be idempotent, then you don't actually have an exactly-once guarantee.
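
To make the distinction concrete, here is a minimal sketch of an idempotent per-epoch commit. `EpochSink` is a hypothetical class, and the sketch assumes epochs are committed in increasing order. The engine may deliver `commit` for the same epoch more than once (at-least-once delivery), and the sink makes the repeats harmless.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sink; not the StreamWriter interface itself.
final class EpochSink {
    private long lastCommittedEpoch = -1;
    private final List<String> committed = new ArrayList<>();

    // Returns true if the epoch was applied, false if it was a repeat.
    // Assumes epochs arrive in increasing order.
    boolean commit(long epochId, List<String> rows) {
        if (epochId <= lastCommittedEpoch) {
            return false; // already applied; a repeated call changes nothing
        }
        committed.addAll(rows);
        lastCommittedEpoch = epochId;
        return true;
    }

    List<String> committed() {
        return committed;
    }
}
```

In a real sink the data and the epoch marker would have to be updated atomically; this in-memory version only shows where the deduplication happens.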


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86826 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86826/testReport)** for PR 20386 at commit [`d198671`](https://github.com/apache/spark/commit/d198671aa6794e76f606a364b479b3143bec2c19).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/196/
    Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164735522
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -40,16 +40,21 @@
      *   1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the
      *      partitions of the input data(RDD).
      *   2. For each partition, create the data writer, and write the data of the partition with this
    - *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
    - *      exception happens during the writing, call {@link DataWriter#abort()}.
    - *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
    - *      some writers are aborted, or the job failed with an unknown reason, call
    - *      {@link #abort(WriterCommitMessage[])}.
    + *      writer. If one data writer finishes successfully, the commit message will be sent back to
    + *      the driver side and Spark will call {@link #add(WriterCommitMessage)}.
    + *      If exception happens during the writing, call {@link DataWriter#abort()}.
    + *   3. If all the data writers finish successfully, and {@link #add(WriterCommitMessage)} is
    + *      successfully called for all the commit messages, Spark will call {@link #commit()}.
    + *      If any of the data writers failed, or any of the {@link #add(WriterCommitMessage)}
    + *      calls failed, or the job failed with an unknown reason, call {@link #abort()}.
      *
      * While Spark will retry failed writing tasks, Spark won't retry failed writing jobs. Users should
      * do it manually in their Spark applications if they want to retry.
      *
    - * Please refer to the documentation of commit/abort methods for detailed specifications.
    + * All these methods are guaranteed to be called in a single thread.
    --- End diff --
    
    nit: `... in a single thread on the driver side`
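
The lifecycle described in the quoted Javadoc can be sketched as a single-threaded driver loop. `JobWriter`, `DriverLoop` and `RecordingWriter` are hypothetical stand-ins for illustration and do not reflect how Spark actually schedules these calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for the proposed driver-side writer callbacks.
interface JobWriter<M> {
    void add(M message);
    void commit();
    void abort();
}

final class DriverLoop {
    // An empty Optional models a task that failed after all retries.
    // Returns true if the job committed, false if it was aborted.
    static <M> boolean run(JobWriter<M> writer, List<Optional<M>> taskResults) {
        boolean ok = true;
        try {
            for (Optional<M> result : taskResults) {
                if (!result.isPresent()) {
                    ok = false; // a failed writer means the job cannot commit
                    break;
                }
                writer.add(result.get());
            }
            if (ok) {
                writer.commit();
            }
        } catch (RuntimeException e) {
            ok = false; // a failure in add() or commit() also leads to abort()
        }
        if (!ok) {
            writer.abort();
        }
        return ok;
    }
}

// Records the calls it receives so their order can be inspected.
final class RecordingWriter implements JobWriter<String> {
    final List<String> calls = new ArrayList<>();

    public void add(String message) {
        calls.add("add:" + message);
    }

    public void commit() {
        calls.add("commit");
    }

    public void abort() {
        calls.add("abort");
    }
}
```

Because all three callbacks run sequentially on one thread, a writer implemented against this contract would not need its own synchronization.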


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86826 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86826/testReport)** for PR 20386 at commit [`d198671`](https://github.com/apache/spark/commit/d198671aa6794e76f606a364b479b3143bec2c19).


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/278/
    Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86647 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86647/testReport)** for PR 20386 at commit [`b572930`](https://github.com/apache/spark/commit/b572930c235cd3819acfcd7a33f88103a2bfbf45).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86698/
    Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164648934
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
    @@ -63,32 +65,30 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message produced by {@link DataWriter#commit()}.
    --- End diff --
    
    nit: `..., which is collected from a successful data writer on the executor side.`


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    > I assume this API is necessary . . . it sounds reasonable to provide a callback for task commit.
    
    I agree it sounds reasonable, but we shouldn't add methods to a new API blindly and without a use case. The point of a new API, at least in part, is to improve on the old one. If it is never used, then we are carrying support for something that is useless. On the other hand, if it is used we should know what it is needed for so we can design for the use case.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86822 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86822/testReport)** for PR 20386 at commit [`86de2f0`](https://github.com/apache/spark/commit/86de2f0e6da1a82ea8bcb9b4b1d7a47e4ec0c7e3).


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86801 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86801/testReport)** for PR 20386 at commit [`42dc690`](https://github.com/apache/spark/commit/42dc69004ad37a5c4a5d8c96478a875ff4baed4e).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86809 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86809/testReport)** for PR 20386 at commit [`42dc690`](https://github.com/apache/spark/commit/42dc69004ad37a5c4a5d8c96478a875ff4baed4e).


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86647 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86647/testReport)** for PR 20386 at commit [`b572930`](https://github.com/apache/spark/commit/b572930c235cd3819acfcd7a33f88103a2bfbf45).


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86829 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86829/testReport)** for PR 20386 at commit [`540ff06`](https://github.com/apache/spark/commit/540ff0631471a27af23abb7e8c034bad1ba27cbc).


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/365/
    Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86775 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86775/testReport)** for PR 20386 at commit [`e973187`](https://github.com/apache/spark/commit/e97318756f599cc19a21363afc27c95448233261).


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86690/
    Test FAILed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r165117779
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
        * in some circumstances.
        */
    -  void commit(long epochId, WriterCommitMessage[] messages);
    +  void commit(long epochId);
     
       /**
    -   * Aborts this writing job because some data writers are failed and keep failing when retry, or
    -   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
    +   * Aborts this writing job because some data writers are failed and keep failing when retry,
    +   * or the Spark job fails with some unknown reasons,
    +   * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails
        *
        * If this method fails (by throwing an exception), the underlying data source may require manual
        * cleanup.
        *
    -   * Unless the abort is triggered by the failure of commit, the given messages should have some
    -   * null slots as there maybe only a few data writers that are committed before the abort
    -   * happens, or some data writers were committed but their commit messages haven't reached the
    -   * driver when the abort is triggered. So this is just a "best effort" for data sources to
    -   * clean up the data left by data writers.
    +   * Unless the abort is triggered by the failure of commit, the number of commit
    +   * messages added by {@link #add(WriterCommitMessage)} should be smaller than the number
    +   * of input data partitions, as there may be only a few data writers that are committed
    +   * before the abort happens, or some data writers were committed but their commit messages
    +   * haven't reached the driver when the abort is triggered. So this is just a "best effort"
    --- End diff --
    
    Best effort is not just how we describe the behavior; it is a requirement of the contract. Spark should not drop commit messages because it is convenient. Spark knows which tasks succeeded, which failed, and which were authorized to commit. That's enough information to provide the best-effort guarantee.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164681157
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala ---
    @@ -34,9 +33,9 @@ class ConsoleWriterSuite extends StreamTest {
         Console.withOut(captured) {
           val query = input.toDF().writeStream.format("console").start()
           try {
    -        input.addData(1, 2, 3)
    +        input.addData(1, 1, 1)
    --- End diff --
    
    Why this change?


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164909653
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
    --- End diff --
    
    I realize this isn't part of this commit, but why would an exactly-once guarantee require idempotent commits? Processing the same data twice with an idempotent guarantee is not the same thing as exactly-once.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164648356
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
    @@ -40,11 +40,13 @@
      *   1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the
      *      partitions of the input data(RDD).
      *   2. For each partition, create the data writer, and write the data of the partition with this
    - *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
    - *      exception happens during the writing, call {@link DataWriter#abort()}.
    - *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
    + *      writer. If all the data are written successfully, call {@link DataWriter#commit()}.
    --- End diff --
    
    If one data writer finishes successfully, the commit message will be sent back to the driver side and Spark will call #add.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86690 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86690/testReport)** for PR 20386 at commit [`a434573`](https://github.com/apache/spark/commit/a434573c1dd347db6907523f97f54d1cdae2d5ca).
     * This patch **fails due to an unknown error code, -9**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86595 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86595/testReport)** for PR 20386 at commit [`11711a4`](https://github.com/apache/spark/commit/11711a43eb4a327af30aa3354cf81366616739e4).


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86788 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86788/testReport)** for PR 20386 at commit [`7a677fd`](https://github.com/apache/spark/commit/7a677fd63338cdfca4f1406ee9a5a7c45df42521).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164648645
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
    @@ -40,11 +40,13 @@
      *   1. Create a writer factory by {@link #createWriterFactory()}, serialize and send it to all the
      *      partitions of the input data(RDD).
      *   2. For each partition, create the data writer, and write the data of the partition with this
    - *      writer. If all the data are written successfully, call {@link DataWriter#commit()}. If
    - *      exception happens during the writing, call {@link DataWriter#abort()}.
    - *   3. If all writers are successfully committed, call {@link #commit(WriterCommitMessage[])}. If
    + *      writer. If all the data are written successfully, call {@link DataWriter#commit()}.
    + *      On a writer being successfully committed, call {@link #add(WriterCommitMessage)} to
    + *      handle its commit message.
    + *      If exception happens during the writing, call {@link DataWriter#abort()}.
    + *   3. If all writers are successfully committed, call {@link #commit()}. If
    --- End diff --
    
    If all the data writers finish successfully, and #add is successfully called for all the commit messages, Spark will call #commit. If any of the data writers fails, or any of the #add calls fails, or the job fails for an unknown reason, Spark will call #abort.
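
The lifecycle described above can be sketched roughly as follows. This is only an illustration under stated assumptions: `CommitMessage`, `TwoPhaseWriter`, `DriverLifecycle` and `RecordingWriter` are hypothetical stand-ins, not the Spark interfaces from this PR, and tasks run sequentially here whereas Spark would run them in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-ins for the interfaces discussed in this PR; not the real Spark types.
interface CommitMessage {}

interface TwoPhaseWriter {
  void add(CommitMessage message);
  void commit();
  void abort();
}

final class DriverLifecycle {
  /**
   * Runs each task, forwards its commit message to add(), then calls commit().
   * A failure in any task, in add(), or in commit() triggers abort().
   * Returns true if the job committed, false if it was aborted.
   */
  static boolean run(TwoPhaseWriter writer, List<Supplier<CommitMessage>> tasks) {
    try {
      for (Supplier<CommitMessage> task : tasks) {
        CommitMessage message = task.get(); // the task-side DataWriter commit
        writer.add(message);                // the driver-side per-message callback
      }
      writer.commit();
      return true;
    } catch (RuntimeException e) {
      writer.abort();
      return false;
    }
  }
}

// Records the order of driver-side calls so the flow can be inspected.
final class RecordingWriter implements TwoPhaseWriter {
  final List<String> calls = new ArrayList<>();
  @Override public void add(CommitMessage message) { calls.add("add"); }
  @Override public void commit() { calls.add("commit"); }
  @Override public void abort() { calls.add("abort"); }
}
```

With two successful tasks the recorded calls would be add, add, commit; if a third task throws, the sequence ends in abort instead of commit.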


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    `FileCommitProtocol.onTaskCommit` is called in `FileFormatWriter.write`, so this PR is required to migrate file-based data sources.
    
    From a quick look, it seems `FileCommitProtocol.onTaskCommit` doesn't have an implementation yet, but I don't want to change the existing API and I assume this API is necessary. BTW, it sounds reasonable to provide a callback for task commit.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86822 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86822/testReport)** for PR 20386 at commit [`86de2f0`](https://github.com/apache/spark/commit/86de2f0e6da1a82ea8bcb9b4b1d7a47e4ec0c7e3).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r165121965
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
    --- End diff --
    
    Thanks for this explanation, I think I see what you're saying. But I think your statement that refers to "true" exactly-once gives away the fact that this does not provide exactly-once semantics.
    
    Maybe this is a question for the dev list: why the weaker version? Shouldn't this API provide a check to see whether the data was already committed?


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86823 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86823/testReport)** for PR 20386 at commit [`f72c86c`](https://github.com/apache/spark/commit/f72c86ce97ef7004c0a16b6fbe390308feda7759).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/384/
    Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    @rdblue The target is 2.3 release. Thanks for your time!


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164680632
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
    @@ -118,14 +118,21 @@ class MemoryWriter(sink: MemorySinkV2, batchId: Long, outputMode: OutputMode)
     
       override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode)
     
    -  def commit(messages: Array[WriterCommitMessage]): Unit = {
    +  private val messages = new ArrayBuffer[WriterCommitMessage]()
    +
    +  override def add(message: WriterCommitMessage): Unit = synchronized {
    +    messages += message
    +  }
    +
    +  def commit(): Unit = synchronized {
         val newRows = messages.flatMap {
           case message: MemoryWriterCommitMessage => message.data
    -    }
    +    }.toArray
         sink.write(batchId, outputMode, newRows)
    +    messages.clear()
       }
     
    -  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    +  override def abort(): Unit = {
    --- End diff --
    
    ditto


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164810169
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,42 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer.
    +   *
    +   * Note that, implementations might need to cache all commit messages before calling
    +   * {@link #commit()} or {@link #abort()}.
        *
        * If this method fails (by throwing an exception), this writing job is considered to to have been
    -   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    -   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
    +   */
    +  void add(WriterCommitMessage message);
    +
    +  /**
    +   * Commits this writing job.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        */
    -  void commit(WriterCommitMessage[] messages);
    +  void commit();
    --- End diff --
    
    WDYT of using the same API as FileCommitProtocol, where the engine calls add() for each message and also passes them all in to commit() at the end? It seems like most writers will have to keep an array of the messages they received.
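
A rough sketch of that alternative shape, for comparison. All names here (`Message`, `CallbackWriter`, `Engine`, `CountingWriter`) are hypothetical and exist only for illustration; this is not the API proposed in the PR, just one way the "callback plus full list" idea could look.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration only; these are not Spark's interfaces.
interface Message {}

interface CallbackWriter {
  // Optional per-task callback; the default does nothing.
  default void add(Message message) {}
  // The engine still hands over every collected message at the end.
  void commit(List<Message> messages);
  void abort(List<Message> messages);
}

final class Engine {
  // Collects messages on behalf of the writer, so writers need no buffer of their own.
  static void runJob(CallbackWriter writer, List<Message> taskResults) {
    List<Message> collected = new ArrayList<>();
    try {
      for (Message m : taskResults) {
        writer.add(m);
        collected.add(m);
      }
      writer.commit(collected);
    } catch (RuntimeException e) {
      // Best effort: pass along whatever was collected before the failure.
      writer.abort(collected);
    }
  }
}

// A writer that ignores add() and only looks at the final list.
final class CountingWriter implements CallbackWriter {
  int committed = -1;
  int aborted = -1;
  @Override public void commit(List<Message> messages) { committed = messages.size(); }
  @Override public void abort(List<Message> messages) { aborted = messages.size(); }
}
```

The trade-off in this sketch is that the engine holds the full list in memory, which is exactly what a writer that externalizes messages in add() would want to avoid.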


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r165119427
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    --- End diff --
    
    Passing the messages to commit and abort seems simpler and better to me, but that's for the batch side. And, we shouldn't move forward with this unless there's a use case.
    
    As for the docs here, what is an implementer intended to understand as a result of this? "The number of data partitions to write" is also misleading: weren't these already written and committed by tasks?


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164918470
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
    --- End diff --
    
    The StreamWriter is responsible for setting up a distributed transaction to commit the data within a batch both locally and to the remote system. But the StreamExecution keeps its own log of which batches have been fully completed. ("Fully completed" includes things like stateful aggregation commit and progress logging which can't reasonably participate in the StreamWriter's transaction.)
    
    So there's a scenario where Spark fails between StreamWriter commit and StreamExecution commit, in which the StreamExecution must re-execute the batch to ensure everything is in the right state. The StreamWriter is responsible for ensuring this doesn't generate duplicate data in the remote system.
    
    Note that the "true" exactly once strategy, where the StreamWriter aborts the retried batch because it was already committed before, is indeed idempotent wrt StreamWriter.commit(epochId). But there are weaker strategies which still provide equivalent semantics.
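
To make the replay scenario concrete, here is a minimal sketch of a commit that is idempotent per epoch. `EpochSink` is a hypothetical in-memory class, not part of Spark, and rows are modeled as strings; a real sink would presumably need to record the epoch id atomically with the data in the remote system.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical in-memory sink used only to illustrate the idea.
final class EpochSink {
  private final Set<Long> committedEpochs = new HashSet<>();
  private final List<String> rows = new ArrayList<>();

  /** Returns true if the epoch was written now, false if it was committed before and skipped. */
  synchronized boolean commit(long epochId, List<String> newRows) {
    if (!committedEpochs.add(epochId)) {
      return false; // replayed epoch: writing again would duplicate data
    }
    rows.addAll(newRows);
    return true;
  }

  synchronized int rowCount() {
    return rows.size();
  }
}
```

Calling commit twice with the same epoch id leaves the row count unchanged after the first call, which is the property a re-executed batch relies on.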


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    I like this change! It adds a missing feature that is required for migrating the file-based data sources (which use `FileCommitProtocol` and have a callback for task commit), and it also makes it possible to handle large jobs that have a lot of tasks. Implementations can externalize the commit messages to avoid keeping too many messages in memory.
    
    LGTM, waiting for feedback from others.
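
As an illustration of externalizing commit messages, a writer could append each message to a file as it arrives. This is a sketch under assumptions: `SpillingWriter` is hypothetical, commit messages are modeled as plain strings (for example output file paths), and the spill location is a local file chosen by the caller.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import java.util.List;

// Hypothetical writer: commit messages are modeled as plain strings.
final class SpillingWriter {
  private final Path spillFile;

  SpillingWriter(Path spillFile) {
    this.spillFile = spillFile;
  }

  /** Appends one message to disk instead of holding it on the heap. */
  synchronized void add(String message) {
    try (BufferedWriter out = Files.newBufferedWriter(spillFile, StandardCharsets.UTF_8,
        StandardOpenOption.CREATE, StandardOpenOption.APPEND)) {
      out.write(message);
      out.newLine();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Reads the spilled messages back; a real job commit would act on them here. */
  synchronized List<String> commit() {
    if (!Files.exists(spillFile)) {
      return Collections.emptyList(); // no task has reported yet
    }
    try {
      return Files.readAllLines(spillFile, StandardCharsets.UTF_8);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  /** Best-effort cleanup of the spill file. */
  synchronized void abort() {
    try {
      Files.deleteIfExists(spillFile);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

For brevity commit() loads all lines at once; an implementation that really wants to bound memory would stream the file instead.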


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164805751
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala ---
    @@ -34,9 +33,9 @@ class ConsoleWriterSuite extends StreamTest {
         Console.withOut(captured) {
           val query = input.toDF().writeStream.format("console").start()
           try {
    -        input.addData(1, 2, 3)
    +        input.addData(1, 1, 1)
    --- End diff --
    
    Makes sense, but can we set the parallelism to 1 instead? I worry that making all the elements the same is more likely to disguise a bug.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86690 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86690/testReport)** for PR 20386 at commit [`a434573`](https://github.com/apache/spark/commit/a434573c1dd347db6907523f97f54d1cdae2d5ca).


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    retest this please.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86823/
    Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/272/
    Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Build finished. Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164703291
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala ---
    @@ -34,9 +33,9 @@ class ConsoleWriterSuite extends StreamTest {
         Console.withOut(captured) {
           val query = input.toDF().writeStream.format("console").start()
           try {
    -        input.addData(1, 2, 3)
    +        input.addData(1, 1, 1)
    --- End diff --
    
    The order of the collected messages is no longer the same as the order of the input data.
    To make the test case work, we should either change the input data to identical elements or set `spark.default.parallelism` to 1.
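
    To illustrate why the order changes: once messages are handed over as tasks finish, they arrive in completion order rather than partition order. The sketch below assumes a hypothetical `Msg` type that carries its partition id, which would let a sink restore partition order if it needed to. Neither `Msg` nor `inPartitionOrder` exists in Spark.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration only; Msg is not a Spark type.
class MessageOrderSketch {
    /** A commit message that remembers which partition produced it. */
    static final class Msg {
        final int partitionId;
        final String payload;

        Msg(int partitionId, String payload) {
            this.partitionId = partitionId;
            this.payload = payload;
        }
    }

    /** Returns the payloads sorted by partition id, leaving the input untouched. */
    static List<String> inPartitionOrder(List<Msg> arrived) {
        List<Msg> sorted = new ArrayList<>(arrived);
        sorted.sort(Comparator.comparingInt((Msg m) -> m.partitionId));
        List<String> payloads = new ArrayList<>();
        for (Msg m : sorted) {
            payloads.add(m.payload);
        }
        return payloads;
    }

    public static void main(String[] args) {
        // Partition 2 finished first, then 0, then 1.
        List<Msg> arrived = Arrays.asList(
            new Msg(2, "3"), new Msg(0, "1"), new Msg(1, "2"));
        System.out.println(inPartitionOrder(arrived)); // prints [1, 2, 3]
    }
}
```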


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86647/
    Test FAILed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/271/
    Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164680081
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala ---
    @@ -54,10 +54,6 @@ case class WriteToDataSourceV2Exec(writer: DataSourceV2Writer, query: SparkPlan)
         }
     
         val rdd = query.execute()
    -    val messages = new Array[WriterCommitMessage](rdd.partitions.length)
    -
    -    logInfo(s"Start processing data source writer: $writer. " +
    -      s"The input RDD has ${messages.length} partitions.")
    --- End diff --
    
    might be good to keep this log.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164930522
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,42 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer.
    +   *
    +   * Note that, implementations might need to cache all commit messages before calling
    +   * {@link #commit()} or {@link #abort()}.
        *
        * If this method fails (by throwing an exception), this writing job is considered to to have been
    -   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    -   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
    +   */
    +  void add(WriterCommitMessage message);
    +
    +  /**
    +   * Commits this writing job.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
        *
    -   * Note that, one partition may have multiple committed data writers because of speculative tasks.
    -   * Spark will pick the first successful one and get its commit message. Implementations should be
    -   * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
    -   * writer can commit, or have a way to clean up the data of already-committed writers.
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        */
    -  void commit(WriterCommitMessage[] messages);
    +  void commit();
    --- End diff --
    
    This is something we wanna improve at the API level. I think the implementation should be free to decide how to store the messages, in case each message is big and there are a lot of them. If this is not a problem at all, we can follow `FileCommitProtocol`.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86822/
    Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86689 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86689/testReport)** for PR 20386 at commit [`377df4b`](https://github.com/apache/spark/commit/377df4b61335c5b0ce66c683f2fe67b6aa43a976).


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164908529
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,42 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer.
    +   *
    +   * Note that, implementations might need to cache all commit messages before calling
    +   * {@link #commit()} or {@link #abort()}.
    --- End diff --
    
    In what case would an implementation not cache and commit all at once? What is the point of a commit if not to make sure all of the data shows up at the same time?


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86823 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86823/testReport)** for PR 20386 at commit [`f72c86c`](https://github.com/apache/spark/commit/f72c86ce97ef7004c0a16b6fbe390308feda7759).


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86809 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86809/testReport)** for PR 20386 at commit [`42dc690`](https://github.com/apache/spark/commit/42dc69004ad37a5c4a5d8c96478a875ff4baed4e).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164738120
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriterSuite.scala ---
    @@ -34,9 +33,9 @@ class ConsoleWriterSuite extends StreamTest {
         Console.withOut(captured) {
           val query = input.toDF().writeStream.format("console").start()
           try {
    -        input.addData(1, 2, 3)
    +        input.addData(1, 1, 1)
    --- End diff --
    
    Generally I think a streaming sink doesn't need to keep the data order w.r.t. the partition id.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86595 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86595/testReport)** for PR 20386 at commit [`11711a4`](https://github.com/apache/spark/commit/11711a43eb4a327af30aa3354cf81366616739e4).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164648815
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java ---
    @@ -63,32 +65,30 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message produced by {@link DataWriter#commit()}.
        *
        * If this method fails (by throwing an exception), this writing job is considered to to have been
    -   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    -   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
    --- End diff --
    
    Add some more comments to say that implementations should probably cache the commit messages and do the final step in #commit.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r165126065
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    +   *
    +   * If this method fails (by throwing an exception), this writing job is considered to to have been
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
        *
        * To support exactly-once processing, writer implementations should ensure that this method is
        * idempotent. The execution engine may call commit() multiple times for the same epoch
        * in some circumstances.
        */
    -  void commit(long epochId, WriterCommitMessage[] messages);
    +  void commit(long epochId);
     
       /**
    -   * Aborts this writing job because some data writers are failed and keep failing when retry, or
    -   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
    +   * Aborts this writing job because some data writers are failed and keep failing when retry,
    +   * or the Spark job fails with some unknown reasons,
    +   * or {@link #commit()} / {@link #add(WriterCommitMessage)} fails
        *
        * If this method fails (by throwing an exception), the underlying data source may require manual
        * cleanup.
        *
    -   * Unless the abort is triggered by the failure of commit, the given messages should have some
    -   * null slots as there maybe only a few data writers that are committed before the abort
    -   * happens, or some data writers were committed but their commit messages haven't reached the
    -   * driver when the abort is triggered. So this is just a "best effort" for data sources to
    -   * clean up the data left by data writers.
    +   * Unless the abort is triggered by the failure of commit, the number of commit
    +   * messages added by {@link #add(WriterCommitMessage)} should be smaller than the number
    +   * of input data partitions, as there may be only a few data writers that are committed
    +   * before the abort happens, or some data writers were committed but their commit messages
    +   * haven't reached the driver when the abort is triggered. So this is just a "best effort"
    --- End diff --
    
    This is a bit of a weird case for API documentation, because the external users of the API will be implementing rather than consuming the interface. We shouldn't drop messages just because we don't want to be bothered, but it's easy to fix that if we make a mistake and there's no serious problem if we miss cases we really could have handled. It's a more serious issue if people misunderstand what Spark can provide, and implement sources which assume any commit message that's been generated will be passed to abort.
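
    One way to read the concern above, sketched under the assumption that task output is staged under a per-job prefix: `abort` cleans up by scanning the staging area rather than trusting the set of messages it happened to receive. `Staging`, `Writer`, and the key layout are hypothetical and only illustrate the "best effort" point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only; not a Spark class.
class BestEffortAbortSketch {
    /** Simulated staging area: task output keyed by "<jobId>/<taskId>". */
    static final class Staging {
        final Map<String, String> files = new TreeMap<>();
    }

    static final class Writer {
        private final String jobId;
        private final Staging staging;
        private final List<String> received = new ArrayList<>();

        Writer(String jobId, Staging staging) {
            this.jobId = jobId;
            this.staging = staging;
        }

        /** Records a message that actually reached the driver. */
        synchronized void add(String stagedKey) {
            received.add(stagedKey);
        }

        /**
         * Removes everything staged for this job, including output whose
         * commit message never arrived. Returns the number of entries removed.
         */
        synchronized int abort() {
            int before = staging.files.size();
            staging.files.keySet().removeIf(key -> key.startsWith(jobId + "/"));
            received.clear();
            return before - staging.files.size();
        }
    }

    public static void main(String[] args) {
        Staging staging = new Staging();
        staging.files.put("job1/task0", "data");
        staging.files.put("job1/task1", "data"); // its message never arrives
        staging.files.put("job2/task0", "data");

        Writer writer = new Writer("job1", staging);
        writer.add("job1/task0");
        System.out.println(writer.abort() + " removed, left: " + staging.files.keySet());
    }
}
```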


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164909970
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,42 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer.
    +   *
    +   * Note that, implementations might need to cache all commit messages before calling
    +   * {@link #commit()} or {@link #abort()}.
        *
        * If this method fails (by throwing an exception), this writing job is considered to to have been
    -   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    -   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
    +   */
    +  void add(WriterCommitMessage message);
    --- End diff --
    
    This is the only method shared between the stream and batch writers. Why does the streaming interface extend this one?


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86788/
    Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/342/
    Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164680538
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ConsoleWriter.scala ---
    @@ -39,13 +41,20 @@ class ConsoleWriter(schema: StructType, options: DataSourceV2Options)
     
       def createWriterFactory(): DataWriterFactory[Row] = PackedRowWriterFactory
     
    -  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    +  private val messages = new ArrayBuffer[WriterCommitMessage]()
    +
    +  override def add(message: WriterCommitMessage): Unit = synchronized {
    +    messages += message
    +  }
    +
    +  override def commit(epochId: Long): Unit = synchronized {
         // We have to print a "Batch" label for the epoch for compatibility with the pre-data source V2
         // behavior.
    -    printRows(messages, schema, s"Batch: $epochId")
    +    printRows(messages.toArray, schema, s"Batch: $epochId")
    +    messages.clear()
       }
     
    -  def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
    +  def abort(epochId: Long): Unit = {}
    --- End diff --
    
    we should clear the message array in abort too.
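
    A minimal sketch of why this matters, written in plain Java with a hypothetical `BufferingWriter` standing in for the console writer in the diff: if `abort` leaves the buffer untouched, messages from the failed epoch leak into the next commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; mirrors the shape of the diff, not Spark's class.
class BufferingWriterSketch {
    static final class BufferingWriter {
        private final List<String> messages = new ArrayList<>();
        private final List<String> printed = new ArrayList<>();

        /** Buffers one commit message until the epoch is committed or aborted. */
        synchronized void add(String message) {
            messages.add(message);
        }

        /** Emits the buffered messages for this epoch, then resets the buffer. */
        synchronized void commit(long epochId) {
            printed.add("Batch: " + epochId + " " + messages);
            messages.clear();
        }

        /** Drops messages of the failed epoch so they cannot leak into the next one. */
        synchronized void abort(long epochId) {
            messages.clear();
        }

        synchronized List<String> printed() {
            return new ArrayList<>(printed);
        }
    }

    public static void main(String[] args) {
        BufferingWriter writer = new BufferingWriter();
        writer.add("x");
        writer.abort(0L);   // epoch 0 fails after one message arrived
        writer.add("y");
        writer.commit(1L);
        System.out.println(writer.printed()); // prints [Batch: 1 [y]]
    }
}
```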


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/387/
    Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    **[Test build #86829 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/86829/testReport)** for PR 20386 at commit [`540ff06`](https://github.com/apache/spark/commit/540ff0631471a27af23abb7e8c034bad1ba27cbc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by jose-torres <gi...@git.apache.org>.
Github user jose-torres commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164919092
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java ---
    @@ -63,32 +68,42 @@
       DataWriterFactory<Row> createWriterFactory();
     
       /**
    -   * Commits this writing job with a list of commit messages. The commit messages are collected from
    -   * successful data writers and are produced by {@link DataWriter#commit()}.
    +   * Handles a commit message which is collected from a successful data writer.
    +   *
    +   * Note that, implementations might need to cache all commit messages before calling
    +   * {@link #commit()} or {@link #abort()}.
        *
        * If this method fails (by throwing an exception), this writing job is considered to to have been
    -   * failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
    -   * is undefined and @{@link #abort(WriterCommitMessage[])} may not be able to deal with it.
    +   * failed, and {@link #abort()} would be called. The state of the destination
    +   * is undefined and @{@link #abort()} may not be able to deal with it.
    +   */
    +  void add(WriterCommitMessage message);
    --- End diff --
    
    It probably shouldn't anymore. But I'd suggest dealing with that in another PR, because removing the inheritance will require splitting off some streaming parts of the execution engine.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    @cloud-fan, is the intent to get this into 2.3.0? If so, I'll make time to review it today.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164680686
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala ---
    @@ -135,14 +142,21 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
     
       override def createWriterFactory: MemoryWriterFactory = MemoryWriterFactory(outputMode)
     
    -  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    +  private val messages = new ArrayBuffer[WriterCommitMessage]()
    +
    +  override def add(message: WriterCommitMessage): Unit = synchronized {
    +    messages += message
    +  }
    +
    +  override def commit(epochId: Long): Unit = synchronized {
         val newRows = messages.flatMap {
           case message: MemoryWriterCommitMessage => message.data
    -    }
    +    }.toArray
         sink.write(epochId, outputMode, newRows)
    +    messages.clear()
       }
     
    -  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
    +  override def abort(epochId: Long): Unit = {
    --- End diff --
    
    ditto


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86809/
    Test PASSed.


---



[GitHub] spark pull request #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer....

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20386#discussion_r164932125
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/streaming/writer/StreamWriter.java ---
    @@ -32,40 +32,44 @@
     @InterfaceStability.Evolving
     public interface StreamWriter extends DataSourceWriter {
       /**
    -   * Commits this writing job for the specified epoch with a list of commit messages. The commit
    -   * messages are collected from successful data writers and are produced by
    -   * {@link DataWriter#commit()}.
    +   * Commits this writing job for the specified epoch.
        *
    -   * If this method fails (by throwing an exception), this writing job is considered to have been
    -   * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    +   * When this method is called, the number of commit messages added by
    +   * {@link #add(WriterCommitMessage)} equals to the number of input data partitions.
    --- End diff --
    
    How about `the number of data(RDD) partitions to write`?
    
    > why it isn't obvious ...
    
    Maybe we can just follow `FileCommitProtocol`, i.e. `commit` and `abort` still take an array of messages.


---



[GitHub] spark issue #20386: [SPARK-23202][SQL] Break down DataSourceV2Writer.commit ...

Posted by gengliangwang <gi...@git.apache.org>.
Github user gengliangwang commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    @rdblue @cloud-fan @jose-torres thanks for the comments!
    I was trying to make the API compatible with `onTaskCommit(taskCommit: TaskCommitMessage)` in `FileCommitProtocol`.
    I removed the arguments of `commit` and `abort` because they seem redundant with the new API `add(WriterCommitMessage message)`.
    
    After consideration, I have decided to take the suggestion from @jose-torres: create a new API for the commit message callback, and keep the `commit` and `abort` APIs as they were.
    
    Here is the new PR: https://github.com/apache/spark/pull/20454
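
As a rough illustration of the design described above, the following Java sketch adds an optional per-message callback while `commit` and `abort` keep their array argument. The follow-up PR is not shown in this thread, so every name here (`TaskMessage`, `CallbackWriter`, `onMessage`, `LoggingWriter`, `CallbackDriver`) is an assumption made for the example, not the actual Spark API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for Spark's WriterCommitMessage. */
interface TaskMessage {
  String describe();
}

/** Hypothetical writer interface: commit and abort keep their array argument. */
interface CallbackWriter {
  /** Called as each message arrives; a no-op by default, so existing writers need no change. */
  default void onMessage(TaskMessage message) {}

  void commit(TaskMessage[] messages);

  void abort(TaskMessage[] messages);
}

/** Example implementation that records each call so the ordering is visible. */
final class LoggingWriter implements CallbackWriter {
  final List<String> events = new ArrayList<>();

  @Override
  public void onMessage(TaskMessage message) {
    events.add("received " + message.describe());
  }

  @Override
  public void commit(TaskMessage[] messages) {
    events.add("commit " + messages.length);
  }

  @Override
  public void abort(TaskMessage[] messages) {
    events.add("abort " + messages.length);
  }
}

/** Hypothetical driver loop: notify the writer per message, then finish the job. */
final class CallbackDriver {
  static void run(CallbackWriter writer, List<TaskMessage> arriving, boolean jobFailed) {
    List<TaskMessage> collected = new ArrayList<>();
    for (TaskMessage message : arriving) {
      writer.onMessage(message); // early processing can start here
      collected.add(message);    // the driver still collects everything
    }
    TaskMessage[] all = collected.toArray(new TaskMessage[0]);
    if (jobFailed) {
      writer.abort(all);
    } else {
      writer.commit(all);
    }
  }

  public static void main(String[] args) {
    LoggingWriter writer = new LoggingWriter();
    List<TaskMessage> arriving = new ArrayList<>();
    arriving.add(() -> "partition-0");
    arriving.add(() -> "partition-1");
    run(writer, arriving, false);
    System.out.println(writer.events);
    // prints [received partition-0, received partition-1, commit 2]
  }
}
```

Because the callback has a default body, a writer that does not need early processing can ignore it. The driver still owns the full message list, so implementations do not have to buffer or clear messages themselves.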


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86689/
    Test FAILed.


---



[GitHub] spark issue #20386: [WIP][SPARK-23202][SQL] Break down DataSourceV2Writer.co...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/20386
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/86595/
    Test FAILed.


---
