Posted to commits@spark.apache.org by td...@apache.org on 2018/03/05 21:23:04 UTC

spark git commit: [SPARK-23559][SS] Add epoch ID to DataWriterFactory.

Repository: spark
Updated Branches:
  refs/heads/master ba622f45c -> b0f422c38


[SPARK-23559][SS] Add epoch ID to DataWriterFactory.

## What changes were proposed in this pull request?

Add an epoch ID argument to DataWriterFactory for use in streaming. As a side effect of passing in this value, DataWriter will now have a consistent lifecycle; commit() or abort() ends the lifecycle of a DataWriter instance in any execution mode.

I considered making a separate streaming interface and adding the epoch ID only to that one, but I think it requires a lot of extra work for no real gain. I think it makes sense to define epoch 0 as the one and only epoch of a non-streaming query.
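
For a sink author, the updated interface looks roughly like the sketch below. This is a hypothetical no-op factory written against the new three-argument signature; NoOpWriterFactory is an illustrative name, not code from this patch (the real signature changes are in the diff further down).

import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical no-op sink factory showing the shape of the updated interface.
case object NoOpWriterFactory extends DataWriterFactory[Row] {
  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): DataWriter[Row] = new DataWriter[Row] {
    // epochId is 0 for a non-streaming query; a streaming query passes the
    // current epoch, and commit()/abort() ends this writer instance either way.
    override def write(record: Row): Unit = ()
    override def commit(): WriterCommitMessage = new WriterCommitMessage {}
    override def abort(): Unit = ()
  }
}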

## How was this patch tested?

Existing unit tests.

Author: Jose Torres <jo...@databricks.com>

Closes #20710 from jose-torres/api2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b0f422c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b0f422c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b0f422c3

Branch: refs/heads/master
Commit: b0f422c3861a5a3831e481b8ffac08f6fa085d00
Parents: ba622f4
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Mar 5 13:23:01 2018 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Mon Mar 5 13:23:01 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaStreamWriter.scala  |  5 +++-
 .../spark/sql/sources/v2/writer/DataWriter.java | 12 ++++++----
 .../sources/v2/writer/DataWriterFactory.java    |  5 +++-
 .../v2/writer/streaming/StreamWriter.java       | 19 +++++++--------
 .../datasources/v2/WriteToDataSourceV2.scala    | 25 +++++++++++++-------
 .../streaming/MicroBatchExecution.scala         |  7 ++++++
 .../sources/PackedRowWriterFactory.scala        |  5 +++-
 .../execution/streaming/sources/memoryV2.scala  |  5 +++-
 .../sources/v2/SimpleWritableDataSource.scala   | 10 ++++++--
 9 files changed, 65 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
index 9307bfc..ae5b5c5 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala
@@ -65,7 +65,10 @@ case class KafkaStreamWriterFactory(
     topic: Option[String], producerParams: Map[String, String], schema: StructType)
   extends DataWriterFactory[InternalRow] {
 
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[InternalRow] = {
     new KafkaStreamDataWriter(topic, producerParams, schema.toAttributes)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
index 53941a8..39bf458 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriter.java
@@ -22,7 +22,7 @@ import java.io.IOException;
 import org.apache.spark.annotation.InterfaceStability;
 
 /**
- * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int)} and is
+ * A data writer returned by {@link DataWriterFactory#createDataWriter(int, int, long)} and is
  * responsible for writing data for an input RDD partition.
  *
  * One Spark task has one exclusive data writer, so there is no thread-safe concern.
@@ -31,13 +31,17 @@ import org.apache.spark.annotation.InterfaceStability;
  * the {@link #write(Object)}, {@link #abort()} is called afterwards and the remaining records will
  * not be processed. If all records are successfully written, {@link #commit()} is called.
  *
+ * Once a data writer returns successfully from {@link #commit()} or {@link #abort()}, its lifecycle
+ * is over and Spark will not use it again.
+ *
  * If this data writer succeeds(all records are successfully written and {@link #commit()}
  * succeeds), a {@link WriterCommitMessage} will be sent to the driver side and pass to
  * {@link DataSourceWriter#commit(WriterCommitMessage[])} with commit messages from other data
  * writers. If this data writer fails(one record fails to write or {@link #commit()} fails), an
- * exception will be sent to the driver side, and Spark will retry this writing task for some times,
- * each time {@link DataWriterFactory#createDataWriter(int, int)} gets a different `attemptNumber`,
- * and finally call {@link DataSourceWriter#abort(WriterCommitMessage[])} if all retry fail.
+ * exception will be sent to the driver side, and Spark may retry this writing task a few times.
+ * In each retry, {@link DataWriterFactory#createDataWriter(int, int, long)} will receive a
+ * different `attemptNumber`. Spark will call {@link DataSourceWriter#abort(WriterCommitMessage[])}
+ * when the configured number of retries is exhausted.
  *
  * Besides the retry mechanism, Spark may launch speculative tasks if the existing writing task
  * takes too long to finish. Different from retried tasks, which are launched one by one after the
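
To illustrate the lifecycle described in the javadoc above, here is a minimal hypothetical writer: rows are buffered by write(), handed to the driver by commit(), dropped by abort(), and the instance is never reused after either call. BufferedRowDataWriter and BufferedRowsCommitMessage are illustrative names, not part of this patch.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Hypothetical commit message carrying the buffered rows back to the driver.
case class BufferedRowsCommitMessage(rows: Seq[Row]) extends WriterCommitMessage

// Hypothetical writer following the documented lifecycle.
class BufferedRowDataWriter extends DataWriter[Row] {
  private val buffer = ArrayBuffer.empty[Row]

  override def write(record: Row): Unit = buffer += record

  // A successful commit() ends this writer; Spark will not call it again.
  override def commit(): WriterCommitMessage = BufferedRowsCommitMessage(buffer.toVector)

  // abort() likewise ends the writer; any buffered data is discarded.
  override def abort(): Unit = buffer.clear()
}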

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
index ea95442..c2c2ab7 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java
@@ -48,6 +48,9 @@ public interface DataWriterFactory<T> extends Serializable {
    *                      same task id but different attempt number, which means there are multiple
    *                      tasks with the same task id running at the same time. Implementations can
    *                      use this attempt number to distinguish writers of different task attempts.
+   * @param epochId A monotonically increasing id for streaming queries that are split in to
+   *                discrete periods of execution. For non-streaming queries,
+   *                this ID will always be 0.
    */
-  DataWriter<T> createDataWriter(int partitionId, int attemptNumber);
+  DataWriter<T> createDataWriter(int partitionId, int attemptNumber, long epochId);
 }
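
One hypothetical way a sink could use the new epochId parameter is to fold it into the identity of each output unit, so a re-executed epoch overwrites rather than duplicates its own output. LocalCsvWriterFactory below is an illustrative sketch against the updated interface (local filesystem, comma-joined rows), not code from this patch.

import java.io.{File, FileWriter}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriter, DataWriterFactory, WriterCommitMessage}

// Hypothetical factory: the epoch, partition, and attempt together name the output file.
case class LocalCsvWriterFactory(basePath: String) extends DataWriterFactory[Row] {
  override def createDataWriter(
      partitionId: Int,
      attemptNumber: Int,
      epochId: Long): DataWriter[Row] = {
    val file = new File(s"$basePath/epoch-$epochId-part-$partitionId-attempt-$attemptNumber.csv")
    file.getParentFile.mkdirs()
    new DataWriter[Row] {
      private val out = new FileWriter(file)
      override def write(record: Row): Unit = out.write(record.mkString(",") + "\n")
      override def commit(): WriterCommitMessage = { out.close(); new WriterCommitMessage {} }
      override def abort(): Unit = { out.close(); file.delete() }
    }
  }
}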

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
index 4913341..a316b2a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/streaming/StreamWriter.java
@@ -23,11 +23,10 @@ import org.apache.spark.sql.sources.v2.writer.DataWriter;
 import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;
 
 /**
- * A {@link DataSourceWriter} for use with structured streaming. This writer handles commits and
- * aborts relative to an epoch ID determined by the execution engine.
+ * A {@link DataSourceWriter} for use with structured streaming.
  *
- * {@link DataWriter} implementations generated by a StreamWriter may be reused for multiple epochs,
- * and so must reset any internal state after a successful commit.
+ * Streaming queries are divided into intervals of data called epochs, with a monotonically
+ * increasing numeric ID. This writer handles commits and aborts for each successive epoch.
  */
 @InterfaceStability.Evolving
 public interface StreamWriter extends DataSourceWriter {
@@ -39,21 +38,21 @@ public interface StreamWriter extends DataSourceWriter {
    * If this method fails (by throwing an exception), this writing job is considered to have been
    * failed, and the execution engine will attempt to call {@link #abort(WriterCommitMessage[])}.
    *
-   * To support exactly-once processing, writer implementations should ensure that this method is
-   * idempotent. The execution engine may call commit() multiple times for the same epoch
-   * in some circumstances.
+   * The execution engine may call commit() multiple times for the same epoch in some circumstances.
+   * To support exactly-once data semantics, implementations must ensure that multiple commits for
+   * the same epoch are idempotent.
    */
   void commit(long epochId, WriterCommitMessage[] messages);
 
   /**
-   * Aborts this writing job because some data writers are failed and keep failing when retry, or
+   * Aborts this writing job because some data writers are failed and keep failing when retried, or
    * the Spark job fails with some unknown reasons, or {@link #commit(WriterCommitMessage[])} fails.
    *
    * If this method fails (by throwing an exception), the underlying data source may require manual
    * cleanup.
    *
-   * Unless the abort is triggered by the failure of commit, the given messages should have some
-   * null slots as there maybe only a few data writers that are committed before the abort
+   * Unless the abort is triggered by the failure of commit, the given messages will have some
+   * null slots, as there may be only a few data writers that were committed before the abort
    * happens, or some data writers were committed but their commit messages haven't reached the
    * driver when the abort is triggered. So this is just a "best effort" for data sources to
    * clean up the data left by data writers.
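
The idempotent-commit requirement spelled out above can be met, for example, by tracking which epochs have already been applied. IdempotentStreamWriter below is an illustrative in-memory sketch; a real sink would persist this state transactionally alongside the data rather than in a local set.

import scala.collection.mutable
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

// Hypothetical sink that ignores repeated commits for an already-applied epoch.
class IdempotentStreamWriter(factory: DataWriterFactory[Row]) extends StreamWriter {
  private val committedEpochs = mutable.Set.empty[Long]

  override def createWriterFactory(): DataWriterFactory[Row] = factory

  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = synchronized {
    if (committedEpochs.add(epochId)) {
      // First commit for this epoch: apply `messages` to the sink here.
    }
    // A repeated commit() for an already-applied epoch is a no-op.
  }

  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = ()
}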

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index 41cdfc8..e80b44c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.{ExpressionEncoder, RowEncoder}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.{MicroBatchExecution, StreamExecution}
 import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
 import org.apache.spark.sql.sources.v2.writer._
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
@@ -132,7 +132,8 @@ object DataWritingSparkTask extends Logging {
     val stageId = context.stageId()
     val partId = context.partitionId()
     val attemptId = context.attemptNumber()
-    val dataWriter = writeTask.createDataWriter(partId, attemptId)
+    val epochId = Option(context.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY)).getOrElse("0")
+    val dataWriter = writeTask.createDataWriter(partId, attemptId, epochId.toLong)
 
     // write the data and commit this writer.
     Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
@@ -172,7 +173,6 @@ object DataWritingSparkTask extends Logging {
       writeTask: DataWriterFactory[InternalRow],
       context: TaskContext,
       iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val dataWriter = writeTask.createDataWriter(context.partitionId(), context.attemptNumber())
     val epochCoordinator = EpochCoordinatorRef.get(
       context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
       SparkEnv.get)
@@ -180,10 +180,15 @@ object DataWritingSparkTask extends Logging {
     var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
 
     do {
+      var dataWriter: DataWriter[InternalRow] = null
       // write the data and commit this writer.
       Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
         try {
-          iter.foreach(dataWriter.write)
+          dataWriter = writeTask.createDataWriter(
+            context.partitionId(), context.attemptNumber(), currentEpoch)
+          while (iter.hasNext) {
+            dataWriter.write(iter.next())
+          }
           logInfo(s"Writer for partition ${context.partitionId()} is committing.")
           val msg = dataWriter.commit()
           logInfo(s"Writer for partition ${context.partitionId()} committed.")
@@ -196,9 +201,10 @@ object DataWritingSparkTask extends Logging {
             // Continuous shutdown always involves an interrupt. Just finish the task.
         }
       })(catchBlock = {
-        // If there is an error, abort this writer
+        // If there is an error, abort this writer. We enter this callback in the middle of
+        // rethrowing an exception, so runContinuous will stop executing at this point.
         logError(s"Writer for partition ${context.partitionId()} is aborting.")
-        dataWriter.abort()
+        if (dataWriter != null) dataWriter.abort()
         logError(s"Writer for partition ${context.partitionId()} aborted.")
       })
     } while (!context.isInterrupted())
@@ -211,9 +217,12 @@ class InternalRowDataWriterFactory(
     rowWriterFactory: DataWriterFactory[Row],
     schema: StructType) extends DataWriterFactory[InternalRow] {
 
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[InternalRow] = {
     new InternalRowDataWriter(
-      rowWriterFactory.createDataWriter(partitionId, attemptNumber),
+      rowWriterFactory.createDataWriter(partitionId, attemptNumber, epochId),
       RowEncoder.apply(schema).resolveAndBind())
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 6bd0397..ff4be9c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -469,6 +469,9 @@ class MicroBatchExecution(
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
 
+    sparkSessionToRunBatch.sparkContext.setLocalProperty(
+      MicroBatchExecution.BATCH_ID_KEY, currentBatchId.toString)
+
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
         sparkSessionToRunBatch,
@@ -518,3 +521,7 @@ class MicroBatchExecution(
     Optional.ofNullable(scalaOption.orNull)
   }
 }
+
+object MicroBatchExecution {
+  val BATCH_ID_KEY = "streaming.sql.batchId"
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
index 248295e..e07355a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/PackedRowWriterFactory.scala
@@ -31,7 +31,10 @@ import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, Dat
  * for production-quality sinks. It's intended for use in tests.
  */
 case object PackedRowWriterFactory extends DataWriterFactory[Row] {
-  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[Row] = {
     new PackedRowDataWriter()
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
index f960208..5f58246 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala
@@ -147,7 +147,10 @@ class MemoryStreamWriter(val sink: MemorySinkV2, outputMode: OutputMode)
 }
 
 case class MemoryWriterFactory(outputMode: OutputMode) extends DataWriterFactory[Row] {
-  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[Row] = {
     new MemoryDataWriter(partitionId, outputMode)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b0f422c3/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
index 36dd2a3..a5007fa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/SimpleWritableDataSource.scala
@@ -207,7 +207,10 @@ private[v2] object SimpleCounter {
 class SimpleCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
   extends DataWriterFactory[Row] {
 
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[Row] = {
     val jobPath = new Path(new Path(path, "_temporary"), jobId)
     val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
     val fs = filePath.getFileSystem(conf.value)
@@ -240,7 +243,10 @@ class SimpleCSVDataWriter(fs: FileSystem, file: Path) extends DataWriter[Row] {
 class InternalRowCSVDataWriterFactory(path: String, jobId: String, conf: SerializableConfiguration)
   extends DataWriterFactory[InternalRow] {
 
-  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+  override def createDataWriter(
+      partitionId: Int,
+      attemptNumber: Int,
+      epochId: Long): DataWriter[InternalRow] = {
     val jobPath = new Path(new Path(path, "_temporary"), jobId)
     val filePath = new Path(jobPath, s"$jobId-$partitionId-$attemptNumber")
     val fs = filePath.getFileSystem(conf.value)

