You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/01/17 17:57:19 UTC

spark git commit: [SPARK-19065][SQL] Don't inherit expression id in dropDuplicates

Repository: spark
Updated Branches:
  refs/heads/master 20e628062 -> a83accfcf


[SPARK-19065][SQL] Don't inherit expression id in dropDuplicates

## What changes were proposed in this pull request?

`dropDuplicates` will create an Alias using the same exprId, so `StreamExecution` should also replace Alias if necessary.

## How was this patch tested?

test("SPARK-19065: dropDuplicates should not create expressions using the same id")

Author: Shixiong Zhu <sh...@databricks.com>

Closes #16564 from zsxwing/SPARK-19065.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a83accfc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a83accfc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a83accfc

Branch: refs/heads/master
Commit: a83accfcfd6a92afac5040c50577258ab83d10dd
Parents: 20e6280
Author: Shixiong Zhu <sh...@databricks.com>
Authored: Wed Jan 18 01:57:12 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Jan 18 01:57:12 2017 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/Dataset.scala    |  5 +---
 .../org/apache/spark/sql/DatasetSuite.scala     |  7 ------
 .../spark/sql/streaming/StreamSuite.scala       | 26 ++++++++++++++++++++
 3 files changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a83accfc/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 1a7a5ba..24b9b81 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2003,10 +2003,7 @@ class Dataset[T] private[sql](
       if (groupColExprIds.contains(attr.exprId)) {
         attr
       } else {
-        // Removing duplicate rows should not change output attributes. We should keep
-        // the original exprId of the attribute. Otherwise, to select a column in original
-        // dataset will cause analysis exception due to unresolved attribute.
-        Alias(new First(attr).toAggregateExpression(), attr.name)(exprId = attr.exprId)
+        Alias(new First(attr).toAggregateExpression(), attr.name)()
       }
     }
     Aggregate(groupCols, aggCols, logicalPlan)

http://git-wip-us.apache.org/repos/asf/spark/blob/a83accfc/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 731a28c..b37bf13 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -898,13 +898,6 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
       (1, 2), (1, 1), (2, 1), (2, 2))
   }
 
-  test("dropDuplicates should not change child plan output") {
-    val ds = Seq(("a", 1), ("a", 2), ("b", 1), ("a", 1)).toDS()
-    checkDataset(
-      ds.dropDuplicates("_1").select(ds("_1").as[String], ds("_2").as[Int]),
-      ("a", 1), ("b", 1))
-  }
-
   test("SPARK-16097: Encoders.tuple should handle null object correctly") {
     val enc = Encoders.tuple(Encoders.tuple(Encoders.STRING, Encoders.STRING), Encoders.STRING)
     val data = Seq((("a", "b"), "c"), (null, "d"))

http://git-wip-us.apache.org/repos/asf/spark/blob/a83accfc/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e964e64..f31dc8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -304,6 +304,32 @@ class StreamSuite extends StreamTest {
       q.stop()
     }
   }
+
+  test("SPARK-19065: dropDuplicates should not create expressions using the same id") {
+    withTempPath { testPath =>
+      val data = Seq((1, 2), (2, 3), (3, 4))
+      data.toDS.write.mode("overwrite").json(testPath.getCanonicalPath)
+      val schema = spark.read.json(testPath.getCanonicalPath).schema
+      val query = spark
+        .readStream
+        .schema(schema)
+        .json(testPath.getCanonicalPath)
+        .dropDuplicates("_1")
+        .writeStream
+        .format("memory")
+        .queryName("testquery")
+        .outputMode("complete")
+        .start()
+      try {
+        query.processAllAvailable()
+        if (query.exception.isDefined) {
+          throw query.exception.get
+        }
+      } finally {
+        query.stop()
+      }
+    }
+  }
 }
 
 /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org