Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/25 03:24:34 UTC

git commit: SPARK-1294 Fix resolution of uppercase field names using a HiveContext.

Repository: spark
Updated Branches:
  refs/heads/master 56db8a2f0 -> 8043b7bc7


SPARK-1294 Fix resolution of uppercase field names using a HiveContext.

Fixing this bug required the following (a short resolution sketch follows the list):
 - Creation of a new logical node that converts a schema to lowercase.
 - Generalization of the subquery eliding rule so that it also elides this new node.
 - Fixes for several places where overly tight assumptions were made about the types of `InsertIntoTable` children.
 - Removal of an API that was accidentally left in and exposed catalyst data structures, plus a fix to the logic that pushes filters down into Hive table scans so that attribute references are compared correctly.
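
A minimal usage sketch of the behavior this enables, mirroring the test added to HiveResolutionSuite below (the TestHive singleton and its implicits are assumed importable as shown; exact package paths are as of this commit):

    import org.apache.spark.sql.hive.TestHive
    import org.apache.spark.sql.hive.TestHive._

    // Case classes with mixed-case field names, as produced by Scala reflection.
    case class Nested(a: Int, B: Int)
    case class Data(a: Int, B: Int, n: Nested)

    // Register an RDD of case-class rows as a table in the case-insensitive HiveContext.
    TestHive.sparkContext.parallelize(Data(1, 2, Nested(1, 2)) :: Nil)
      .registerAsTable("caseSensitivityTest")

    // Before this patch the uppercase spellings (A, B, n.A, n.B) failed to resolve;
    // with LowerCaseSchema in place they all resolve against the lowercased schema.
    sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest").collect()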

Author: Michael Armbrust <mi...@databricks.com>

Closes #202 from marmbrus/upperCaseFieldNames and squashes the following commits:

15e5265 [Michael Armbrust] Support for resolving mixed case fields from a reflected schema using HiveQL.
5aa5035 [Michael Armbrust] Remove API that exposes internal catalyst data structures.
9d99cb6 [Michael Armbrust] Attributes should be compared using exprId, not TreeNode.id.
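
The last point underlies the partition-pruning change in HiveStrategies.scala below: analysis may copy an attribute into a new tree node (LowerCaseSchema does exactly this), but the copy keeps the same exprId, so exprId is the stable key to compare on. A minimal sketch, assuming the Catalyst constructor signature used elsewhere in this patch (AttributeReference(name, dataType, nullable)(exprId, qualifiers)):

    import org.apache.spark.sql.catalyst.expressions.AttributeReference
    import org.apache.spark.sql.catalyst.types.IntegerType

    // An attribute produced by a relation.
    val key = AttributeReference("key", IntegerType, nullable = true)()

    // A copy made during analysis: a different tree node (different TreeNode.id),
    // but the same underlying expression id.
    val keyCopy = AttributeReference("key", IntegerType, nullable = true)(key.exprId, key.qualifiers)

    // Keying partition-pruning predicates on exprId recognizes the copy as the
    // same attribute; per-node ids would not.
    assert(keyCopy.exprId == key.exprId)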


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8043b7bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8043b7bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8043b7bc

Branch: refs/heads/master
Commit: 8043b7bc74ff3640743ffc3f1be386dc42f3f44c
Parents: 56db8a2
Author: Michael Armbrust <mi...@databricks.com>
Authored: Mon Mar 24 19:24:22 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Mar 24 19:24:22 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 19 ++++++++++++-
 .../sql/catalyst/optimizer/Optimizer.scala      | 14 ---------
 .../catalyst/plans/logical/basicOperators.scala | 30 +++++++++++++++++++-
 .../optimizer/ConstantFoldingSuite.scala        |  7 +++--
 .../optimizer/FilterPushdownSuite.scala         | 13 +++++----
 .../org/apache/spark/sql/hive/HiveContext.scala | 18 ++++++------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  6 ++--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  4 +--
 .../hive/execution/HiveResolutionSuite.scala    | 13 +++++++++
 9 files changed, 87 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8043b7bc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index fc76e76..161d28e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -55,7 +55,9 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
       StarExpansion ::
       ResolveFunctions ::
       GlobalAggregates ::
-      typeCoercionRules :_*)
+      typeCoercionRules :_*),
+    Batch("AnalysisOperators", fixedPoint,
+      EliminateAnalysisOperators)
   )
 
   /**
@@ -80,6 +82,7 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
         case s: Star => s.copy(table = s.table.map(_.toLowerCase))
         case UnresolvedAttribute(name) => UnresolvedAttribute(name.toLowerCase)
         case Alias(c, name) => Alias(c, name.toLowerCase)()
+        case GetField(c, name) => GetField(c, name.toLowerCase)
       }
     }
   }
@@ -184,3 +187,17 @@ class Analyzer(catalog: Catalog, registry: FunctionRegistry, caseSensitive: Bool
       exprs.collect { case _: Star => true }.nonEmpty
   }
 }
+
+/**
+ * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan.  Subqueries are
+ * only required to provide scoping information for attributes and can be removed once analysis is
+ * complete.  Similarly, this node also removes
+ * [[catalyst.plans.logical.LowerCaseSchema LowerCaseSchema]] operators.
+ */
+object EliminateAnalysisOperators extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case Subquery(_, child) => child
+    case LowerCaseSchema(child) => child
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/spark/blob/8043b7bc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index c120197..07ebbc9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -27,31 +27,17 @@ import org.apache.spark.sql.catalyst.types._
 
 object Optimizer extends RuleExecutor[LogicalPlan] {
   val batches =
-    Batch("Subqueries", Once,
-      EliminateSubqueries) ::
     Batch("ConstantFolding", Once,
       ConstantFolding,
       BooleanSimplification,
       SimplifyCasts) ::
     Batch("Filter Pushdown", Once,
-      EliminateSubqueries,
       CombineFilters,
       PushPredicateThroughProject,
       PushPredicateThroughInnerJoin) :: Nil
 }
 
 /**
- * Removes [[catalyst.plans.logical.Subquery Subquery]] operators from the plan.  Subqueries are
- * only required to provide scoping information for attributes and can be removed once analysis is
- * complete.
- */
-object EliminateSubqueries extends Rule[LogicalPlan] {
-  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case Subquery(_, child) => child
-  }
-}
-
-/**
  * Replaces [[catalyst.expressions.Expression Expressions]] that can be statically evaluated with
  * equivalent [[catalyst.expressions.Literal Literal]] values.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/8043b7bc/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 6480cca..61481de 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -21,6 +21,7 @@ package plans
 package logical
 
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.types._
 
 case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode {
   def output = projectList.map(_.toAttribute)
@@ -86,7 +87,7 @@ case class Join(
 }
 
 case class InsertIntoTable(
-    table: BaseRelation,
+    table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
     overwrite: Boolean)
@@ -141,6 +142,33 @@ case class Subquery(alias: String, child: LogicalPlan) extends UnaryNode {
   def references = Set.empty
 }
 
+/**
+ * Converts the schema of `child` to all lowercase, together with LowercaseAttributeReferences
+ * this allows for optional case insensitive attribute resolution.  This node can be elided after
+ * analysis.
+ */
+case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
+  protected def lowerCaseSchema(dataType: DataType): DataType = dataType match {
+    case StructType(fields) =>
+      StructType(fields.map(f =>
+        StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable)))
+    case ArrayType(elemType) => ArrayType(lowerCaseSchema(elemType))
+    case otherType => otherType
+  }
+
+  val output = child.output.map {
+    case a: AttributeReference =>
+      AttributeReference(
+        a.name.toLowerCase,
+        lowerCaseSchema(a.dataType),
+        a.nullable)(
+        a.exprId,
+        a.qualifiers)
+  }
+
+  def references = Set.empty
+}
+
 case class Sample(fraction: Double, withReplacement: Boolean, seed: Int, child: LogicalPlan)
     extends UnaryNode {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8043b7bc/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
index 2c107b8..53f760f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ConstantFoldingSuite.scala
@@ -19,21 +19,22 @@ package org.apache.spark.sql
 package catalyst
 package optimizer
 
-import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.types.IntegerType
 
 // For implicit conversions
+import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.dsl.expressions._
 
 class ConstantFoldingSuite extends OptimizerTest {
 
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
-      Batch("Subqueries", Once,
-        EliminateSubqueries) ::
+      Batch("AnalysisNodes", Once,
+        EliminateAnalysisOperators) ::
       Batch("ConstantFolding", Once,
         ConstantFolding,
         BooleanSimplification) :: Nil

http://git-wip-us.apache.org/repos/asf/spark/blob/8043b7bc/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 0aa7e4d..ae1b2b1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
 package catalyst
 package optimizer
 
+
+import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 
@@ -31,9 +33,8 @@ class FilterPushdownSuite extends OptimizerTest {
   object Optimize extends RuleExecutor[LogicalPlan] {
     val batches =
       Batch("Subqueries", Once,
-        EliminateSubqueries) ::
+        EliminateAnalysisOperators) ::
       Batch("Filter Pushdown", Once,
-        EliminateSubqueries,
         CombineFilters,
         PushPredicateThroughProject,
         PushPredicateThroughInnerJoin) :: Nil
@@ -172,7 +173,7 @@ class FilterPushdownSuite extends OptimizerTest {
     }
     val optimized = Optimize(originalQuery.analyze)
 
-    comparePlans(optimizer.EliminateSubqueries(originalQuery.analyze), optimized)
+    comparePlans(analysis.EliminateAnalysisOperators(originalQuery.analyze), optimized)
   }
 
   test("joins: conjunctive predicates") {
@@ -191,7 +192,7 @@ class FilterPushdownSuite extends OptimizerTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze
 
-    comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
   }
 
   test("joins: conjunctive predicates #2") {
@@ -210,7 +211,7 @@ class FilterPushdownSuite extends OptimizerTest {
       left.join(right, condition = Some("x.b".attr === "y.b".attr))
         .analyze
 
-    comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
   }
 
   test("joins: conjunctive predicates #3") {
@@ -233,6 +234,6 @@ class FilterPushdownSuite extends OptimizerTest {
           condition = Some("z.a".attr === "x.b".attr))
         .analyze
 
-    comparePlans(optimized, optimizer.EliminateSubqueries(correctAnswer))
+    comparePlans(optimized, analysis.EliminateAnalysisOperators(correctAnswer))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8043b7bc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 491b3a6..af35c91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -32,7 +32,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
-import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{BaseRelation, LogicalPlan, LowerCaseSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution._
@@ -108,18 +108,20 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
 
   /* A catalyst metadata catalog that points to the Hive Metastore. */
   @transient
-  override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog
+  override lazy val catalog = new HiveMetastoreCatalog(this) with OverrideCatalog {
+    override def lookupRelation(
+      databaseName: Option[String],
+      tableName: String,
+      alias: Option[String] = None): LogicalPlan = {
+
+      LowerCaseSchema(super.lookupRelation(databaseName, tableName, alias))
+    }
+  }
 
   /* An analyzer that uses the Hive metastore. */
   @transient
   override lazy val analyzer = new Analyzer(catalog, HiveFunctionRegistry, caseSensitive = false)
 
-  def tables: Seq[BaseRelation] = {
-    // TODO: Move this functionallity to Catalog. Make client protected.
-    val allTables = catalog.client.getAllTables("default")
-    allTables.map(catalog.lookupRelation(None, _, None)).collect { case b: BaseRelation => b }
-  }
-
   /**
    * Runs the specified SQL query using Hive.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/8043b7bc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index a5db283..1667a21 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -27,7 +27,8 @@ import org.apache.hadoop.hive.ql.plan.TableDesc
 import org.apache.hadoop.hive.ql.session.SessionState
 import org.apache.hadoop.hive.serde2.Deserializer
 
-import org.apache.spark.sql.catalyst.analysis.Catalog
+
+import org.apache.spark.sql.catalyst.analysis.{Catalog, EliminateAnalysisOperators}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -96,7 +97,8 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
         createTable(databaseName, tableName, child.output)
 
         InsertIntoTable(
-          lookupRelation(Some(databaseName), tableName, None).asInstanceOf[BaseRelation],
+          EliminateAnalysisOperators(
+            lookupRelation(Some(databaseName), tableName, None)),
           Map.empty,
           child,
           overwrite = false)

http://git-wip-us.apache.org/repos/asf/spark/blob/8043b7bc/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index c71141c..3dd0530 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -73,11 +73,11 @@ trait HiveStrategies {
       case p @ FilteredOperation(predicates, relation: MetastoreRelation)
         if relation.isPartitioned =>
 
-        val partitionKeyIds = relation.partitionKeys.map(_.id).toSet
+        val partitionKeyIds = relation.partitionKeys.map(_.exprId).toSet
 
         // Filter out all predicates that only deal with partition keys
         val (pruningPredicates, otherPredicates) = predicates.partition {
-          _.references.map(_.id).subsetOf(partitionKeyIds)
+          _.references.map(_.exprId).subsetOf(partitionKeyIds)
         }
 
         val scan = HiveTableScan(

http://git-wip-us.apache.org/repos/asf/spark/blob/8043b7bc/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
index 996bd4e..4bdea21 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveResolutionSuite.scala
@@ -19,6 +19,11 @@ package org.apache.spark.sql
 package hive
 package execution
 
+import TestHive._
+
+case class Data(a: Int, B: Int, n: Nested)
+case class Nested(a: Int, B: Int)
+
 /**
  * A set of test cases expressed in Hive QL that are not covered by the tests included in the hive distribution.
  */
@@ -47,6 +52,14 @@ class HiveResolutionSuite extends HiveComparisonTest {
   createQueryTest("alias.*",
     "SELECT a.* FROM src a ORDER BY key LIMIT 1")
 
+  test("case insensitivity with scala reflection") {
+    // Test resolution with Scala Reflection
+    TestHive.sparkContext.parallelize(Data(1, 2, Nested(1,2)) :: Nil)
+      .registerAsTable("caseSensitivityTest")
+
+    sql("SELECT a, b, A, B, n.a, n.b, n.A, n.B FROM caseSensitivityTest")
+  }
+
   /**
    * Negative examples.  Currently only left here for documentation purposes.
    * TODO(marmbrus): Test that catalyst fails on these queries.