You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/08/07 19:27:18 UTC

spark git commit: [SPARK-21565][SS] Propagate metadata in attribute replacement.

Repository: spark
Updated Branches:
  refs/heads/master 4f7ec3a31 -> cce25b360


[SPARK-21565][SS] Propagate metadata in attribute replacement.

## What changes were proposed in this pull request?

Propagate the original attribute's metadata onto its replacement when source attributes are rewired during streaming execution. This is necessary for EventTimeWatermark operators that consume the replaced attributes.

## How was this patch tested?
Added a new unit test, which was verified to fail before the fix.

Author: Jose Torres <jo...@databricks.com>

Closes #18840 from joseph-torres/SPARK-21565.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cce25b36
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cce25b36
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cce25b36

Branch: refs/heads/master
Commit: cce25b360ee9e39d9510134c73a1761475eaf4ac
Parents: 4f7ec3a
Author: Jose Torres <jo...@databricks.com>
Authored: Mon Aug 7 12:27:16 2017 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Aug 7 12:27:16 2017 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamExecution.scala   |  3 ++-
 .../sql/streaming/EventTimeWatermarkSuite.scala | 28 ++++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cce25b36/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 5711262..1528e7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -628,7 +628,8 @@ class StreamExecution(
     // Rewire the plan to use the new attributes that were returned by the source.
     val replacementMap = AttributeMap(replacements)
     val triggerLogicalPlan = withNewSources transformAllExpressions {
-      case a: Attribute if replacementMap.contains(a) => replacementMap(a)
+      case a: Attribute if replacementMap.contains(a) =>
+        replacementMap(a).withMetadata(a.metadata)
       case ct: CurrentTimestamp =>
         CurrentBatchTimestamp(offsetSeqMetadata.batchTimestampMs,
           ct.dataType)

http://git-wip-us.apache.org/repos/asf/spark/blob/cce25b36/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 552911f..4f19fa0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -391,6 +391,34 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     checkDataset[Long](df, 1L to 100L: _*)
   }
 
+  test("SPARK-21565: watermark operator accepts attributes from replacement") {
+    withTempDir { dir =>
+      dir.delete()
+
+      val df = Seq(("a", 100.0, new java.sql.Timestamp(100L)))
+        .toDF("symbol", "price", "eventTime")
+      df.write.json(dir.getCanonicalPath)
+
+      val input = spark.readStream.schema(df.schema)
+        .json(dir.getCanonicalPath)
+
+      val groupEvents = input
+        .withWatermark("eventTime", "2 seconds")
+        .groupBy("symbol", "eventTime")
+        .agg(count("price") as 'count)
+        .select("symbol", "eventTime", "count")
+      val q = groupEvents.writeStream
+        .outputMode("append")
+        .format("console")
+        .start()
+      try {
+        q.processAllAvailable()
+      } finally {
+        q.stop()
+      }
+    }
+  }
+
   private def assertNumStateRows(numTotalRows: Long): AssertOnQuery = AssertOnQuery { q =>
     val progressWithData = q.recentProgress.filter(_.numInputRows > 0).lastOption.get
     assert(progressWithData.stateOperators(0).numRowsTotal === numTotalRows)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org