Posted to commits@spark.apache.org by we...@apache.org on 2017/10/13 05:09:42 UTC

spark git commit: [SPARK-21165][SQL] FileFormatWriter should handle mismatched attribute ids between logical and physical plan

Repository: spark
Updated Branches:
  refs/heads/master 3ff766f61 -> ec122209f


[SPARK-21165][SQL] FileFormatWriter should handle mismatched attribute ids between logical and physical plan

## What changes were proposed in this pull request?

Because the optimizer may remove unnecessary aliases, the logical and physical plans can end up with different output attribute ids. FileFormatWriter should handle this when creating the physical sort node.
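To make the failure mode concrete, here is a minimal repro distilled from the regression test in this patch. It assumes a Hive-enabled SparkSession named `spark` with `hive.exec.dynamic.partition.mode=nonstrict`; the optimizer strips the `cast(first as string) as first` alias, so the physical plan's `first` attribute carries a different expression id than the analyzed plan's:

```scala
import spark.implicits._

// Source table: `first` will become tab2's partition column.
Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1")

spark.sql("CREATE TABLE tab2 (word string, length int) PARTITIONED BY (first string)")

// Dynamic-partition insert: FileFormatWriter sorts the output by the partition
// column `first`. Before this patch, that SortOrder was built from the analyzed
// plan's attributes and could carry stale expression ids once the optimizer
// removed the `cast(first as string) as first` alias.
spark.sql(
  """
    |INSERT INTO TABLE tab2 PARTITION(first)
    |SELECT word, length, cast(first as string) as first FROM tab1
  """.stripMargin)
```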

## How was this patch tested?

A new regression test.

Author: Wenchen Fan <we...@databricks.com>

Closes #19483 from cloud-fan/bug2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec122209
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec122209
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec122209

Branch: refs/heads/master
Commit: ec122209fb35a65637df42eded64b0203e105aae
Parents: 3ff766f
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Oct 13 13:09:35 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Oct 13 13:09:35 2017 +0800

----------------------------------------------------------------------
 .../datasources/FileFormatWriter.scala          |  7 ++++++-
 .../datasources/FileFormatWriterSuite.scala     |  2 +-
 .../org/apache/spark/sql/hive/InsertSuite.scala | 22 ++++++++++++++++++++
 3 files changed, 29 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ec122209/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
index 75b1695..1fac01a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala
@@ -180,8 +180,13 @@ object FileFormatWriter extends Logging {
       val rdd = if (orderingMatched) {
         queryExecution.toRdd
       } else {
+        // SPARK-21165: the `requiredOrdering` is based on the attributes from analyzed plan, and
+        // the physical plan may have different attribute ids due to optimizer removing some
+        // aliases. Here we bind the expression ahead to avoid potential attribute ids mismatch.
+        val orderingExpr = requiredOrdering
+          .map(SortOrder(_, Ascending)).map(BindReferences.bindReference(_, allColumns))
         SortExec(
-          requiredOrdering.map(SortOrder(_, Ascending)),
+          orderingExpr,
           global = false,
           child = queryExecution.executedPlan).execute()
       }
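For context on the fix above: `BindReferences.bindReference` turns the attribute-id lookup into a positional one, which is why a later exprId mismatch no longer matters. A minimal sketch of what the binding buys us (the attribute names here are illustrative, not taken from this patch):

```scala
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.StringType

// Stand-ins for a plan's output attributes; every AttributeReference is
// created with a fresh, unique expression id.
val word = AttributeReference("word", StringType)()
val first = AttributeReference("first", StringType)()

// Binding rewrites the attribute into a BoundReference that addresses the row
// by ordinal (position 1 here), so evaluation no longer depends on expression
// ids matching between the analyzed and physical plans.
val bound = BindReferences.bindReference(SortOrder(first, Ascending), Seq(word, first))
// bound.child == BoundReference(1, StringType, nullable = true)
```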

http://git-wip-us.apache.org/repos/asf/spark/blob/ec122209/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
index 6f8767d..13f0e0b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileFormatWriterSuite.scala
@@ -32,7 +32,7 @@ class FileFormatWriterSuite extends QueryTest with SharedSQLContext {
     }
   }
 
-  test("FileFormatWriter should respect the input query schema") {
+  test("SPARK-22252: FileFormatWriter should respect the input query schema") {
     withTable("t1", "t2", "t3", "t4") {
       spark.range(1).select('id as 'col1, 'id as 'col2).write.saveAsTable("t1")
       spark.sql("select COL1, COL2 from t1").write.saveAsTable("t2")

http://git-wip-us.apache.org/repos/asf/spark/blob/ec122209/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
index aa5cae3..ab91727 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertSuite.scala
@@ -728,4 +728,26 @@ class InsertSuite extends QueryTest with TestHiveSingleton with BeforeAndAfter
       assert(e.contains("mismatched input 'ROW'"))
     }
   }
+
+  test("SPARK-21165: FileFormatWriter should only rely on attributes from analyzed plan") {
+    withSQLConf(("hive.exec.dynamic.partition.mode", "nonstrict")) {
+      withTable("tab1", "tab2") {
+        Seq(("a", "b", 3)).toDF("word", "first", "length").write.saveAsTable("tab1")
+
+        spark.sql(
+          """
+            |CREATE TABLE tab2 (word string, length int)
+            |PARTITIONED BY (first string)
+          """.stripMargin)
+
+        spark.sql(
+          """
+            |INSERT INTO TABLE tab2 PARTITION(first)
+            |SELECT word, length, cast(first as string) as first FROM tab1
+          """.stripMargin)
+
+        checkAnswer(spark.table("tab2"), Row("a", 3, "b"))
+      }
+    }
+  }
 }
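A note on the expected answer in the test above: since `first` is tab2's partition column, it is placed after the data columns in the table schema, so `spark.table("tab2")` returns columns in the order (word, length, first), hence the expected `Row("a", 3, "b")`.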

