You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/03/05 21:23:04 UTC
spark git commit: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.
Repository: spark
Updated Branches:
refs/heads/master ba622f45c -> b0f422c38
[SPARK-23559][SS] Add epoch ID to DataWriterFactory.
## What changes were proposed in this pull request?
Add an epoch ID argument to DataWriterFactory for use in streaming. As a side effect of passing in this value, DataWriter will now have a consistent lifecycle; commit() or abort() ends the lifecycle of a DataWriter instance in any execution mode.
I considered making a separate streaming interface and adding the epoch ID only to that one, but I think that would require a lot of extra work for no real gain. I think it makes sense to define epoch 0 as the one and only epoch of a non-streaming query.
## How was this patch tested?
Existing unit tests.
Author: Jose Torres <jo...@databricks.com>
Closes #20710 from jose-torres/api2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0f422c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0f422c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0f422c3
Branch: refs/heads/master
Commit: b0f422c3861a5a3831e481b8ffac08f6fa085d00
Parents: ba622f4
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Mar 5 13:23:01 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Mar 5 13:23:01 2018 -0800
----------------------------------------------------------------------
.../spark/sql/kafka010/KafkaStreamWriter.scala | 5 +++-
.../spark/sql/sources/v2/writer/DataWriter.java | 12 ++++++----
.../sources/v2/writer/DataWriterFactory.java | 5 +++-
.../v2/writer/streaming/StreamWriter.java | 19 +++++++--------
.../datasources/v2/WriteToDataSourceV2.scala | 25 +++++++++++++-------
.../streaming/MicroBatchExecution.scala | 7 ++++++
.../sources/PackedRowWriterFactory.scala | 5 +++-
.../execution/streaming/sources/memoryV2.scala | 5 +++-
.../sources/v2/SimpleWritableDataSource.scala | 10 ++++++--
9 files changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
index 9307bfc..ae5b5c5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -65,7 +65,10 @@ case class KafkaStreamWriterFactory(
topic: Option[String], producerParams: Map[String, String], schema: StructType)
extends DataWriterFactory[InternalRow] {
- override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+ override def createDataWriter(
+ partitionId: Int,
+ attemptNumber: Int,
+ epochId: Long): DataWriter[InternalRow] = {
new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index 53941a8..39bf458 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
import org.apache.spark.annotation.InterfaceStability;
/**
- * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int)} and is
+ * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int, long)} and is
* responsible for writing data for an input RDD partition.
*
* One Spark task has one exclusive data writer, so there is no thread-safe concern.
@@ -31,13 +31,17 @@ import org.apache.spark.annotation.InterfaceStability;
* the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
* not be processed. If all records are successfully written, {@link #commit()} is called.
*
+ * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, its lifecycle
+ * is over and Spark will not use it again.
+ *
* If this data writer succeeds(all records are successfully written and {@link #commit()}
* succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
* {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
* writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
- * exception will be sent to the driver side, and Spark will retry this writing task for some times,
- * each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`,
- * and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retry fail.
+ * exception will be sent to the driver side, and Spark may retry this writing task a few times.
+ * In each retry, {@link DataWriterFactory#createDataWriter(int, int, long)} will receive a
+ * different `attemptNumber`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])}
+ * when the configured number of retries is exhausted.
*
* Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
* takes too long to finish. Different from retried tasks, which are launched one by one after the
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index ea95442..c2c2ab7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -48,6 +48,9 @@ public interface DataWriterFactory<T> extends Serializable {
* same task id but different attempt number, which means there are multiple
* tasks with the same task id running at the same time. Implementations can
* use this attempt number to distinguish writers of different task attempts.
+ * @param epochId A monotonically increasing id for streaming queries that are split in to
+ * discrete periods of execution. For non-streaming queries,
+ * this ID will always be 0.
*/
- DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
+ DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
index 4913341..a316b2a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
@@ -23,11 +23,10 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
/**
- * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
- * aborts relative to an epoch ID determined by the execution engine.
+ * A {@link DataSourceWriter} for use with structured streaming.
*
- * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
- * and so must reset any internal state after a successful commit.
+ * Streaming queries are divided into intervals of data called epochs, with a monotonically
+ * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
*/
@InterfaceStability.Evolving
public interface StreamWriter extends DataSourceWriter {
@@ -39,21 +38,21 @@ public interface StreamWriter extends DataSourceWriter {
* If this method fails (by throwing an exception), this writing job is considered to have been
* failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
*
- * To support exactly-once processing, writer implementations should ensure that this method is
- * idempotent. The execution engine may call commit() multiple times for the same epoch
- * in some circumstances.
+ * The execution engine may call commit() multiple times for the same epoch in some circumstances.
+ * To support exactly-once data semantics, implementations must ensure that multiple commits for
+ * the same epoch are idempotent.
*/
void commit(long epochId, WriterCommitMessage[] messages);
/**
- * Aborts this writing job because some data writers are failed and keep failing when retry, or
+ * Aborts this writing job because some data writers are failed and keep failing when retried, or
* the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
*
* If this method fails (by throwing an exception), the underlying data source may require manual
* cleanup.
*
- * Unless the abort is triggered by the failure of commit, the given messages should have some
- * null slots as there maybe only a few data writers that are committed before the abort
+ * Unless the abort is triggered by the failure of commit, the given messages will have some
+ * null slots, as there may be only a few data writers that were committed before the abort
* happens, or some data writers were committed but their commit messages haven't reached the
* driver when the abort is triggered. So this is just a "best effort" for data sources to
* clean up the data left by data writers.
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 41cdfc8..e80b44c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -132,7 +132,8 @@ object DataWritingSparkTask extends Logging {
val stageId = context.stageId()
val partId = context.partitionId()
val attemptId = context.attemptNumber()
- val dataWriter = writeTask.createDataWriter(partId, attemptId)
+ val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0")
+ val dataWriter = writeTask.createDataWriter(partId, attemptId, epochId.toLong)
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
@@ -172,7 +173,6 @@ object DataWritingSparkTask extends Logging {
writeTask: DataWriterFactory[InternalRow],
context: TaskContext,
iter: Iterator[InternalRow]): WriterCommitMessage = {
- val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
val epochCoordinator = EpochCoordinatorRef.get(
context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
SparkEnv.get)
@@ -180,10 +180,15 @@ object DataWritingSparkTask extends Logging {
var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
do {
+ var dataWriter: DataWriter[InternalRow] = null
// write the data and commit this writer.
Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
try {
- iter.foreach(dataWriter.write)
+ dataWriter = writeTask.createDataWriter(
+ context.partitionId(), context.attemptNumber(), currentEpoch)
+ while (iter.hasNext) {
+ dataWriter.write(iter.next())
+ }
logInfo(s"Writer for partition ${context.partitionId()} is committing.")
val msg = dataWriter.commit()
logInfo(s"Writer for partition ${context.partitionId()} committed.")
@@ -196,9 +201,10 @@ object DataWritingSparkTask extends Logging {
// Continuous shutdown always involves an interrupt. Just finish the task.
}
})(catchBlock = {
- // If there is an error, abort this writer
+ // If there is an error, abort this writer. We enter this callback in the middle of
+ // rethrowing an exception, so runContinuous will stop executing at this point.
logError(s"Writer for partition ${context.partitionId()} is aborting.")
- dataWriter.abort()
+ if (dataWriter != null) dataWriter.abort()
logError(s"Writer for partition ${context.partitionId()} aborted.")
})
} while (!context.isInterrupted())
@@ -211,9 +217,12 @@ class InternalRowDataWriterFactory(
rowWriterFactory: DataWriterFactory[Row],
schema: StructType) extends DataWriterFactory[InternalRow] {
- override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+ override def createDataWriter(
+ partitionId: Int,
+ attemptNumber: Int,
+ epochId: Long): DataWriter[InternalRow] = {
new InternalRowDataWriter(
- rowWriterFactory.createDataWriter(partitionId, attemptNumber),
+ rowWriterFactory.createDataWriter(partitionId, attemptNumber, epochId),
RowEncoder.apply(schema).resolveAndBind())
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6bd0397..ff4be9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -469,6 +469,9 @@ class MicroBatchExecution(
case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
}
+ sparkSessionToRunBatch.sparkContext.setLocalProperty(
+ MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString)
+
reportTimeTaken("queryPlanning") {
lastExecution = new IncrementalExecution(
sparkSessionToRunBatch,
@@ -518,3 +521,7 @@ class MicroBatchExecution(
Optional.ofNullable(scalaOption.orNull)
}
}
+
+object MicroBatchExecution {
+ val BATCH_ID_KEY = "streaming.sql.batchId"
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
index 248295e..e07355a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
@@ -31,7 +31,10 @@ import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, Dat
* for production-quality sinks. It's intended for use in tests.
*/
case object PackedRowWriterFactory extends DataWriterFactory[Row] {
- def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+ override def createDataWriter(
+ partitionId: Int,
+ attemptNumber: Int,
+ epochId: Long): DataWriter[Row] = {
new PackedRowDataWriter()
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index f960208..5f58246 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -147,7 +147,10 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
}
case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] {
- def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+ override def createDataWriter(
+ partitionId: Int,
+ attemptNumber: Int,
+ epochId: Long): DataWriter[Row] = {
new MemoryDataWriter(partitionId, outputMode)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 36dd2a3..a5007fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -207,7 +207,10 @@ private[v2] object SimpleCounter {
class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
extends DataWriterFactory[Row] {
- override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+ override def createDataWriter(
+ partitionId: Int,
+ attemptNumber: Int,
+ epochId: Long): DataWriter[Row] = {
val jobPath = new Path(new Path(path, "_temporary"), jobId)
val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
val fs = filePath.getFileSystem(conf.value)
@@ -240,7 +243,10 @@ class SimpleCSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[Row] {
class InternalRowCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
extends DataWriterFactory[InternalRow] {
- override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+ override def createDataWriter(
+ partitionId: Int,
+ attemptNumber: Int,
+ epochId: Long): DataWriter[InternalRow] = {
val jobPath = new Path(new Path(path, "_temporary"), jobId)
val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
val fs = filePath.getFileSystem(conf.value)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org