Posted to commits@spark.apache.org by td...@apache.org on 2016/11/19 00:34:42 UTC
spark git commit: [SPARK-18497][SS] Make ForeachSink support watermark
Repository: spark
Updated Branches:
refs/heads/master 6f7ff7509 -> 2a40de408
[SPARK-18497][SS] Make ForeachSink support watermark
## What changes were proposed in this pull request?
The issue in ForeachSink is that the newly created Dataset still uses the old QueryExecution. When `foreachPartition` is called, `QueryExecution.toString` is invoked and then fails because it does not know how to plan EventTimeWatermark.
This PR replaces the QueryExecution with an IncrementalExecution to fix the issue.
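For context, a query shaped like the one below exercises the failing path: a watermarked streaming aggregation written through the foreach sink. This repro is a sketch, not taken from the PR; the source `streamingDS`, the session value `spark`, and the inline no-op writer are assumptions.

```scala
import org.apache.spark.sql.{ForeachWriter, Row}
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.streaming.OutputMode
import spark.implicits._  // assumes an active SparkSession named `spark`

// streamingDS: any streaming Dataset[Int] (e.g. built from a MemoryStream,
// as in the test added below); the name is an assumption for this sketch.
val query = streamingDS.toDF()
  .withColumn("eventTime", $"value".cast("timestamp"))
  .withWatermark("eventTime", "10 seconds")   // adds an EventTimeWatermark node
  .groupBy(window($"eventTime", "5 seconds"))
  .agg(count("*") as "count")
  .writeStream
  .outputMode(OutputMode.Complete)
  .foreach(new ForeachWriter[Row] {           // before this fix, planning this sink's Dataset failed
    override def open(partitionId: Long, version: Long): Boolean = true
    override def process(value: Row): Unit = println(value)
    override def close(errorOrNull: Throwable): Unit = ()
  })
  .start()
```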
## How was this patch tested?
Added `test("foreach with watermark")` to ForeachSinkSuite.
Author: Shixiong Zhu <sh...@databricks.com>
Closes #15934 from zsxwing/SPARK-18497.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2a40de40
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2a40de40
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2a40de40
Branch: refs/heads/master
Commit: 2a40de408b5eb47edba92f9fe92a42ed1e78bf98
Parents: 6f7ff75
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Fri Nov 18 16:34:38 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Nov 18 16:34:38 2016 -0800
----------------------------------------------------------------------
.../sql/execution/streaming/ForeachSink.scala | 16 ++++-----
.../execution/streaming/ForeachSinkSuite.scala | 35 ++++++++++++++++++++
2 files changed, 43 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/2a40de40/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
index f5c550d..c93fcfb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ForeachSink.scala
@@ -47,22 +47,22 @@ class ForeachSink[T : Encoder](writer: ForeachWriter[T]) extends Sink with Seria
     // method supporting incremental planning. But in the long run, we should generally make newly
     // created Datasets use `IncrementalExecution` where necessary (which is SPARK-16264 tries to
     // resolve).
-
+    val incrementalExecution = data.queryExecution.asInstanceOf[IncrementalExecution]
     val datasetWithIncrementalExecution =
-      new Dataset(data.sparkSession, data.logicalPlan, implicitly[Encoder[T]]) {
+      new Dataset(data.sparkSession, incrementalExecution, implicitly[Encoder[T]]) {
         override lazy val rdd: RDD[T] = {
           val objectType = exprEnc.deserializer.dataType
           val deserialized = CatalystSerde.deserialize[T](logicalPlan)
 
           // was originally: sparkSession.sessionState.executePlan(deserialized) ...
-          val incrementalExecution = new IncrementalExecution(
+          val newIncrementalExecution = new IncrementalExecution(
             this.sparkSession,
             deserialized,
-            data.queryExecution.asInstanceOf[IncrementalExecution].outputMode,
-            data.queryExecution.asInstanceOf[IncrementalExecution].checkpointLocation,
-            data.queryExecution.asInstanceOf[IncrementalExecution].currentBatchId,
-            data.queryExecution.asInstanceOf[IncrementalExecution].currentEventTimeWatermark)
-          incrementalExecution.toRdd.mapPartitions { rows =>
+            incrementalExecution.outputMode,
+            incrementalExecution.checkpointLocation,
+            incrementalExecution.currentBatchId,
+            incrementalExecution.currentEventTimeWatermark)
+          newIncrementalExecution.toRdd.mapPartitions { rows =>
             rows.map(_.get(0, objectType))
           }.asInstanceOf[RDD[T]]
         }
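Condensed from the hunk above, the fix is: resolve the sink's IncrementalExecution once, then derive a fresh IncrementalExecution for the deserialized plan, copying over its streaming parameters. A restating sketch, with `data`, `deserialized`, and `this.sparkSession` as in the surrounding method:

    val incrementalExecution = data.queryExecution.asInstanceOf[IncrementalExecution]
    val newIncrementalExecution = new IncrementalExecution(
      this.sparkSession,
      deserialized,                                   // CatalystSerde.deserialize[T](logicalPlan)
      incrementalExecution.outputMode,
      incrementalExecution.checkpointLocation,
      incrementalExecution.currentBatchId,
      incrementalExecution.currentEventTimeWatermark)
    // IncrementalExecution's streaming-aware planner knows how to plan the
    // EventTimeWatermark node, so toRdd (and toString) no longer fail.
    newIncrementalExecution.toRdd

Reusing the sink's existing IncrementalExecution keeps the streaming planner in play, rather than letting the wrapper Dataset fall back to a batch QueryExecution that cannot plan EventTimeWatermark.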
http://git-wip-us.apache.org/repos/asf/spark/blob/2a40de40/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
index 9e05921..ee626103 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala
@@ -25,6 +25,7 @@ import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.ForeachWriter
+import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQueryException, StreamTest}
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -169,6 +170,40 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf
       assert(errorEvent.error.get.getMessage === "error")
     }
   }
+
+  test("foreach with watermark") {
+    val inputData = MemoryStream[Int]
+
+    val windowedAggregation = inputData.toDF()
+      .withColumn("eventTime", $"value".cast("timestamp"))
+      .withWatermark("eventTime", "10 seconds")
+      .groupBy(window($"eventTime", "5 seconds") as 'window)
+      .agg(count("*") as 'count)
+      .select($"count".as[Long])
+      .map(_.toInt)
+      .repartition(1)
+
+    val query = windowedAggregation
+      .writeStream
+      .outputMode(OutputMode.Complete)
+      .foreach(new TestForeachWriter())
+      .start()
+    try {
+      inputData.addData(10, 11, 12)
+      query.processAllAvailable()
+
+      val allEvents = ForeachSinkSuite.allEvents()
+      assert(allEvents.size === 1)
+      val expectedEvents = Seq(
+        ForeachSinkSuite.Open(partition = 0, version = 0),
+        ForeachSinkSuite.Process(value = 3),
+        ForeachSinkSuite.Close(None)
+      )
+      assert(allEvents === Seq(expectedEvents))
+    } finally {
+      query.stop()
+    }
+  }
 }
 
 /** A global object to collect events in the executor */
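TestForeachWriter itself is outside this hunk. Against the public ForeachWriter[T] API (open/process/close is the real contract), such a writer looks roughly like the sketch below; the addEvent helper and the exact shape of the event case classes are assumptions inferred from the assertions in the test above.

    class TestForeachWriter extends ForeachWriter[Int] {
      override def open(partitionId: Long, version: Long): Boolean = {
        ForeachSinkSuite.addEvent(ForeachSinkSuite.Open(partition = partitionId, version = version)) // hypothetical helper
        true  // returning true tells Spark to process this partition
      }
      override def process(value: Int): Unit = {
        ForeachSinkSuite.addEvent(ForeachSinkSuite.Process(value)) // hypothetical helper
      }
      override def close(errorOrNull: Throwable): Unit = {
        // Close(None) in the expected events means the writer closed without error.
        ForeachSinkSuite.addEvent(ForeachSinkSuite.Close(Option(errorOrNull))) // hypothetical helper
      }
    }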