Posted to commits@spark.apache.org by we...@apache.org on 2020/04/30 06:25:55 UTC

[spark] branch branch-3.0 updated: [SPARK-27340][SS][TESTS][FOLLOW-UP] Rephrase API comments and simplify tests

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e9ca660  [SPARK-27340][SS][TESTS][FOLLOW-UP] Rephrase API comments and simplify tests
e9ca660 is described below

commit e9ca66096dc415e5046dba4f3f1738b7a96e39be
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Thu Apr 30 06:24:00 2020 +0000

    [SPARK-27340][SS][TESTS][FOLLOW-UP] Rephrase API comments and simplify tests
    
    ### What changes were proposed in this pull request?
    
    - Rephrase the API doc for `Column.as`
    - Simplify the UTs
    
    ### Why are the changes needed?
    Address comments in https://github.com/apache/spark/pull/28326
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    New UT added.
    
    Closes #28390 from xuanyuanking/SPARK-27340-follow.
    
    Authored-by: Yuanjian Li <xy...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 7195a18bf24d9506d2f8d9d4d93ff679b3d21b65)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 docs/sql-migration-guide.md                        |  2 ++
 .../main/scala/org/apache/spark/sql/Column.scala   | 12 ++++++++++
 .../sql/streaming/EventTimeWatermarkSuite.scala    | 16 +++++++++++++
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 26 ----------------------
 4 files changed, 30 insertions(+), 26 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index e68198a..774fb2c 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -30,6 +30,8 @@ license: |
 
  - In Spark 2.4 and below, `Dataset.groupByKey` results in a grouped dataset whose key attribute is wrongly named "value" if the key is of a non-struct type, for example, int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries unexpected. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute "key". The old behavior is preserved under a newly added configuration `spark.sql.legacy [...]
 
+  - In Spark 3.0, the column metadata is always propagated by the `Column.name` and `Column.as` APIs. In Spark version 2.4 and earlier, the metadata of the `NamedExpression` is set as the `explicitMetadata` of the new column at the time the API is called, and it does not change even if the underlying `NamedExpression` later changes its metadata. To restore the behavior before Spark 3.0, you can use the API `as(alias: String, metadata: Metadata)` with explicit metadata.
+
 ### DDL Statements
 
   - In Spark 3.0, `CREATE TABLE` without a specific provider uses the value of `spark.sql.sources.default` as its provider. In Spark version 2.4 and below, it was Hive. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.createHiveTableByDefault.enabled` to `true`.
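
To illustrate the migration note above, here is a minimal Scala sketch of the Spark 3.0 behavior it describes. The sketch is not part of this commit; the column name and the metadata key are made up for illustration.

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.MetadataBuilder

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // Attach some metadata to a column (the key/value are illustrative).
    val meta = new MetadataBuilder().putString("comment", "user id").build()
    val df = Seq(1, 2, 3).toDF("id").select($"id".as("id", meta))

    // Spark 3.0: a plain rename propagates the metadata of the underlying column.
    val renamed = df.select($"id".as("uid"))
    println(renamed.schema("uid").metadata)   // contains "comment" -> "user id"

    // Pre-3.0-style behavior: pass explicit metadata, so the new column's metadata
    // is fixed at call time instead of following the underlying NamedExpression.
    val pinned = df.select($"id".as("uid", new MetadataBuilder().build()))
    println(pinned.schema("uid").metadata)    // empty
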
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 6913d4e..2144472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -964,6 +964,10 @@ class Column(val expr: Expression) extends Logging {
    *   df.select($"colA".as("colB"))
    * }}}
    *
+   * If the current column has metadata associated with it, this metadata will be propagated
+   * to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
+   * with explicit metadata.
+   *
    * @group expr_ops
    * @since 1.3.0
    */
@@ -1000,6 +1004,10 @@ class Column(val expr: Expression) extends Logging {
    *   df.select($"colA".as('colB))
    * }}}
    *
+   * If the current column has metadata associated with it, this metadata will be propagated
+   * to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
+   * with explicit metadata.
+   *
    * @group expr_ops
    * @since 1.3.0
    */
@@ -1026,6 +1034,10 @@ class Column(val expr: Expression) extends Logging {
    *   df.select($"colA".name("colB"))
    * }}}
    *
+   * If the current column has metadata associated with it, this metadata will be propagated
+   * to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
+   * with explicit metadata.
+   *
    * @group expr_ops
    * @since 2.0.0
    */
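
The note added above applies to all three overloads touched in this file: `as(alias: String)`, `as(alias: Symbol)`, and `name(alias: String)`. A brief, hypothetical sketch (names and metadata key are illustrative, not from the commit) showing how the propagated metadata can be observed on the result schema:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.MetadataBuilder

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val meta = new MetadataBuilder().putString("comment", "user id").build()
    val df = Seq(1, 2, 3).toDF("id").select($"id".as("id", meta))

    // name(...) and the Symbol overload of as(...) behave like as(alias: String):
    // metadata on the underlying column travels with the rename.
    println(df.select($"id".name("uid")).schema("uid").metadata)   // contains "comment"
    println(df.select($"id".as('uid)).schema("uid").metadata)      // contains "comment"
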
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 2313177..6486e1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -602,6 +602,22 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
     // Check the eventTime metadata is kept in the top level alias.
     assert(aliasWindow.logicalPlan.output.exists(
       _.metadata.contains(EventTimeWatermark.delayKey)))
+
+    val windowedAggregation = aliasWindow
+      .groupBy('aliasWindow)
+      .agg(count("*") as 'count)
+      .select($"aliasWindow".getField("start").cast("long").as[Long], $"count".as[Long])
+
+    testStream(windowedAggregation)(
+      AddData(inputData, 10, 11, 12, 13, 14, 15),
+      CheckNewAnswer(),
+      AddData(inputData, 25), // Advance watermark to 15 seconds
+      CheckNewAnswer((10, 5)),
+      assertNumStateRows(2),
+      AddData(inputData, 10), // Should not emit anything as data less than watermark
+      CheckNewAnswer(),
+      assertNumStateRows(2)
+    )
   }
 
   test("test no-data flag") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 33f899b..3f218c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -991,30 +991,4 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       )
     }
   }
-
-  test("SPARK-27340 Windowed left out join with Alias on TimeWindow") {
-    val (leftInput, df1) = setupStream("left", 2)
-    val (rightInput, df2) = setupStream("right", 3)
-    val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue)
-    val right = df2.select('key, window('rightTime, "10 second") as 'rightWindow, 'rightValue)
-    val joined = left.join(
-      right,
-      left("key") === right("key") && left("leftWindow") === right("rightWindow"),
-      "left_outer")
-      .select(left("key"), $"leftWindow.end".cast("long"), 'leftValue, 'rightValue)
-
-    testStream(joined)(
-      // Test inner part of the join.
-      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
-      CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
-
-      MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
-      CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
-      assertNumStateRows(total = 2, updated = 12),
-
-      AddData(leftInput, 22),
-      CheckNewAnswer(Row(22, 30, 44, 66)),
-      assertNumStateRows(total = 3, updated = 1)
-    )
-  }
 }

