You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/03/13 04:47:44 UTC
spark git commit: [SPARK-23523][SQL][BACKPORT-2.3] Fix the incorrect
result caused by the rule OptimizeMetadataOnlyQuery
Repository: spark
Updated Branches:
refs/heads/branch-2.3 a8e357ada -> 33ba8db8d
[SPARK-23523][SQL][BACKPORT-2.3] Fix the incorrect result caused by the rule OptimizeMetadataOnlyQuery
This PR backports https://github.com/apache/spark/pull/20684 and https://github.com/apache/spark/pull/20693 to the Spark 2.3 branch.
---
## What changes were proposed in this pull request?
```Scala
val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
.write.json(tablePath.getCanonicalPath)
val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
df.show()
```
It generates a wrong result.
```
[c,e,a]
```
We have a bug in the rule `OptimizeMetadataOnlyQuery`. We should respect the attribute order in the original leaf node. This PR is to fix it.
## How was this patch tested?
Added a test case
Author: Xingbo Jiang <xi...@databricks.com>
Author: gatorsmile <ga...@gmail.com>
Closes #20763 from gatorsmile/backport23523.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/33ba8db8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/33ba8db8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/33ba8db8
Branch: refs/heads/branch-2.3
Commit: 33ba8db8d8c8bf388606a6f5e34b082469038205
Parents: a8e357a
Author: Xingbo Jiang <xi...@databricks.com>
Authored: Mon Mar 12 21:47:38 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Mar 12 21:47:38 2018 -0700
----------------------------------------------------------------------
.../catalyst/plans/logical/LocalRelation.scala | 9 ++++----
.../execution/OptimizeMetadataOnlyQuery.scala | 12 +++++++++--
.../datasources/HadoopFsRelation.scala | 3 +++
.../OptimizeMetadataOnlyQuerySuite.scala | 22 ++++++++++++++++++++
4 files changed, 40 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/33ba8db8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
index d73d7e7..b05508d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LocalRelation.scala
@@ -43,10 +43,11 @@ object LocalRelation {
}
}
-case class LocalRelation(output: Seq[Attribute],
- data: Seq[InternalRow] = Nil,
- // Indicates whether this relation has data from a streaming source.
- override val isStreaming: Boolean = false)
+case class LocalRelation(
+ output: Seq[Attribute],
+ data: Seq[InternalRow] = Nil,
+ // Indicates whether this relation has data from a streaming source.
+ override val isStreaming: Boolean = false)
extends LeafNode with analysis.MultiInstanceRelation {
// A local relation must have resolved output.
http://git-wip-us.apache.org/repos/asf/spark/blob/33ba8db8/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
index 18f6f69..dc4aff9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuery.scala
@@ -17,6 +17,9 @@
package org.apache.spark.sql.execution
+import java.util.Locale
+
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{HiveTableRelation, SessionCatalog}
import org.apache.spark.sql.catalyst.expressions._
@@ -80,8 +83,13 @@ case class OptimizeMetadataOnlyQuery(catalog: SessionCatalog) extends Rule[Logic
private def getPartitionAttrs(
partitionColumnNames: Seq[String],
relation: LogicalPlan): Seq[Attribute] = {
- val partColumns = partitionColumnNames.map(_.toLowerCase).toSet
- relation.output.filter(a => partColumns.contains(a.name.toLowerCase))
+ val attrMap = relation.output.map(a => a.name.toLowerCase(Locale.ROOT) -> a).toMap
+ partitionColumnNames.map { colName =>
+ attrMap.getOrElse(colName.toLowerCase(Locale.ROOT),
+ throw new AnalysisException(s"Unable to find the column `$colName` " +
+ s"given [${relation.output.map(_.name).mkString(", ")}]")
+ )
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/33ba8db8/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
index 6b34638..b2f73b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/HadoopFsRelation.scala
@@ -67,6 +67,9 @@ case class HadoopFsRelation(
}
}
+ // When data and partition schemas have overlapping columns, the output
+ // schema respects the order of the data schema for the overlapping columns, and it
+ // respects the data types of the partition schema.
val schema: StructType = {
StructType(dataSchema.map(f => overlappedPartCols.getOrElse(getColName(f), f)) ++
partitionSchema.filterNot(f => overlappedPartCols.contains(getColName(f))))
http://git-wip-us.apache.org/repos/asf/spark/blob/33ba8db8/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
index 78c1e5d..a543eb8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/OptimizeMetadataOnlyQuerySuite.scala
@@ -17,9 +17,12 @@
package org.apache.spark.sql.execution
+import java.io.File
+
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.OPTIMIZER_METADATA_ONLY
import org.apache.spark.sql.test.SharedSQLContext
class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
@@ -125,4 +128,23 @@ class OptimizeMetadataOnlyQuerySuite extends QueryTest with SharedSQLContext {
sql("SELECT COUNT(DISTINCT p) FROM t_1000").collect()
}
}
+
+ test("Incorrect result caused by the rule OptimizeMetadataOnlyQuery") {
+ withSQLConf(OPTIMIZER_METADATA_ONLY.key -> "true") {
+ withTempPath { path =>
+ val tablePath = new File(s"${path.getCanonicalPath}/cOl3=c/cOl1=a/cOl5=e")
+ Seq(("a", "b", "c", "d", "e")).toDF("cOl1", "cOl2", "cOl3", "cOl4", "cOl5")
+ .write.json(tablePath.getCanonicalPath)
+
+ val df = spark.read.json(path.getCanonicalPath).select("CoL1", "CoL5", "CoL3").distinct()
+ checkAnswer(df, Row("a", "e", "c"))
+
+ val localRelation = df.queryExecution.optimizedPlan.collectFirst {
+ case l: LocalRelation => l
+ }
+ assert(localRelation.nonEmpty, "expect to see a LocalRelation")
+ assert(localRelation.get.output.map(_.name) == Seq("cOl3", "cOl1", "cOl5"))
+ }
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org