You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/04/25 00:06:08 UTC

spark git commit: [SPARK-24038][SS] Refactor continuous writing to its own class

Repository: spark
Updated Branches:
  refs/heads/master 7b1e6523a -> d6c26d1c9


[SPARK-24038][SS] Refactor continuous writing to its own class

## What changes were proposed in this pull request?

Refactor continuous writing to its own class.

See WIP https://github.com/jose-torres/spark/pull/13 for the overall direction this is going, but I think this PR is very isolated and necessary anyway.

## How was this patch tested?

existing unit tests - refactoring only

Author: Jose Torres <to...@gmail.com>

Closes #21116 from jose-torres/SPARK-24038.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6c26d1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6c26d1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6c26d1c

Branch: refs/heads/master
Commit: d6c26d1c9a8f747a3e0d281a27ea9eb4d92102e5
Parents: 7b1e652
Author: Jose Torres <to...@gmail.com>
Authored: Tue Apr 24 17:06:03 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Apr 24 17:06:03 2018 -0700

----------------------------------------------------------------------
 .../datasources/v2/DataSourceV2Strategy.scala   |   4 +
 .../datasources/v2/WriteToDataSourceV2.scala    |  74 +----------
 .../continuous/ContinuousExecution.scala        |   2 +-
 .../WriteToContinuousDataSource.scala           |  31 +++++
 .../WriteToContinuousDataSourceExec.scala       | 124 +++++++++++++++++++
 5 files changed, 165 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d6c26d1c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 1ac9572..c2a3144 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.datasources.v2
 import org.apache.spark.sql.Strategy
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.streaming.continuous.{WriteToContinuousDataSource, WriteToContinuousDataSourceExec}
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
@@ -32,6 +33,9 @@ object DataSourceV2Strategy extends Strategy {
     case WriteToDataSourceV2(writer, query) =>
       WriteToDataSourceV2Exec(writer, planLater(query)) :: Nil
 
+    case WriteToContinuousDataSource(writer, query) =>
+      WriteToContinuousDataSourceExec(writer, planLater(query)) :: Nil
+
     case _ => Nil
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/d6c26d1c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
index e80b44c..ea283ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/WriteToDataSourceV2.scala
@@ -65,25 +65,10 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
       s"The input RDD has ${messages.length} partitions.")
 
     try {
-      val runTask = writer match {
-        // This case means that we're doing continuous processing. In microbatch streaming, the
-        // StreamWriter is wrapped in a MicroBatchWriter, which is executed as a normal batch.
-        case w: StreamWriter =>
-          EpochCoordinatorRef.get(
-            sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-            sparkContext.env)
-            .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
-
-          (context: TaskContext, iter: Iterator[InternalRow]) =>
-            DataWritingSparkTask.runContinuous(writeTask, context, iter)
-        case _ =>
-          (context: TaskContext, iter: Iterator[InternalRow]) =>
-            DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator)
-      }
-
       sparkContext.runJob(
         rdd,
-        runTask,
+        (context: TaskContext, iter: Iterator[InternalRow]) =>
+          DataWritingSparkTask.run(writeTask, context, iter, useCommitCoordinator),
         rdd.partitions.indices,
         (index, message: WriterCommitMessage) => {
           messages(index) = message
@@ -91,14 +76,10 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
         }
       )
 
-      if (!writer.isInstanceOf[StreamWriter]) {
-        logInfo(s"Data source writer $writer is committing.")
-        writer.commit(messages)
-        logInfo(s"Data source writer $writer committed.")
-      }
+      logInfo(s"Data source writer $writer is committing.")
+      writer.commit(messages)
+      logInfo(s"Data source writer $writer committed.")
     } catch {
-      case _: InterruptedException if writer.isInstanceOf[StreamWriter] =>
-        // Interruption is how continuous queries are ended, so accept and ignore the exception.
       case cause: Throwable =>
         logError(s"Data source writer $writer is aborting.")
         try {
@@ -111,8 +92,6 @@ case class WriteToDataSourceV2Exec(writer: DataSourceWriter, query: SparkPlan) e
         }
         logError(s"Data source writer $writer aborted.")
         cause match {
-          // Do not wrap interruption exceptions that will be handled by streaming specially.
-          case _ if StreamExecution.isInterruptionException(cause) => throw cause
           // Only wrap non fatal exceptions.
           case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
           case _ => throw cause
@@ -168,49 +147,6 @@ object DataWritingSparkTask extends Logging {
       logError(s"Writer for stage $stageId, task $partId.$attemptId aborted.")
     })
   }
-
-  def runContinuous(
-      writeTask: DataWriterFactory[InternalRow],
-      context: TaskContext,
-      iter: Iterator[InternalRow]): WriterCommitMessage = {
-    val epochCoordinator = EpochCoordinatorRef.get(
-      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
-      SparkEnv.get)
-    val currentMsg: WriterCommitMessage = null
-    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
-
-    do {
-      var dataWriter: DataWriter[InternalRow] = null
-      // write the data and commit this writer.
-      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
-        try {
-          dataWriter = writeTask.createDataWriter(
-            context.partitionId(), context.attemptNumber(), currentEpoch)
-          while (iter.hasNext) {
-            dataWriter.write(iter.next())
-          }
-          logInfo(s"Writer for partition ${context.partitionId()} is committing.")
-          val msg = dataWriter.commit()
-          logInfo(s"Writer for partition ${context.partitionId()} committed.")
-          epochCoordinator.send(
-            CommitPartitionEpoch(context.partitionId(), currentEpoch, msg)
-          )
-          currentEpoch += 1
-        } catch {
-          case _: InterruptedException =>
-            // Continuous shutdown always involves an interrupt. Just finish the task.
-        }
-      })(catchBlock = {
-        // If there is an error, abort this writer. We enter this callback in the middle of
-        // rethrowing an exception, so runContinuous will stop executing at this point.
-        logError(s"Writer for partition ${context.partitionId()} is aborting.")
-        if (dataWriter != null) dataWriter.abort()
-        logError(s"Writer for partition ${context.partitionId()} aborted.")
-      })
-    } while (!context.isInterrupted())
-
-    currentMsg
-  }
 }
 
 class InternalRowDataWriterFactory(

http://git-wip-us.apache.org/repos/asf/spark/blob/d6c26d1c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 951d694..f58146a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -199,7 +199,7 @@ class ContinuousExecution(
       triggerLogicalPlan.schema,
       outputMode,
       new DataSourceOptions(extraOptions.asJava))
-    val withSink = WriteToDataSourceV2(writer, triggerLogicalPlan)
+    val withSink = WriteToContinuousDataSource(writer, triggerLogicalPlan)
 
     val reader = withSink.collect {
       case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r

http://git-wip-us.apache.org/repos/asf/spark/blob/d6c26d1c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
new file mode 100644
index 0000000..943c731
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSource.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+
+/**
+ * The logical plan for writing data in a continuous stream.
+ */
+case class WriteToContinuousDataSource(
+    writer: StreamWriter, query: LogicalPlan) extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/d6c26d1c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
new file mode 100644
index 0000000..ba88ae1
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/WriteToContinuousDataSourceExec.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.continuous
+
+import scala.util.control.NonFatal
+
+import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.{DataWritingSparkTask, InternalRowDataWriterFactory}
+import org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask.{logError, logInfo}
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
+import org.apache.spark.util.Utils
+
+/**
+ * The physical plan for writing data into a continuous processing [[StreamWriter]].
+ */
+case class WriteToContinuousDataSourceExec(writer: StreamWriter, query: SparkPlan)
+    extends SparkPlan with Logging {
+  override def children: Seq[SparkPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  override protected def doExecute(): RDD[InternalRow] = {
+    val writerFactory = writer match {
+      case w: SupportsWriteInternalRow => w.createInternalRowWriterFactory()
+      case _ => new InternalRowDataWriterFactory(writer.createWriterFactory(), query.schema)
+    }
+
+    val rdd = query.execute()
+
+    logInfo(s"Start processing data source writer: $writer. " +
+      s"The input RDD has ${rdd.getNumPartitions} partitions.")
+    // Let the epoch coordinator know how many partitions the write RDD has.
+    EpochCoordinatorRef.get(
+        sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+        sparkContext.env)
+      .askSync[Unit](SetWriterPartitions(rdd.getNumPartitions))
+
+    try {
+      // Force the RDD to run so continuous processing starts; no data is actually being collected
+      // to the driver, as ContinuousWriteRDD outputs nothing.
+      sparkContext.runJob(
+        rdd,
+        (context: TaskContext, iter: Iterator[InternalRow]) =>
+          WriteToContinuousDataSourceExec.run(writerFactory, context, iter),
+        rdd.partitions.indices)
+    } catch {
+      case _: InterruptedException =>
+        // Interruption is how continuous queries are ended, so accept and ignore the exception.
+      case cause: Throwable =>
+        cause match {
+          // Do not wrap interruption exceptions that will be handled by streaming specially.
+          case _ if StreamExecution.isInterruptionException(cause) => throw cause
+          // Only wrap non fatal exceptions.
+          case NonFatal(e) => throw new SparkException("Writing job aborted.", e)
+          case _ => throw cause
+        }
+    }
+
+    sparkContext.emptyRDD
+  }
+}
+
+object WriteToContinuousDataSourceExec extends Logging {
+  def run(
+      writeTask: DataWriterFactory[InternalRow],
+      context: TaskContext,
+      iter: Iterator[InternalRow]): Unit = {
+    val epochCoordinator = EpochCoordinatorRef.get(
+      context.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
+      SparkEnv.get)
+    var currentEpoch = context.getLocalProperty(ContinuousExecution.START_EPOCH_KEY).toLong
+
+    do {
+      var dataWriter: DataWriter[InternalRow] = null
+      // write the data and commit this writer.
+      Utils.tryWithSafeFinallyAndFailureCallbacks(block = {
+        try {
+          dataWriter = writeTask.createDataWriter(
+            context.partitionId(), context.attemptNumber(), currentEpoch)
+          while (iter.hasNext) {
+            dataWriter.write(iter.next())
+          }
+          logInfo(s"Writer for partition ${context.partitionId()} is committing.")
+          val msg = dataWriter.commit()
+          logInfo(s"Writer for partition ${context.partitionId()} committed.")
+          epochCoordinator.send(
+            CommitPartitionEpoch(context.partitionId(), currentEpoch, msg)
+          )
+          currentEpoch += 1
+        } catch {
+          case _: InterruptedException =>
+          // Continuous shutdown always involves an interrupt. Just finish the task.
+        }
+      })(catchBlock = {
+        // If there is an error, abort this writer. We enter this callback in the middle of
+        // rethrowing an exception, so runContinuous will stop executing at this point.
+        logError(s"Writer for partition ${context.partitionId()} is aborting.")
+        if (dataWriter != null) dataWriter.abort()
+        logError(s"Writer for partition ${context.partitionId()} aborted.")
+      })
+    } while (!context.isInterrupted())
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org