Posted to commits@spark.apache.org by do...@apache.org on 2020/06/15 14:52:33 UTC
[spark] branch branch-3.0 updated: [SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 764da2f [SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates
764da2f is described below
commit 764da2fbc24b63fcb17d231487880c0d58364144
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Mon Jun 15 07:48:48 2020 -0700
[SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates
### What changes were proposed in this pull request?
This PR partially reverts SPARK-31292 in order to provide a hot fix for a bug in `Dataset.dropDuplicates`; we must preserve the input order of `colNames` for `groupCols` because Structured Streaming's state store depends on the `groupCols` order.
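The ordering difference at the heart of the bug can be seen in plain Scala, with no Spark involved: `Seq.distinct` (introduced by SPARK-31292) keeps first-occurrence input order, while `toSet.toSeq` (the behavior this commit restores) follows the `Set`'s own iteration order, which for larger hash-based sets generally differs. The snippet below is an illustrative sketch, not Spark code:

```scala
// Plain-Scala sketch: distinct keeps first-occurrence input order, while
// toSet.toSeq follows the Set's iteration order, which for larger hash
// sets is generally different from the input order.
object DedupOrderSketch {
  def main(args: Array[String]): Unit = {
    val colNames = Seq("year", "month", "day", "month")
    // Keeps input order: List(year, month, day)
    println(colNames.distinct)
    // Set iteration order; not guaranteed to match the input order
    println(colNames.toSet.toSeq)
  }
}
```

Because existing streaming checkpoints were written against the `toSet.toSeq` ordering, silently switching to `distinct` changed the state-store key layout for queries restored from old checkpoints.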
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added tests in `DataFrameSuite`.
Closes #28830 from maropu/SPARK-31990.
Authored-by: Takeshi Yamamuro <ya...@apache.org>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
(cherry picked from commit 7f7b4dd5199e7c185aedf51fccc400c7072bed05)
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 9d4312b..2ddedf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2541,7 +2541,9 @@ class Dataset[T] private[sql](
def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
val resolver = sparkSession.sessionState.analyzer.resolver
val allColumns = queryExecution.analyzed.output
- val groupCols = colNames.distinct.flatMap { (colName: String) =>
+ // SPARK-31990: We must keep `toSet.toSeq` here because of the backward compatibility issue
+ // (the Streaming's state store depends on the `groupCols` order).
+ val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
// It is possibly there are more than one columns with the same name,
// so we call filter instead of find.
val cols = allColumns.filter(col => resolver(col.name, colName))