Posted to commits@spark.apache.org by ma...@apache.org on 2016/11/15 23:12:33 UTC

spark git commit: [SPARK-18440][STRUCTURED STREAMING] Pass correct query execution to FileFormatWriter

Repository: spark
Updated Branches:
  refs/heads/master 5bcb9a7ff -> 1ae4652b7


[SPARK-18440][STRUCTURED STREAMING] Pass correct query execution to FileFormatWriter

## What changes were proposed in this pull request?

SPARK-18012 refactored the file write path in FileStreamSink to use FileFormatWriter, which always uses the default non-streaming QueryExecution to perform the writes. This is wrong for FileStreamSink, because the streaming QueryExecution (i.e. IncrementalExecution) should be used to correctly incrementalize aggregations. With the addition of watermarks in SPARK-18124, the file stream sink should logically support aggregation + watermark + append mode. But it actually fails with
```
16:23:07.389 ERROR org.apache.spark.sql.execution.streaming.StreamExecution: Query query-0 terminated with error
java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark timestamp#7: timestamp, interval 10 seconds
+- LocalRelation [timestamp#7]

	at scala.Predef$.assert(Predef.scala:170)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
	at scala.collection.Iterator$class.foreach(Iterator.scala:893)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:74)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2.apply(QueryPlanner.scala:66)
	at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:92)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:77)
	at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:74)
```

This PR fixes it by passing the correct query execution.
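
For context, here is a minimal sketch (not part of this patch) of the kind of user query that exercises this path: an event-time watermark plus a windowed aggregation written to a file sink in append mode. The socket source, paths, and object name are illustrative only.
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Illustrative query: without this fix, planning the file-sink write fails with the
// AssertionError above, because EventTimeWatermark is a streaming-only operator that
// the non-streaming QueryExecution cannot plan.
object WatermarkedFileSinkExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("WatermarkedFileSinkExample")
      .master("local[2]")
      .getOrCreate()
    import spark.implicits._

    // Illustrative source: lines containing epoch seconds, read from a socket.
    val events = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
      .selectExpr("CAST(value AS bigint) AS secs")
      .selectExpr("CAST(secs AS timestamp) AS timestamp")

    // Watermarked, windowed aggregation (append mode is the default for file sinks).
    val counts = events
      .withWatermark("timestamp", "10 seconds")
      .groupBy(window($"timestamp", "5 seconds"))
      .count()
      .select("window.start", "window.end", "count")

    val query = counts.writeStream
      .format("parquet")
      .option("checkpointLocation", "/tmp/example-checkpoint") // illustrative path
      .start("/tmp/example-output")                            // illustrative path

    query.awaitTermination()
  }
}
```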

## How was this patch tested?
New unit test

Author: Tathagata Das <ta...@gmail.com>

Closes #15885 from tdas/SPARK-18440.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ae4652b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ae4652b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ae4652b

Branch: refs/heads/master
Commit: 1ae4652b7e1f77a984b8459c778cb06c814192c5
Parents: 5bcb9a7
Author: Tathagata Das <ta...@gmail.com>
Authored: Tue Nov 15 15:12:30 2016 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Nov 15 15:12:30 2016 -0800

----------------------------------------------------------------------
 .../datasources/FileFormatWriter.scala          |  9 +--
 .../InsertIntoHadoopFsRelationCommand.scala     |  2 +-
 .../execution/streaming/FileStreamSink.scala    |  2 +-
 .../sql/streaming/FileStreamSinkSuite.scala     | 78 ++++++++++++++++++--
 4 files changed, 79 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1ae4652b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 4e4b0e4..d560ad5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
-import org.apache.spark.sql.execution.{SQLExecution, UnsafeKVExternalSorter}
+import org.apache.spark.sql.execution.{QueryExecution, SQLExecution, UnsafeKVExternalSorter}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.apache.spark.util.{SerializableConfiguration, Utils}
 import org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter
@@ -85,7 +85,7 @@ object FileFormatWriter extends Logging {
    */
   def write(
       sparkSession: SparkSession,
-      plan: LogicalPlan,
+      queryExecution: QueryExecution,
       fileFormat: FileFormat,
       committer: FileCommitProtocol,
       outputSpec: OutputSpec,
@@ -101,8 +101,7 @@ object FileFormatWriter extends Logging {
     FileOutputFormat.setOutputPath(job, new Path(outputSpec.outputPath))
 
     val partitionSet = AttributeSet(partitionColumns)
-    val dataColumns = plan.output.filterNot(partitionSet.contains)
-    val queryExecution = Dataset.ofRows(sparkSession, plan).queryExecution
+    val dataColumns = queryExecution.logical.output.filterNot(partitionSet.contains)
 
     // Note: prepareWrite has side effect. It sets "job".
     val outputWriterFactory =
@@ -112,7 +111,7 @@ object FileFormatWriter extends Logging {
       uuid = UUID.randomUUID().toString,
       serializableHadoopConf = new SerializableConfiguration(job.getConfiguration),
       outputWriterFactory = outputWriterFactory,
-      allColumns = plan.output,
+      allColumns = queryExecution.logical.output,
       partitionColumns = partitionColumns,
       nonPartitionColumns = dataColumns,
       bucketSpec = bucketSpec,

http://git-wip-us.apache.org/repos/asf/spark/blob/1ae4652b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index 28975e1..a9bde90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -100,7 +100,7 @@ case class InsertIntoHadoopFsRelationCommand(
 
       FileFormatWriter.write(
         sparkSession = sparkSession,
-        plan = query,
+        queryExecution = Dataset.ofRows(sparkSession, query).queryExecution,
         fileFormat = fileFormat,
         committer = committer,
         outputSpec = FileFormatWriter.OutputSpec(

http://git-wip-us.apache.org/repos/asf/spark/blob/1ae4652b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
index f1c5f9a..0dbe2a7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSink.scala
@@ -77,7 +77,7 @@ class FileStreamSink(
 
       FileFormatWriter.write(
         sparkSession = sparkSession,
-        plan = data.logicalPlan,
+        queryExecution = data.queryExecution,
         fileFormat = fileFormat,
         committer = committer,
         outputSpec = FileFormatWriter.OutputSpec(path, Map.empty),

http://git-wip-us.apache.org/repos/asf/spark/blob/1ae4652b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index fa97d92..09613ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -21,13 +21,14 @@ import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.streaming.{MemoryStream, MetadataLogFileIndex}
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
 import org.apache.spark.util.Utils
 
 class FileStreamSinkSuite extends StreamTest {
   import testImplicits._
 
-  test("FileStreamSink - unpartitioned writing and batch reading") {
+  test("unpartitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val df = inputData.toDF()
 
@@ -59,7 +60,7 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
-  test("FileStreamSink - partitioned writing and batch reading") {
+  test("partitioned writing and batch reading") {
     val inputData = MemoryStream[Int]
     val ds = inputData.toDS()
 
@@ -142,16 +143,83 @@ class FileStreamSinkSuite extends StreamTest {
     }
   }
 
-  test("FileStreamSink - parquet") {
+  // This tests whether FileStreamSink works with aggregations. Specifically, it tests
+  // whether the correct streaming QueryExecution (i.e. IncrementalExecution) is used
+  // to execute the trigger for writing data to the file sink. See SPARK-18440 for more details.
+  test("writing with aggregation") {
+
+    // Since FileStreamSink currently only supports append mode, we will test FileStreamSink
+    // with aggregations using event time windows and watermark, which allows
+    // aggregation + append mode.
+    val inputData = MemoryStream[Long]
+    val inputDF = inputData.toDF.toDF("time")
+    val outputDf = inputDF
+      .selectExpr("CAST(time AS timestamp) AS timestamp")
+      .withWatermark("timestamp", "10 seconds")
+      .groupBy(window($"timestamp", "5 seconds"))
+      .count()
+      .select("window.start", "window.end", "count")
+
+    val outputDir = Utils.createTempDir(namePrefix = "stream.output").getCanonicalPath
+    val checkpointDir = Utils.createTempDir(namePrefix = "stream.checkpoint").getCanonicalPath
+
+    var query: StreamingQuery = null
+
+    try {
+      query =
+        outputDf.writeStream
+          .option("checkpointLocation", checkpointDir)
+          .format("parquet")
+          .start(outputDir)
+
+
+      def addTimestamp(timestampInSecs: Int*): Unit = {
+        inputData.addData(timestampInSecs.map(_ * 1L): _*)
+        failAfter(streamingTimeout) {
+          query.processAllAvailable()
+        }
+      }
+
+      def check(expectedResult: ((Long, Long), Long)*): Unit = {
+        val outputDf = spark.read.parquet(outputDir)
+          .selectExpr(
+            "CAST(start as BIGINT) AS start",
+            "CAST(end as BIGINT) AS end",
+            "count")
+        checkDataset(
+          outputDf.as[(Long, Long, Long)],
+          expectedResult.map(x => (x._1._1, x._1._2, x._2)): _*)
+      }
+
+      addTimestamp(100) // watermark = None before this, watermark = 100 - 10 = 90 after this
+      check() // nothing emitted yet
+
+      addTimestamp(104, 123) // watermark = 90 before this, watermark = 123 - 10 = 113 after this
+      check() // nothing emitted yet
+
+      addTimestamp(140) // wm = 113 before this, emit results on 100-105, wm = 130 after this
+      check((100L, 105L) -> 2L)
+
+      addTimestamp(150) // wm = 130s before this, emit results on 120-125, wm = 150 after this
+      check((100L, 105L) -> 2L, (120L, 125L) -> 1L)
+
+    } finally {
+      if (query != null) {
+        query.stop()
+      }
+    }
+  }
+
+  test("parquet") {
     testFormat(None) // should not throw error as default format parquet when not specified
     testFormat(Some("parquet"))
   }
 
-  test("FileStreamSink - text") {
+  test("text") {
     testFormat(Some("text"))
   }
 
-  test("FileStreamSink - json") {
+  test("json") {
     testFormat(Some("json"))
   }
 

