Posted to commits@spark.apache.org by do...@apache.org on 2020/06/15 14:52:33 UTC

[spark] branch branch-3.0 updated: [SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 764da2f  [SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates
764da2f is described below

commit 764da2fbc24b63fcb17d231487880c0d58364144
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Mon Jun 15 07:48:48 2020 -0700

    [SPARK-31990][SS] Use toSet.toSeq in Dataset.dropDuplicates
    
    ### What changes were proposed in this pull request?
    
    This PR partially reverts SPARK-31292 to provide a hot-fix for a bug in `Dataset.dropDuplicates`: we must preserve the input order of `colNames` for `groupCols` because the streaming state store depends on the `groupCols` order.
    
    ### Why are the changes needed?
    
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Added tests in `DataFrameSuite`.
    
    Closes #28830 from maropu/SPARK-31990.
    
    Authored-by: Takeshi Yamamuro <ya...@apache.org>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
    (cherry picked from commit 7f7b4dd5199e7c185aedf51fccc400c7072bed05)
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 9d4312b..2ddedf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2541,7 +2541,9 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
     val resolver = sparkSession.sessionState.analyzer.resolver
     val allColumns = queryExecution.analyzed.output
-    val groupCols = colNames.distinct.flatMap { (colName: String) =>
+    // SPARK-31990: We must keep `toSet.toSeq` here because of the backward compatibility issue
+    // (the Streaming's state store depends on the `groupCols` order).
+    val groupCols = colNames.toSet.toSeq.flatMap { (colName: String) =>
       // It is possibly there are more than one columns with the same name,
       // so we call filter instead of find.
       val cols = allColumns.filter(col => resolver(col.name, colName))
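
The one-line change above swaps `colNames.distinct` back to `colNames.toSet.toSeq`. A minimal sketch in plain Scala (no Spark required) of why the two are not interchangeable: `distinct` keeps the first occurrence of each element in input order, while `toSet.toSeq` returns the elements in whatever order the `Set` implementation iterates them. The object name, helper names, and column names below are hypothetical and chosen only for illustration; they are not part of the patch.

```scala
object DedupOrderSketch {
  // Order-preserving deduplication: keeps the first occurrence of each name.
  def dedupKeepingInputOrder(names: Seq[String]): Seq[String] = names.distinct

  // Set-based deduplication: the resulting order is whatever the Set
  // implementation yields when iterated, not necessarily the input order.
  def dedupViaSet(names: Seq[String]): Seq[String] = names.toSet.toSeq

  def main(args: Array[String]): Unit = {
    // Hypothetical column names, with duplicates.
    val colNames = Seq("value", "key", "ts", "key", "id", "region", "value", "user")

    val viaDistinct = dedupKeepingInputOrder(colNames)
    val viaSet = dedupViaSet(colNames)

    println(s"distinct:    $viaDistinct")
    println(s"toSet.toSeq: $viaSet")

    // Both results contain exactly the same names; only the order may differ.
    assert(viaDistinct.toSet == viaSet.toSet)
    println(s"same order?  ${viaDistinct == viaSet}")
  }
}
```

The two orders can coincide, especially for short lists, so a difference may not show up in every test input. Anything that records the column order, as the code comment in the diff says the streaming state store does, would see a change when one form is replaced by the other.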

