Posted to commits@spark.apache.org by we...@apache.org on 2018/02/01 12:39:27 UTC

spark git commit: [SPARK-23202][SQL] Add new API in DataSourceWriter: onDataWriterCommit

Repository: spark
Updated Branches:
  refs/heads/master 89e8d556b -> ffbca8451


[SPARK-23202][SQL] Add new API in DataSourceWriter: onDataWriterCommit

## What changes were proposed in this pull request?

The current DataSourceWriter API makes it hard to implement the equivalent of `FileCommitProtocol`'s `onTaskCommit(taskCommit: TaskCommitMessage)`.
In general, on receiving a commit message, the driver can start processing messages (e.g. persisting them into files) before all the messages are collected.

The proposal is to add a new API:
`onDataWriterCommit(WriterCommitMessage message)`: handles a commit message upon receiving it from a successful data writer.

This should make the whole DataSourceWriter API compatible with `FileCommitProtocol`, and more flexible.
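
For illustration, a minimal sketch (not part of this patch) of a writer that uses the new callback to process messages incrementally; `EagerCommitWriter` is a hypothetical name and the persist step is only indicated in a comment:

```scala
import java.util.concurrent.ConcurrentLinkedQueue

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical sketch: handle each commit message as it arrives on the
// driver, instead of waiting for commit() to receive the full array.
class EagerCommitWriter extends DataSourceWriter {
  private val received = new ConcurrentLinkedQueue[WriterCommitMessage]

  override def createWriterFactory(): DataWriterFactory[Row] = ???  // as in any writer

  // Called on the driver once per successful data writer, before commit().
  override def onDataWriterCommit(message: WriterCommitMessage): Unit = {
    // e.g. persist the message into a file here (hypothetical step)
    received.add(message)
  }

  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    // finalize the job; `messages` contains the same messages seen above
  }

  override def abort(messages: Array[WriterCommitMessage]): Unit = {
    // clean up anything persisted in onDataWriterCommit
  }
}
```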

There was another, more radical attempt in #20386. This one should be more reasonable.

## How was this patch tested?

Unit test

Author: Wang Gengliang <lt...@gmail.com>

Closes #20454 from gengliangwang/write_api.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ffbca845
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ffbca845
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ffbca845

Branch: refs/heads/master
Commit: ffbca84519011a747e0552632e88f5e4956e493d
Parents: 89e8d55
Author: Wang Gengliang <lt...@gmail.com>
Authored: Thu Feb 1 20:39:15 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Feb 1 20:39:15 2018 +0800

----------------------------------------------------------------------
 .../sql/sources/v2/writer/DataSourceWriter.java | 14 +++++++++++--
 .../datasources/v2/WriteToDataSourceV2.scala    |  5 ++++-
 .../sql/sources/v2/DataSourceV2Suite.scala      | 21 +++++++++++++++++++-
 .../sources/v2/SimpleWritableDataSource.scala   | 21 ++++++++++++++++++++
 4 files changed, 57 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ffbca845/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
index 7096aec..52324b3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
@@ -63,6 +63,14 @@ public interface DataSourceWriter {
   DataWriterFactory<Row> createWriterFactory();
 
   /**
+   * Handles a commit message upon receiving it from a successful data writer.
+   *
+   * If this method fails (by throwing an exception), this writing job is considered to have
+   * failed, and {@link #abort(WriterCommitMessage[])} will be called.
+   */
+  default void onDataWriterCommit(WriterCommitMessage message) {}
+
+  /**
    * Commits this writing job with a list of commit messages. The commit messages are collected from
    * successful data writers and are produced by {@link DataWriter#commit()}.
    *
@@ -78,8 +86,10 @@ public interface DataSourceWriter {
   void commit(WriterCommitMessage[] messages);
 
   /**
-   * Aborts this writing job because some data writers are failed and keep failing when retry, or
-   * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
+   * Aborts this writing job because some data writers fail and keep failing when retried,
+   * or the Spark job fails for some unknown reason,
+   * or {@link #onDataWriterCommit(WriterCommitMessage)} fails,
+   * or {@link #commit(WriterCommitMessage[])} fails.
    *
    * If this method fails (by throwing an exception), the underlying data source may require manual
    * cleanup.
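
To make the failure contract above concrete, a hedged sketch (not from this patch): throwing from `onDataWriterCommit` fails the write job, and `abort` then receives the messages collected so far. `ValidatingWriter` and `isValid` are hypothetical names:

```scala
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, WriterCommitMessage}

// Sketch of the failure contract: a writer that rejects bad commit messages.
trait ValidatingWriter extends DataSourceWriter {
  // Hypothetical per-message check supplied by the concrete writer.
  protected def isValid(message: WriterCommitMessage): Boolean

  override def onDataWriterCommit(message: WriterCommitMessage): Unit = {
    if (!isValid(message)) {
      // Throwing here marks the job as failed; Spark then calls abort()
      // with the commit messages received so far.
      throw new IllegalStateException(s"Rejected commit message: $message")
    }
  }
}
```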

http://git-wip-us.apache.org/repos/asf/spark/blob/ffbca845/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 6592bd7..eefbcf4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -80,7 +80,10 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
         rdd,
         runTask,
         rdd.partitions.indices,
-        (index, message: WriterCommitMessage) => messages(index) = message
+        (index, message: WriterCommitMessage) => {
+          messages(index) = message
+          writer.onDataWriterCommit(message)
+        }
       )
 
       if (!writer.isInstanceOf[StreamWriter]) {
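
With the callback wired into the driver's per-task result handler as above, bridging to `FileCommitProtocol.onTaskCommit` (the motivating use case) becomes possible. A hedged sketch, not part of this patch; `FileCommitMessage` and `FileProtocolWriter` are hypothetical names:

```scala
import org.apache.spark.internal.io.FileCommitProtocol
import org.apache.spark.internal.io.FileCommitProtocol.TaskCommitMessage
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, WriterCommitMessage}

// Hypothetical message type: a V2 commit message wrapping the
// TaskCommitMessage produced by a FileCommitProtocol-based task.
case class FileCommitMessage(inner: TaskCommitMessage) extends WriterCommitMessage

// Hypothetical bridge: forward each message to the committer as it
// arrives, mirroring FileCommitProtocol's onTaskCommit() hook.
trait FileProtocolWriter extends DataSourceWriter {
  protected def committer: FileCommitProtocol

  override def onDataWriterCommit(message: WriterCommitMessage): Unit = message match {
    case FileCommitMessage(inner) => committer.onTaskCommit(inner)
    case _ => // messages from other writer types: nothing to forward
  }
}
```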

http://git-wip-us.apache.org/repos/asf/spark/blob/ffbca845/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
index 2f49b07..1c3ba78 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/DataSourceV2Suite.scala
@@ -21,7 +21,7 @@ import java.util.{ArrayList, List => JList}
 
 import test.org.apache.spark.sql.sources.v2._
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkConf, SparkException}
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
@@ -198,6 +198,25 @@ class DataSourceV2Suite extends QueryTest with SharedSQLContext {
       }
     }
   }
+
+  test("simple counter in writer with onDataWriterCommit") {
+    Seq(classOf[SimpleWritableDataSource]).foreach { cls =>
+      withTempPath { file =>
+        val path = file.getCanonicalPath
+        assert(spark.read.format(cls.getName).option("path", path).load().collect().isEmpty)
+
+        val numPartition = 6
+        spark.range(0, 10, 1, numPartition).select('id, -'id).write.format(cls.getName)
+          .option("path", path).save()
+        checkAnswer(
+          spark.read.format(cls.getName).option("path", path).load(),
+          spark.range(10).select('id, -'id))
+
+        assert(SimpleCounter.getCounter == numPartition,
+          "method onDataWriterCommit should be called as many as the number of partitions")
+      }
+    }
+  }
 }
 
 class SimpleDataSourceV2 extends DataSourceV2 with ReadSupport {

http://git-wip-us.apache.org/repos/asf/spark/blob/ffbca845/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index a131b16..36dd2a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -66,9 +66,14 @@ class SimpleWritableDataSource extends DataSourceV2 with ReadSupport with WriteS
 
   class Writer(jobId: String, path: String, conf: Configuration) extends DataSourceWriter {
     override def createWriterFactory(): DataWriterFactory[Row] = {
+      SimpleCounter.resetCounter
       new SimpleCSVDataWriterFactory(path, jobId, new SerializableConfiguration(conf))
     }
 
+    override def onDataWriterCommit(message: WriterCommitMessage): Unit = {
+      SimpleCounter.increaseCounter
+    }
+
     override def commit(messages: Array[WriterCommitMessage]): Unit = {
       val finalPath = new Path(path)
       val jobPath = new Path(new Path(finalPath, "_temporary"), jobId)
@@ -183,6 +188,22 @@ class SimpleCSVDataReaderFactory(path: String, conf: SerializableConfiguration)
   }
 }
 
+private[v2] object SimpleCounter {
+  private var count: Int = 0
+
+  def increaseCounter: Unit = {
+    count += 1
+  }
+
+  def getCounter: Int = {
+    count
+  }
+
+  def resetCounter: Unit = {
+    count = 0
+  }
+}
+
 class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
   extends DataWriterFactory[Row] {
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org