Posted to commits@spark.apache.org by we...@apache.org on 2018/02/13 03:40:40 UTC
spark git commit: [SPARK-23323][SQL] Support commit coordinator for DataSourceV2 writes
Repository: spark
Updated Branches:
refs/heads/master 4104b68e9 -> c1bcef876
[SPARK-23323][SQL] Support commit coordinator for DataSourceV2 writes
## What changes were proposed in this pull request?
DataSourceV2 batch writes should use the output commit coordinator if it is required by the data source. This adds a new method, `DataSourceWriter#useCommitCoordinator`, that determines whether the coordinator will be used. If the writer returns true, `WriteToDataSourceV2` will use the coordinator for batch writes.
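For illustration only (not part of this patch): a minimal Scala sketch of a writer that opts out of the coordinator by overriding the new method. The class name and the commit/abort bodies are hypothetical; only the DataSourceV2 interfaces come from Spark.

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, WriterCommitMessage}

    // Hypothetical writer that handles commit deduplication itself, so it
    // turns the coordinator off. Everything except the DataSourceV2
    // interfaces is illustrative.
    class SelfCoordinatedWriter(factory: DataWriterFactory[Row]) extends DataSourceWriter {

      override def createWriterFactory(): DataWriterFactory[Row] = factory

      // Opt out: this source must then tolerate committed output from
      // multiple speculative attempts of the same task.
      override def useCommitCoordinator(): Boolean = false

      override def commit(messages: Array[WriterCommitMessage]): Unit = {
        // publish the results reported in `messages`, discard everything else
      }

      override def abort(messages: Array[WriterCommitMessage]): Unit = {
        // clean up staged output from all attempts
      }
    }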
## How was this patch tested?
This relies on existing write tests, which now use the commit coordinator.
Author: Ryan Blue <bl...@apache.org>
Closes #20490 from rdblue/SPARK-23323-add-commit-coordinator.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1bcef87
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1bcef87
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1bcef87
Branch: refs/heads/master
Commit: c1bcef876c1415e39e624cfbca9c9bdeae24cbb9
Parents: 4104b68
Author: Ryan Blue <bl...@apache.org>
Authored: Tue Feb 13 11:40:34 2018 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Tue Feb 13 11:40:34 2018 +0800
----------------------------------------------------------------------
.../sql/sources/v2/writer/DataSourceWriter.java | 19 +++++++--
.../datasources/v2/WriteToDataSourceV2.scala | 41 ++++++++++++++++----
2 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/c1bcef87/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
index e3f682b..0a0fd8d 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataSourceWriter.java
@@ -64,6 +64,16 @@ public interface DataSourceWriter {
DataWriterFactory<Row> createWriterFactory();
/**
+ * Returns whether Spark should use the commit coordinator to ensure that at most one attempt for
+ * each task commits.
+ *
+ * @return true if commit coordinator should be used, false otherwise.
+ */
+ default boolean useCommitCoordinator() {
+ return true;
+ }
+
+ /**
* Handles a commit message upon receiving it from a successful data writer.
*
* If this method fails (by throwing an exception), this writing job is considered to have been
@@ -79,10 +89,11 @@ public interface DataSourceWriter {
* failed, and {@link #abort(WriterCommitMessage[])} would be called. The state of the destination
* is undefined and {@link #abort(WriterCommitMessage[])} may not be able to deal with it.
*
- * Note that, one partition may have multiple committed data writers because of speculative tasks.
- * Spark will pick the first successful one and get its commit message. Implementations should be
- * aware of this and handle it correctly, e.g., have a coordinator to make sure only one data
- * writer can commit, or have a way to clean up the data of already-committed writers.
+ * Note that speculative execution may cause multiple tasks to run for a partition. By default,
+ * Spark uses the commit coordinator to allow at most one attempt to commit. Implementations can
+ * disable this behavior by overriding {@link #useCommitCoordinator()}. If disabled, multiple
+ * attempts may have committed successfully and one successful commit message per task will be
+ * passed to this commit method. The remaining commit messages are ignored by Spark.
*/
void commit(WriterCommitMessage[] messages);
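To read the new contract concretely: with useCommitCoordinator() returning false, Spark still passes one successful message per task to commit(messages), but other speculative attempts may also have committed, so the source must reconcile them itself. A hedged sketch of that reconciliation; the message type and the helper stubs are hypothetical, not part of this patch:

    import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage

    // Hypothetical message recording where a task attempt staged its output.
    case class StagedCommit(partId: Int, attemptId: Int, path: String)
      extends WriterCommitMessage

    // Driver-side reconciliation sketch for a coordinator-disabled source.
    def commitJob(messages: Array[WriterCommitMessage]): Unit = {
      def promote(path: String): Unit = ()                // stub: move into final location
      def dropStagedExcept(keep: Set[String]): Unit = ()  // stub: delete other staging dirs

      val winners = messages.collect { case m: StagedCommit => m }
      // Promote the one attempt Spark reported for each task...
      winners.foreach(m => promote(m.path))
      // ...and remove output from any other attempt that also committed,
      // since no coordinator prevented the duplicates.
      dropStagedExcept(winners.map(_.path).toSet)
    }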
http://git-wip-us.apache.org/repos/asf/spark/blob/c1bcef87/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index eefbcf4..535e796 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.datasources.v2
import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.executor.CommitDeniedException
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
@@ -53,6 +54,7 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
}
+ val useCommitCoordinator = writer.useCommitCoordinator
val rdd = query.execute()
val messages = new Array[WriterCommitMessage](rdd.partitions.length)
@@ -73,7 +75,7 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
DataWritingSparkTask.runContinuous(writeTask, context, iter)
case _ =>
(context: TaskContext, iter: Iterator[InternalRow]) =>
- DataWritingSparkTask.run(writeTask, context, iter)
+ DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator)
}
sparkContext.runJob(
@@ -116,21 +118,44 @@ object DataWritingSparkTask extends Logging {
def run(
writeTask: DataWriterFactory[InternalRow],
context: TaskContext,
- iter: Iterator[InternalRow]): WriterCommitMessage = {
- val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
+ iter: Iterator[InternalRow],
+ useCommitCoordinator: Boolean): WriterCommitMessage = {
+ val stageId = context.stageId()
+ val partId = context.partitionId()
+ val attemptId = context.attemptNumber()
+ val dataWriter = writeTask.createDataWriter(partId, attemptId)
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
iter.foreach(dataWriter.write)
- logInfo(s"Writer for partition ${context.partitionId()} is committing.")
- val msg = dataWriter.commit()
- logInfo(s"Writer for partition ${context.partitionId()} committed.")
+
+ val msg = if (useCommitCoordinator) {
+ val coordinator = SparkEnv.get.outputCommitCoordinator
+ val commitAuthorized = coordinator.canCommit(context.stageId(), partId, attemptId)
+ if (commitAuthorized) {
+ logInfo(s"Writer for stage $stageId, task $partId.$attemptId is authorized to commit.")
+ dataWriter.commit()
+ } else {
+ val message = s"Stage $stageId, task $partId.$attemptId: driver did not authorize commit"
+ logInfo(message)
+ // throwing CommitDeniedException will trigger the catch block for abort
+ throw new CommitDeniedException(message, stageId, partId, attemptId)
+ }
+
+ } else {
+ logInfo(s"Writer for partition ${context.partitionId()} is committing.")
+ dataWriter.commit()
+ }
+
+ logInfo(s"Writer for stage $stageId, task $partId.$attemptId committed.")
+
msg
+
})(catchBlock = {
// If there is an error, abort this writer
- logError(s"Writer for partition ${context.partitionId()} is aborting.")
+ logError(s"Writer for stage $stageId, task $partId.$attemptId is aborting.")
dataWriter.abort()
- logError(s"Writer for partition ${context.partitionId()} aborted.")
+ logError(s"Writer for stage $stageId, task $partId.$attemptId aborted.")
})
}
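Under the default coordinated path added above, DataWriter.commit() runs for at most one attempt per task, so a writer may safely publish inside commit(). A minimal sketch of such a task-side writer; the in-memory staging and the returned message are illustrative placeholders:

    import scala.collection.mutable.ArrayBuffer
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

    // Hypothetical task-side writer that stages rows and publishes on commit.
    class StagingDataWriter(partId: Int, attemptId: Int) extends DataWriter[Row] {
      private val staged = ArrayBuffer.empty[Row]

      override def write(row: Row): Unit = staged += row

      override def commit(): WriterCommitMessage = {
        // With useCommitCoordinator = true, the driver authorizes at most one
        // attempt per task to reach this point, so publishing here cannot
        // double-commit.
        // publish(staged)  -- placeholder for the real publish step
        new WriterCommitMessage {}  // a real source would return task metadata here
      }

      override def abort(): Unit = staged.clear()  // discard this attempt's staged rows
    }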