Posted to commits@spark.apache.org by yh...@apache.org on 2016/03/16 19:57:35 UTC

spark git commit: [SPARK-13827][SQL] Can't add subquery to an operator with same-name outputs while generate SQL string

Repository: spark
Updated Branches:
  refs/heads/master 91984978e -> 1d1de28a3


[SPARK-13827][SQL] Can't add subquery to an operator with same-name outputs while generate SQL string

## What changes were proposed in this pull request?

This PR addresses a fundamental issue in the `SQLBuilder`. When we want to turn a logical plan into a SQL string and place it after a FROM clause, we need to wrap it in a subquery. However, a logical plan is allowed to have same-name outputs with different qualifiers (e.g. the `Join` operator), and such a plan can't be put under a subquery: wrapping it erases the original qualifiers and assigns a single new one to all outputs, making same-name outputs impossible to distinguish.

To solve this problem, this PR renames all attributes with globally unique names (derived from each attribute's exprId), so that qualifiers are no longer needed to resolve ambiguity.

For example, for `SELECT x.key, MAX(y.key) OVER () FROM t1 x JOIN t1 y`, we parse the SQL into a Window operator and a Project operator, and add a subquery between them. Before this PR, the generated SQL looked like:
```
SELECT sq_1.key, sq_1.max
FROM (
    SELECT sq_0.key, sq_0.key, MAX(sq_0.key) OVER () AS max
    FROM (
        SELECT x.key, y.key FROM t1 AS x JOIN t1 AS y
    ) AS sq_0
) AS sq_1
```
As you can see, the two `key` columns become ambiguous once they are wrapped in `sq_0`: both can only be referenced as `sq_0.key`.

After this PR, it will generate something like:
```
SELECT attr_30 AS key, attr_37 AS max
FROM (
    SELECT attr_30, attr_37
    FROM (
        SELECT attr_30, attr_35, MAX(attr_35) AS attr_37
        FROM (
            SELECT attr_30, attr_35 FROM
                (SELECT key AS attr_30 FROM t1) AS sq_0
            INNER JOIN
                (SELECT key AS attr_35 FROM t1) AS sq_1
        ) AS sq_2
    ) AS sq_3
) AS sq_4
```
The outermost SELECT turns the generated names back into the real names, and the innermost SELECTs alias the real columns to our generated names. Between them, there is no name ambiguity anymore.
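
Mechanically, the renaming is a single expression transform over the whole plan. Here is a minimal sketch of the idea, mirroring the `NormalizedAttribute` rule in the diff below (note that the code uses the prefix `gen_attr_`, which the examples above abbreviate to `attr_`):

```
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// Rename every attribute to a name derived from its exprId and drop all
// qualifiers: names are now globally unique, so wrapping any operator in a
// subquery can no longer make same-name outputs ambiguous.
def normalizeAttributes(plan: LogicalPlan): LogicalPlan =
  plan.transformAllExpressions {
    case a: AttributeReference =>
      AttributeReference("gen_attr_" + a.exprId.id, a.dataType)(
        exprId = a.exprId, qualifiers = Nil)
    case a: Alias =>
      Alias(a.child, "gen_attr_" + a.exprId.id)(
        exprId = a.exprId, qualifiers = Nil)
  }
```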

## How was this patch tested?

Existing tests, plus new tests in `LogicalPlanToSQLSuite`.
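
Each of these tests is a round trip through the builder. A rough sketch of what such a check looks like (`checkRoundTrip` is a hypothetical helper, not the exact body of `checkHiveQl`):

```
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.hive.SQLBuilder

// Sketch only: parse a query, regenerate SQL from its analyzed plan, and
// verify that the regenerated SQL parses and returns the same rows.
def checkRoundTrip(sqlContext: SQLContext, query: String): Unit = {
  val df = sqlContext.sql(query)
  val regenerated = new SQLBuilder(df.queryExecution.analyzed, sqlContext).toSQL
  assert(sqlContext.sql(regenerated).collect().sameElements(df.collect()))
}
```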

Author: Wenchen Fan <we...@databricks.com>

Closes #11658 from cloud-fan/gensql.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d1de28a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d1de28a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d1de28a

Branch: refs/heads/master
Commit: 1d1de28a3c3c3a4bc37dc7565b9178a712df493a
Parents: 9198497
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Mar 16 11:57:28 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 16 11:57:28 2016 -0700

----------------------------------------------------------------------
 .../catalyst/expressions/namedExpressions.scala |   6 +-
 .../org/apache/spark/sql/hive/SQLBuilder.scala  | 226 +++++++++++--------
 .../spark/sql/hive/LogicalPlanToSQLSuite.scala  |  18 ++
 3 files changed, 147 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1d1de28a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 1af5437..271ef33 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -185,8 +185,7 @@ case class Alias(child: Expression, name: String)(
   override def sql: String = {
     val qualifiersString =
       if (qualifiers.isEmpty) "" else qualifiers.map(quoteIdentifier).mkString("", ".", ".")
-    val aliasName = if (isGenerated) s"$name#${exprId.id}" else s"$name"
-    s"${child.sql} AS $qualifiersString${quoteIdentifier(aliasName)}"
+    s"${child.sql} AS $qualifiersString${quoteIdentifier(name)}"
   }
 }
 
@@ -302,8 +301,7 @@ case class AttributeReference(
   override def sql: String = {
     val qualifiersString =
       if (qualifiers.isEmpty) "" else qualifiers.map(quoteIdentifier).mkString("", ".", ".")
-    val attrRefName = if (isGenerated) s"$name#${exprId.id}" else s"$name"
-    s"$qualifiersString${quoteIdentifier(attrRefName)}"
+    s"$qualifiersString${quoteIdentifier(name)}"
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/1d1de28a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
index 760335b..3bc8e9a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/SQLBuilder.scala
@@ -24,6 +24,7 @@ import scala.util.control.NonFatal
 import org.apache.spark.Logging
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.CollapseProject
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -54,8 +55,26 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
 
   def toSQL: String = {
     val canonicalizedPlan = Canonicalizer.execute(logicalPlan)
+    val outputNames = logicalPlan.output.map(_.name)
+    val qualifiers = logicalPlan.output.flatMap(_.qualifiers).distinct
+
+    // Keep the qualifier information by using it as the sub-query name, if there is only one
+    // qualifier present.
+    val finalName = if (qualifiers.length == 1) {
+      qualifiers.head
+    } else {
+      SQLBuilder.newSubqueryName
+    }
+
+    // The Canonicalizer removes all naming information, so we add it back with an extra
+    // Project that aliases the outputs.
+    val aliasedOutput = canonicalizedPlan.output.zip(outputNames).map {
+      case (attr, name) => Alias(attr.withQualifiers(Nil), name)()
+    }
+    val finalPlan = Project(aliasedOutput, SubqueryAlias(finalName, canonicalizedPlan))
+
     try {
-      val replaced = canonicalizedPlan.transformAllExpressions {
+      val replaced = finalPlan.transformAllExpressions {
         case e: SubqueryExpression =>
           SubqueryHolder(new SQLBuilder(e.query, sqlContext).toSQL)
         case e: NonSQLExpression =>
@@ -109,23 +128,6 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     case Limit(limitExpr, child) =>
       s"${toSQL(child)} LIMIT ${limitExpr.sql}"
 
-    case p: Sample if p.isTableSample =>
-      val fraction = math.min(100, math.max(0, (p.upperBound - p.lowerBound) * 100))
-      p.child match {
-        case m: MetastoreRelation =>
-          val aliasName = m.alias.getOrElse("")
-          build(
-            s"`${m.databaseName}`.`${m.tableName}`",
-            "TABLESAMPLE(" + fraction + " PERCENT)",
-            aliasName)
-        case s: SubqueryAlias =>
-          val aliasName = if (s.child.isInstanceOf[SubqueryAlias]) s.alias else ""
-          val plan = if (s.child.isInstanceOf[SubqueryAlias]) s.child else s
-          build(toSQL(plan), "TABLESAMPLE(" + fraction + " PERCENT)", aliasName)
-        case _ =>
-          build(toSQL(p.child), "TABLESAMPLE(" + fraction + " PERCENT)")
-      }
-
     case Filter(condition, child) =>
       val whereOrHaving = child match {
         case _: Aggregate => "HAVING"
@@ -147,18 +149,7 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     case p: Except =>
       build("(" + toSQL(p.left), ") EXCEPT (", toSQL(p.right) + ")")
 
-    case p: SubqueryAlias =>
-      p.child match {
-        // Persisted data source relation
-        case LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
-          s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
-        // Parentheses is not used for persisted data source relations
-        // e.g., select x.c1 from (t1) as x inner join (t1) as y on x.c1 = y.c1
-        case SubqueryAlias(_, _: LogicalRelation | _: MetastoreRelation) =>
-          build(toSQL(p.child), "AS", p.alias)
-        case _ =>
-          build("(" + toSQL(p.child) + ")", "AS", p.alias)
-      }
+    case p: SubqueryAlias => build("(" + toSQL(p.child) + ")", "AS", p.alias)
 
     case p: Join =>
       build(
@@ -168,11 +159,12 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         toSQL(p.right),
         p.condition.map(" ON " + _.sql).getOrElse(""))
 
-    case p: MetastoreRelation =>
-      build(
-        s"${quoteIdentifier(p.databaseName)}.${quoteIdentifier(p.tableName)}",
-        p.alias.map(a => s" AS ${quoteIdentifier(a)}").getOrElse("")
-      )
+    case SQLTable(database, table, _, sample) =>
+      val qualifiedName = s"${quoteIdentifier(database)}.${quoteIdentifier(table)}"
+      sample.map { case (lowerBound, upperBound) =>
+        val fraction = math.min(100, math.max(0, (upperBound - lowerBound) * 100))
+        qualifiedName + " TABLESAMPLE(" + fraction + " PERCENT)"
+      }.getOrElse(qualifiedName)
 
     case Sort(orders, _, RepartitionByExpression(partitionExprs, child, _))
         if orders.map(_.child) == partitionExprs =>
@@ -274,8 +266,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     val groupingSetSQL = "GROUPING SETS(" +
       groupingSet.map(e => s"(${e.map(_.sql).mkString(", ")})").mkString(", ") + ")"
 
-    val aggExprs = agg.aggregateExpressions.map { case expr =>
-      expr.transformDown {
+    val aggExprs = agg.aggregateExpressions.map { case aggExpr =>
+      val originalAggExpr = aggExpr.transformDown {
         // grouping_id() is converted to VirtualColumn.groupingIdName by Analyzer. Revert it back.
         case ar: AttributeReference if ar == gid => GroupingID(Nil)
         case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
@@ -286,6 +278,15 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
           val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
           groupByExprs.lift(idx).map(Grouping).getOrElse(a)
       }
+
+      originalAggExpr match {
+        // Ancestor operators may reference the output of this grouping set, and we use exprId to
+        // generate a unique name for each attribute, so we should make sure the transformed
+        // aggregate expression won't change the output, i.e. exprId and alias name should remain
+        // the same.
+        case ne: NamedExpression if ne.exprId == aggExpr.exprId => ne
+        case e => Alias(e, normalizedName(aggExpr))(exprId = aggExpr.exprId)
+      }
     }
 
     build(
@@ -308,6 +309,8 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
     )
   }
 
+  private def normalizedName(n: NamedExpression): String = "gen_attr_" + n.exprId.id
+
   object Canonicalizer extends RuleExecutor[LogicalPlan] {
     override protected def batches: Seq[Batch] = Seq(
       Batch("Collapse Project", FixedPoint(100),
@@ -316,31 +319,55 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         // `Aggregate`s.
         CollapseProject),
       Batch("Recover Scoping Info", Once,
-        // Used to handle other auxiliary `Project`s added by analyzer (e.g.
-        // `ResolveAggregateFunctions` rule)
-        AddSubquery,
-        // Previous rule will add extra sub-queries, this rule is used to re-propagate and update
-        // the qualifiers bottom up, e.g.:
-        //
-        // Sort
-        //   ordering = t1.a
-        //   Project
-        //     projectList = [t1.a, t1.b]
-        //     Subquery gen_subquery
-        //       child ...
-        //
-        // will be transformed to:
-        //
-        // Sort
-        //   ordering = gen_subquery.a
-        //   Project
-        //     projectList = [gen_subquery.a, gen_subquery.b]
-        //     Subquery gen_subquery
-        //       child ...
-        UpdateQualifiers
+        // Remove all subqueries, as we will insert new ones where necessary.
+        EliminateSubqueryAliases,
+        // A logical plan is allowed to have same-name outputs with different qualifiers (e.g. the
+        // `Join` operator). However, this kind of plan can't be put under a subquery, as we would
+        // erase the qualifiers and assign a new one to all outputs, making it impossible to
+        // distinguish same-name outputs. This rule renames all attributes to guarantee that
+        // attributes with different exprIds always have different names. It also removes all
+        // qualifiers, as attributes now have unique names and we no longer need qualifiers to
+        // resolve ambiguity.
+        NormalizedAttribute,
+        // Find table relations and wrap them with `SQLTable`s. If there is a `Sample` operator
+        // on top of a table relation, merge the sample information into the `SQLTable` of that
+        // relation, as only table samples can be converted to a standard SQL string.
+        ResolveSQLTable,
+        // Insert subqueries on top of operators that must appear after the FROM clause.
+        AddSubquery
       )
     )
 
+    object NormalizedAttribute extends Rule[LogicalPlan] {
+      override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions {
+        case a: AttributeReference =>
+          AttributeReference(normalizedName(a), a.dataType)(exprId = a.exprId, qualifiers = Nil)
+        case a: Alias =>
+          Alias(a.child, normalizedName(a))(exprId = a.exprId, qualifiers = Nil)
+      }
+    }
+
+    object ResolveSQLTable extends Rule[LogicalPlan] {
+      override def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown {
+        case Sample(lowerBound, upperBound, _, _, ExtractSQLTable(table)) =>
+          aliasColumns(table.withSample(lowerBound, upperBound))
+        case ExtractSQLTable(table) =>
+          aliasColumns(table)
+      }
+
+      /**
+       * Aliases the table columns to the generated attribute names, as we use exprId to
+       * generate a unique name for each attribute when normalizing attributes, and we can't
+       * reference table columns by their real names.
+       */
+      private def aliasColumns(table: SQLTable): LogicalPlan = {
+        val aliasedOutput = table.output.map { attr =>
+          Alias(attr, normalizedName(attr))(exprId = attr.exprId)
+        }
+        addSubquery(Project(aliasedOutput, table))
+      }
+    }
+
     object AddSubquery extends Rule[LogicalPlan] {
       override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
         // This branch handles aggregate functions within HAVING clauses.  For example:
@@ -354,55 +381,56 @@ class SQLBuilder(logicalPlan: LogicalPlan, sqlContext: SQLContext) extends Loggi
         //    +- Filter ...
         //        +- Aggregate ...
         //            +- MetastoreRelation default, src, None
-        case plan @ Project(_, Filter(_, _: Aggregate)) => wrapChildWithSubquery(plan)
+        case p @ Project(_, f @ Filter(_, _: Aggregate)) => p.copy(child = addSubquery(f))
 
-        case w @ Window(_, _, _, Filter(_, _: Aggregate)) => wrapChildWithSubquery(w)
+        case w @ Window(_, _, _, f @ Filter(_, _: Aggregate)) => w.copy(child = addSubquery(f))
 
-        case plan @ Project(_,
-          _: SubqueryAlias
-            | _: Filter
-            | _: Join
-            | _: MetastoreRelation
-            | OneRowRelation
-            | _: LocalLimit
-            | _: GlobalLimit
-            | _: Sample
-        ) => plan
-
-        case plan: Project => wrapChildWithSubquery(plan)
+        case p: Project => p.copy(child = addSubqueryIfNeeded(p.child))
 
         // We will generate "SELECT ... FROM ..." for the Window operator, so its child operator
         // should be able to appear in the FROM clause, or we wrap it with a subquery.
-        case w @ Window(_, _, _,
-          _: SubqueryAlias
-            | _: Filter
-            | _: Join
-            | _: MetastoreRelation
-            | OneRowRelation
-            | _: LocalLimit
-            | _: GlobalLimit
-            | _: Sample
-        ) => w
-
-        case w: Window => wrapChildWithSubquery(w)
-      }
+        case w: Window => w.copy(child = addSubqueryIfNeeded(w.child))
 
-      private def wrapChildWithSubquery(plan: UnaryNode): LogicalPlan = {
-        val newChild = SubqueryAlias(SQLBuilder.newSubqueryName, plan.child)
-        plan.withNewChildren(Seq(newChild))
+        case j: Join => j.copy(
+          left = addSubqueryIfNeeded(j.left),
+          right = addSubqueryIfNeeded(j.right))
       }
     }
 
-    object UpdateQualifiers extends Rule[LogicalPlan] {
-      override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp {
-        case plan =>
-          val inputAttributes = plan.children.flatMap(_.output)
-          plan transformExpressions {
-            case a: AttributeReference if !plan.producedAttributes.contains(a) =>
-              val qualifier = inputAttributes.find(_ semanticEquals a).map(_.qualifiers)
-              a.withQualifiers(qualifier.getOrElse(Nil))
-          }
-      }
+    private def addSubquery(plan: LogicalPlan): SubqueryAlias = {
+      SubqueryAlias(SQLBuilder.newSubqueryName, plan)
+    }
+
+    private def addSubqueryIfNeeded(plan: LogicalPlan): LogicalPlan = plan match {
+      case _: SubqueryAlias => plan
+      case _: Filter => plan
+      case _: Join => plan
+      case _: LocalLimit => plan
+      case _: GlobalLimit => plan
+      case _: SQLTable => plan
+      case OneRowRelation => plan
+      case _ => addSubquery(plan)
+    }
+  }
+
+  case class SQLTable(
+      database: String,
+      table: String,
+      output: Seq[Attribute],
+      sample: Option[(Double, Double)] = None) extends LeafNode {
+    def withSample(lowerBound: Double, upperBound: Double): SQLTable =
+      this.copy(sample = Some(lowerBound -> upperBound))
+  }
+
+  object ExtractSQLTable {
+    def unapply(plan: LogicalPlan): Option[SQLTable] = plan match {
+      case l @ LogicalRelation(_, _, Some(TableIdentifier(table, Some(database)))) =>
+        Some(SQLTable(database, table, l.output.map(_.withQualifiers(Nil))))
+
+      case m: MetastoreRelation =>
+        Some(SQLTable(m.databaseName, m.tableName, m.output.map(_.withQualifiers(Nil))))
+
+      case _ => None
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/1d1de28a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
index 198652b..f02ecb4 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/LogicalPlanToSQLSuite.scala
@@ -550,4 +550,22 @@ class LogicalPlanToSQLSuite extends SQLBuilderTest with SQLTestUtils {
         |WINDOW w AS (PARTITION BY key % 5 ORDER BY key)
       """.stripMargin)
   }
+
+  test("window with join") {
+    checkHiveQl(
+      """
+        |SELECT x.key, MAX(y.key) OVER (PARTITION BY x.key % 5 ORDER BY x.key)
+        |FROM parquet_t1 x JOIN parquet_t1 y ON x.key = y.key
+      """.stripMargin)
+  }
+
+  test("join 2 tables and aggregate function in having clause") {
+    checkHiveQl(
+      """
+        |SELECT COUNT(a.value), b.KEY, a.KEY
+        |FROM parquet_t1 a, parquet_t1 b
+        |GROUP BY a.KEY, b.KEY
+        |HAVING MAX(a.KEY) > 0
+      """.stripMargin)
+  }
 }

