You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/08/07 19:27:18 UTC
spark git commit: [SPARK-21565][SS] Propagate metadata in attribute replacement.
Repository: spark
Updated Branches:
refs/heads/master 4f7ec3a31 -> cce25b360
[SPARK-21565][SS] Propagate metadata in attribute replacement.
## What changes were proposed in this pull request?
Propagate metadata in attribute replacement during streaming execution. This is necessary for EventTimeWatermark operators that consume replaced attributes.
## How was this patch tested?
New unit test, which was verified to fail before the fix.
Author: Jose Torres <jo...@databricks.com>
Closes #18840 from joseph-torres/SPARK-21565.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cce25b36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cce25b36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cce25b36
Branch: refs/heads/master
Commit: cce25b360ee9e39d9510134c73a1761475eaf4ac
Parents: 4f7ec3a
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Aug 7 12:27:16 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Aug 7 12:27:16 2017 -0700
----------------------------------------------------------------------
.../execution/streaming/StreamExecution.scala | 3 ++-
.../sql/streaming/EventTimeWatermarkSuite.scala | 28 ++++++++++++++++++++
2 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cce25b36/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5711262..1528e7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -628,7 +628,8 @@ class StreamExecution(
// Rewire the plan to use the new attributes that were returned by the source.
val replacementMap = AttributeMap(replacements)
val triggerLogicalPlan = withNewSources transformAllExpressions {
- case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+ case a: Attribute if replacementMap.contains(a) =>
+ replacementMap(a).withMetadata(a.metadata)
case ct: CurrentTimestamp =>
CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
ct.dataType)
http://git-wip-us.apache.org/repos/asf/spark/blob/cce25b36/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 552911f..4f19fa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -391,6 +391,34 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
checkDataset[Long](df, 1L to 100L: _*)
}
+ test("SPARK-21565: watermark operator accepts attributes from replacement") {
+ withTempDir { dir =>
+ dir.delete()
+
+ val df = Seq(("a", 100.0, new java.sql.Timestamp(100L)))
+ .toDF("symbol", "price", "eventTime")
+ df.write.json(dir.getCanonicalPath)
+
+ val input = spark.readStream.schema(df.schema)
+ .json(dir.getCanonicalPath)
+
+ val groupEvents = input
+ .withWatermark("eventTime", "2 seconds")
+ .groupBy("symbol", "eventTime")
+ .agg(count("price") as 'count)
+ .select("symbol", "eventTime", "count")
+ val q = groupEvents.writeStream
+ .outputMode("append")
+ .format("console")
+ .start()
+ try {
+ q.processAllAvailable()
+ } finally {
+ q.stop()
+ }
+ }
+ }
+
private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org