You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/10/13 05:28:06 UTC

spark git commit: [SPARK-17866][SPARK-17867][SQL] Fix Dataset.dropduplicates

Repository: spark
Updated Branches:
  refs/heads/master edeb51a39 -> 064d6650e


[SPARK-17866][SPARK-17867][SQL] Fix Dataset.dropduplicates

## What changes were proposed in this pull request?

This PR fixes two issues in `Dataset.dropDuplicates`:

1. Dataset.dropDuplicates should consider the columns with same column name

    Currently, `Dataset.dropDuplicates` finds only the first resolved attribute in the output that matches the given column name. When more than one column has the same name, the other columns are put into the aggregation columns instead of the grouping columns.

2. Dataset.dropDuplicates should not change the output of child plan

    Currently, `Dataset.dropDuplicates` creates a new `Alias` with a new exprId. This causes a problem when we want to select columns from the original dataset, as follows:

        val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
        // ds("_2") will cause analysis exception
        ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int])

Because both issues are related to `Dataset.dropDuplicates` and the code changes are small, they are submitted together as one PR.

## How was this patch tested?

Jenkins tests.

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #15427 from viirya/fix-dropduplicates.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/064d6650
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/064d6650
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/064d6650

Branch: refs/heads/master
Commit: 064d6650e93ed6515a1309079c361e20404843cc
Parents: edeb51a
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Thu Oct 13 13:27:57 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Oct 13 13:27:57 2016 +0800

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/sql/Dataset.scala  | 16 ++++++++++++----
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 17 +++++++++++++++++
 2 files changed, 29 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/064d6650/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a7a8473..e59a483 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1892,17 +1892,25 @@ class Dataset[T] private[sql](
   def dropDuplicates(colNames: Seq[String]): Dataset[T] = withTypedPlan {
     val resolver = sparkSession.sessionState.analyzer.resolver
     val allColumns = queryExecution.analyzed.output
-    val groupCols = colNames.map { colName =>
-      allColumns.find(col => resolver(col.name, colName)).getOrElse(
+    val groupCols = colNames.flatMap { colName =>
+      // It is possibly there are more than one columns with the same name,
+      // so we call filter instead of find.
+      val cols = allColumns.filter(col => resolver(col.name, colName))
+      if (cols.isEmpty) {
         throw new AnalysisException(
-          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})"""))
+          s"""Cannot resolve column name "$colName" among (${schema.fieldNames.mkString(", ")})""")
+      }
+      cols
     }
     val groupColExprIds = groupCols.map(_.exprId)
     val aggCols = logicalPlan.output.map { attr =>
       if (groupColExprIds.contains(attr.exprId)) {
         attr
       } else {
-        Alias(new First(attr).toAggregateExpression(), attr.name)()
+        // Removing duplicate rows should not change output attributes. We should keep
+        // the original exprId of the attribute. Otherwise, to select a column in original
+        // dataset will cause analysis exception due to unresolved attribute.
+        Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
       }
     }
     Aggregate(groupCols, aggCols, logicalPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/064d6650/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 3243f35..5fce9b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -872,6 +872,23 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       ("a", 1), ("a", 2), ("b", 1))
   }
 
+  test("dropDuplicates: columns with same column name") {
+    val ds1 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+    val ds2 = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+    // The dataset joined has two columns of the same name "_2".
+    val joined = ds1.join(ds2, "_1").select(ds1("_2").as[Int], ds2("_2").as[Int])
+    checkDataset(
+      joined.dropDuplicates(),
+      (1, 2), (1, 1), (2, 1), (2, 2))
+  }
+
+  test("dropDuplicates should not change child plan output") {
+    val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
+    checkDataset(
+      ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]),
+      ("a", 1), ("b", 1))
+  }
+
   test("SPARK-16097: Encoders.tuple should handle null object correctly") {
     val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING)
     val data = Seq((("a", "b"), "c"), (null, "d"))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org