Posted to commits@spark.apache.org by li...@apache.org on 2018/02/27 16:44:30 UTC

spark git commit: [SPARK-23523][SQL] Fix the incorrect result caused by the rule OptimizeMetadataOnlyQuery

Repository: spark
Updated Branches:
  refs/heads/master eac0b0672 -> 414ee867b


[SPARK-23523][SQL] Fix the incorrect result caused by the rule OptimizeMetadataOnlyQuery

## What changes were proposed in this pull request?
```scala
import java.io.File

// `path` is a temporary directory (the test uses `withTempPath`).
val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
  .write.json(tablePath.getCanonicalPath)
val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
df.show()
```

It returns a wrong result:
```
[c,e,a]
```
The expected result is `[a,e,c]`.

We have a bug in the rule `OptimizeMetadataOnlyQuery`: it must respect the attribute order of the original leaf node when it builds the replacement `LocalRelation`. This PR fixes it.
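
To make the ordering issue concrete, here is a minimal standalone sketch of the `getPartitionAttrs` change in the diff below (a simplified `Attr` type stands in for Spark's `Attribute`; `oldPartitionAttrs` and `fixedPartitionAttrs` are illustrative names, not Spark APIs):

```scala
import java.util.Locale

case class Attr(name: String)

// Before the fix: attributes come back in `output` order, not in the
// order the partition columns were requested.
def oldPartitionAttrs(partitionColumnNames: Seq[String], output: Seq[Attr]): Seq[Attr] = {
  val partColumns = partitionColumnNames.map(_.toLowerCase(Locale.ROOT)).toSet
  output.filter(a => partColumns.contains(a.name.toLowerCase(Locale.ROOT)))
}

// After the fix: build a case-insensitive name -> attribute map from the
// output, then walk it in partition-column order, failing loudly on a
// missing column.
def fixedPartitionAttrs(partitionColumnNames: Seq[String], output: Seq[Attr]): Seq[Attr] = {
  val attrMap = output.map(a => a.name.toLowerCase(Locale.ROOT) -> a).toMap
  partitionColumnNames.map { colName =>
    attrMap.getOrElse(colName.toLowerCase(Locale.ROOT),
      sys.error(s"Unable to find the column `$colName` " +
        s"given [${output.map(_.name).mkString(", ")}]"))
  }
}
```

With `output = Seq(Attr("cOl1"), Attr("cOl3"), Attr("cOl5"))` and partition columns `Seq("cOl3", "cOl1", "cOl5")` (the directory order above), the old version returns `cOl1, cOl3, cOl5` while the fixed one returns `cOl3, cOl1, cOl5`, matching the order of the partition values `(c, a, e)`; that mismatch is what pairs `c` with `cOl1` and yields `[c,e,a]`.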

## How was this patch tested?
Added a test case

Author: gatorsmile <ga...@gmail.com>

Closes #20684 from gatorsmile/optimizeMetadataOnly.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/414ee867
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/414ee867
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/414ee867

Branch: refs/heads/master
Commit: 414ee867ba0835b97aae2e8d4e489e1879c251dd
Parents: eac0b06
Author: gatorsmile <ga...@gmail.com>
Authored: Tue Feb 27 08:44:25 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Feb 27 08:44:25 2018 -0800

----------------------------------------------------------------------
 .../catalyst/plans/logical/LocalRelation.scala  |  9 ++++----
 .../execution/OptimizeMetadataOnlyQuery.scala   | 12 +++++++++--
 .../datasources/HadoopFsRelation.scala          |  3 +++
 .../OptimizeMetadataOnlyQuerySuite.scala        | 22 ++++++++++++++++++++
 4 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/414ee867/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index d73d7e7..b05508d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -43,10 +43,11 @@ object LocalRelation {
   }
 }
 
-case class LocalRelation(output: Seq[Attribute],
-                         data: Seq[InternalRow] = Nil,
-                         // Indicates whether this relation has data from a streaming source.
-                         override val isStreaming: Boolean = false)
+case class LocalRelation(
+    output: Seq[Attribute],
+    data: Seq[InternalRow] = Nil,
+    // Indicates whether this relation has data from a streaming source.
+    override val isStreaming: Boolean = false)
   extends LeafNode with analysis.MultiInstanceRelation {
 
   // A local relation must have resolved output.

http://git-wip-us.apache.org/repos/asf/spark/blob/414ee867/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 18f6f69..0613d90 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.execution
 
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions._
@@ -80,8 +83,13 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
   private def getPartitionAttrs(
       partitionColumnNames: Seq[String],
       relation: LogicalPlan): Seq[Attribute] = {
-    val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
-    relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
+    val attrMap = relation.output.map(_.name.toLowerCase(Locale.ROOT)).zip(relation.output).toMap
+    partitionColumnNames.map { colName =>
+      attrMap.getOrElse(colName.toLowerCase(Locale.ROOT),
+        throw new AnalysisException(s"Unable to find the column `$colName` " +
+          s"given [${relation.output.map(_.name).mkString(", ")}]")
+      )
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/414ee867/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 6b34638..ac574b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -67,6 +67,9 @@ case class HadoopFsRelation(
     }
   }
 
+  // When the data schema and partition schema have overlapping columns, the
+  // output schema respects the data schema's order for the overlapped columns
+  // but uses the partition schema's data types.
   val schema: StructType = {
     StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
       partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))

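
As an illustration of the comment added above, a minimal standalone sketch of the merge rule (a simplified `Field` type stands in for `StructField`; `mergeSchemas` is an illustrative name, not a Spark API):

```scala
case class Field(name: String, dataType: String)

def mergeSchemas(dataSchema: Seq[Field], partitionSchema: Seq[Field]): Seq[Field] = {
  def colName(f: Field): String = f.name.toLowerCase
  // Partition columns that also appear in the data schema, keyed by lower-cased name.
  val overlappedPartCols = partitionSchema
    .filter(p => dataSchema.exists(d => colName(d) == colName(p)))
    .map(p => colName(p) -> p).toMap
  // Data-schema order (with partition types for the overlapped columns),
  // followed by the partition-only columns.
  dataSchema.map(f => overlappedPartCols.getOrElse(colName(f), f)) ++
    partitionSchema.filterNot(f => overlappedPartCols.contains(colName(f)))
}
```

For the reproduction in this PR, the data schema's order `cOl1 .. cOl5` wins even though the directory layout lists `cOl3, cOl1, cOl5`.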
http://git-wip-us.apache.org/repos/asf/spark/blob/414ee867/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
index 78c1e5d..a543eb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -17,9 +17,12 @@
 
 package org.apache.spark.sql.execution
 
+import java.io.File
+
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
 import org.apache.spark.sql.test.SharedSQLContext
 
 class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
@@ -125,4 +128,23 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
       sql("SELECT COUNT(DISTINCT p) FROM t_1000").collect()
     }
   }
+
+  test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
+    withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+      withTempPath { path =>
+        val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+        Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
+          .write.json(tablePath.getCanonicalPath)
+
+        val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
+        checkAnswer(df, Row("a", "e", "c"))
+
+        val localRelation = df.queryExecution.optimizedPlan.collectFirst {
+          case l: LocalRelation => l
+        }
+        assert(localRelation.nonEmpty, "expect to see a LocalRelation")
+        assert(localRelation.get.output.map(_.name) == Seq("cOl3", "cOl1", "cOl5"))
+      }
+    }
+  }
 }

