You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/04/30 06:25:55 UTC
[spark] branch branch-3.0 updated:
[SPARK-27340][SS][TESTS][FOLLOW-UP] Rephrase API comments and simplify
tests
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new e9ca660 [SPARK-27340][SS][TESTS][FOLLOW-UP] Rephrase API comments and simplify tests
e9ca660 is described below
commit e9ca66096dc415e5046dba4f3f1738b7a96e39be
Author: Yuanjian Li <xy...@gmail.com>
AuthorDate: Thu Apr 30 06:24:00 2020 +0000
[SPARK-27340][SS][TESTS][FOLLOW-UP] Rephrase API comments and simplify tests
### What changes were proposed in this pull request?
- Rephrase the API doc for `Column.as`
- Simplify the UTs
### Why are the changes needed?
Address comments in https://github.com/apache/spark/pull/28326
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
New UT added.
Closes #28390 from xuanyuanking/SPARK-27340-follow.
Authored-by: Yuanjian Li <xy...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 7195a18bf24d9506d2f8d9d4d93ff679b3d21b65)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
docs/sql-migration-guide.md | 2 ++
.../main/scala/org/apache/spark/sql/Column.scala | 12 ++++++++++
.../sql/streaming/EventTimeWatermarkSuite.scala | 16 +++++++++++++
.../spark/sql/streaming/StreamingJoinSuite.scala | 26 ----------------------
4 files changed, 30 insertions(+), 26 deletions(-)
diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index e68198a..774fb2c 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -30,6 +30,8 @@ license: |
- In Spark 2.4 and below, `Dataset.groupByKey` results to a grouped dataset with key attribute is wrongly named as "value", if the key is non-struct type, for example, int, string, array, etc. This is counterintuitive and makes the schema of aggregation queries unexpected. For example, the schema of `ds.groupByKey(...).count()` is `(value, count)`. Since Spark 3.0, we name the grouping attribute to "key". The old behavior is preserved under a newly added configuration `spark.sql.legacy [...]
+ - In Spark 3.0, the column metadata will always be propagated in the API `Column.name` and `Column.as`. In Spark version 2.4 and earlier, the metadata of `NamedExpression` is set as the `explicitMetadata` for the new column at the time the API is called; it won't change even if the underlying `NamedExpression` changes metadata. To restore the behavior before Spark 3.0, you can use the API `as(alias: String, metadata: Metadata)` with explicit metadata.
+
### DDL Statements
- In Spark 3.0, `CREATE TABLE` without a specific provider uses the value of `spark.sql.sources.default` as its provider. In Spark version 2.4 and below, it was Hive. To restore the behavior before Spark 3.0, you can set `spark.sql.legacy.createHiveTableByDefault.enabled` to `true`.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 6913d4e..2144472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -964,6 +964,10 @@ class Column(val expr: Expression) extends Logging {
* df.select($"colA".as("colB"))
* }}}
*
+ * If the current column has metadata associated with it, this metadata will be propagated
+ * to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
+ * with explicit metadata.
+ *
* @group expr_ops
* @since 1.3.0
*/
@@ -1000,6 +1004,10 @@ class Column(val expr: Expression) extends Logging {
* df.select($"colA".as('colB))
* }}}
*
+ * If the current column has metadata associated with it, this metadata will be propagated
+ * to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
+ * with explicit metadata.
+ *
* @group expr_ops
* @since 1.3.0
*/
@@ -1026,6 +1034,10 @@ class Column(val expr: Expression) extends Logging {
* df.select($"colA".name("colB"))
* }}}
*
+ * If the current column has metadata associated with it, this metadata will be propagated
+ * to the new column. If this is not desired, use the API `as(alias: String, metadata: Metadata)`
+ * with explicit metadata.
+ *
* @group expr_ops
* @since 2.0.0
*/
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 2313177..6486e1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -602,6 +602,22 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
// Check the eventTime metadata is kept in the top level alias.
assert(aliasWindow.logicalPlan.output.exists(
_.metadata.contains(EventTimeWatermark.delayKey)))
+
+ val windowedAggregation = aliasWindow
+ .groupBy('aliasWindow)
+ .agg(count("*") as 'count)
+ .select($"aliasWindow".getField("start").cast("long").as[Long], $"count".as[Long])
+
+ testStream(windowedAggregation)(
+ AddData(inputData, 10, 11, 12, 13, 14, 15),
+ CheckNewAnswer(),
+ AddData(inputData, 25), // Advance watermark to 15 seconds
+ CheckNewAnswer((10, 5)),
+ assertNumStateRows(2),
+ AddData(inputData, 10), // Should not emit anything as data less than watermark
+ CheckNewAnswer(),
+ assertNumStateRows(2)
+ )
}
test("test no-data flag") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 33f899b..3f218c9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -991,30 +991,4 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
)
}
}
-
- test("SPARK-27340 Windowed left out join with Alias on TimeWindow") {
- val (leftInput, df1) = setupStream("left", 2)
- val (rightInput, df2) = setupStream("right", 3)
- val left = df1.select('key, window('leftTime, "10 second") as 'leftWindow, 'leftValue)
- val right = df2.select('key, window('rightTime, "10 second") as 'rightWindow, 'rightValue)
- val joined = left.join(
- right,
- left("key") === right("key") && left("leftWindow") === right("rightWindow"),
- "left_outer")
- .select(left("key"), $"leftWindow.end".cast("long"), 'leftValue, 'rightValue)
-
- testStream(joined)(
- // Test inner part of the join.
- MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
- CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
-
- MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
- CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
- assertNumStateRows(total = 2, updated = 12),
-
- AddData(leftInput, 22),
- CheckNewAnswer(Row(22, 30, 44, 66)),
- assertNumStateRows(total = 3, updated = 1)
- )
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org