You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/12/04 07:00:11 UTC

spark git commit: [SPARK-25573] Combine resolveExpression and resolve in the Analyzer

Repository: spark
Updated Branches:
  refs/heads/master f7af4a196 -> b4dea313c


[SPARK-25573] Combine resolveExpression and resolve in the Analyzer

## What changes were proposed in this pull request?
Currently in the Analyzer, we have two methods, 1) `resolve` and 2) `resolveExpression`, that are called on different code paths to resolve attributes, column ordinals and extract value expressions. ~~In this PR, we combine the two into one method to make sure there is only one method that is tasked with resolving the attributes.~~
Instead, this PR updates the descriptions of the two methods and gives them better names, to make it easier to know when to use one method versus the other.

## How was this patch tested?
Existing tests.

Closes #22899 from dilipbiswal/SPARK-25573-final.

Authored-by: Dilip Biswal <db...@us.ibm.com>
Signed-off-by: gatorsmile <ga...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b4dea313
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b4dea313
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b4dea313

Branch: refs/heads/master
Commit: b4dea313c45042e4094d14ebdeb8ad27be4cc695
Parents: f7af4a1
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Mon Dec 3 23:00:02 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Dec 3 23:00:02 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  | 97 +++++++++++++-------
 1 file changed, 66 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b4dea313/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index b977fa0..7770531 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -883,21 +883,38 @@ class Analyzer(
       }
     }
 
-    private def resolve(e: Expression, q: LogicalPlan): Expression = e match {
-      case f: LambdaFunction if !f.bound => f
-      case u @ UnresolvedAttribute(nameParts) =>
-        // Leave unchanged if resolution fails. Hopefully will be resolved next round.
-        val result =
-          withPosition(u) {
-            q.resolveChildren(nameParts, resolver)
-              .orElse(resolveLiteralFunction(nameParts, u, q))
-              .getOrElse(u)
-          }
-        logDebug(s"Resolving $u to $result")
-        result
-      case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
-        ExtractValue(child, fieldExpr, resolver)
-      case _ => e.mapChildren(resolve(_, q))
+    /**
+     * Resolves the attribute and extract value expression(s) by traversing the
+     * input expression in a top-down manner. The traversal is done top-down because
+     * we need to skip over unbound lambda function expressions. The lambda expressions are
+     * resolved in a different rule, [[ResolveLambdaVariables]].
+     *
+     * Example :
+     * SELECT transform(array(1, 2, 3), (x, i) -> x + i)
+     *
+     * In the case above, x and i are resolved as lambda variables in [[ResolveLambdaVariables]].
+     *
+     * Note : In this routine, the unresolved attributes are resolved from the input plan's
+     * children attributes.
+     */
+    private def resolveExpressionTopDown(e: Expression, q: LogicalPlan): Expression = {
+      if (e.resolved) return e
+      e match {
+        case f: LambdaFunction if !f.bound => f
+        case u @ UnresolvedAttribute(nameParts) =>
+          // Leave unchanged if resolution fails. Hopefully will be resolved next round.
+          val result =
+            withPosition(u) {
+              q.resolveChildren(nameParts, resolver)
+                .orElse(resolveLiteralFunction(nameParts, u, q))
+                .getOrElse(u)
+            }
+          logDebug(s"Resolving $u to $result")
+          result
+        case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
+          ExtractValue(child, fieldExpr, resolver)
+        case _ => e.mapChildren(resolveExpressionTopDown(_, q))
+      }
     }
 
     def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
@@ -936,7 +953,7 @@ class Analyzer(
       // we still have chance to resolve it based on its descendants
       case s @ Sort(ordering, global, child) if child.resolved && !s.resolved =>
         val newOrdering =
-          ordering.map(order => resolveExpression(order, child).asInstanceOf[SortOrder])
+          ordering.map(order => resolveExpressionBottomUp(order, child).asInstanceOf[SortOrder])
         Sort(newOrdering, global, child)
 
       // A special case for Generate, because the output of Generate should not be resolved by
@@ -944,7 +961,7 @@ class Analyzer(
       case g @ Generate(generator, _, _, _, _, _) if generator.resolved => g
 
       case g @ Generate(generator, join, outer, qualifier, output, child) =>
-        val newG = resolveExpression(generator, child, throws = true)
+        val newG = resolveExpressionBottomUp(generator, child, throws = true)
         if (newG.fastEquals(generator)) {
           g
         } else {
@@ -959,11 +976,11 @@ class Analyzer(
       // `AppendColumns`, because `AppendColumns`'s serializer might produce conflict attribute
       // names leading to ambiguous references exception.
       case a @ Aggregate(groupingExprs, aggExprs, appendColumns: AppendColumns) =>
-        a.mapExpressions(resolve(_, appendColumns))
+        a.mapExpressions(resolveExpressionTopDown(_, appendColumns))
 
       case q: LogicalPlan =>
         logTrace(s"Attempting to resolve ${q.simpleString}")
-        q.mapExpressions(resolve(_, q))
+        q.mapExpressions(resolveExpressionTopDown(_, q))
     }
 
     def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = {
@@ -1060,7 +1077,22 @@ class Analyzer(
     func.map(wrapper)
   }
 
-  protected[sql] def resolveExpression(
+  /**
+   * Resolves the attribute, column value and extract value expression(s) by traversing the
+   * input expression in a bottom-up manner. In order to resolve the nested complex type fields
+   * correctly, this function makes use of the `throws` parameter to control when to raise an
+   * AnalysisException.
+   *
+   * Example :
+   * SELECT a.b FROM t ORDER BY b[0].d
+   *
+   * In the above example, b needs to be resolved before d can be resolved. Given we are
+   * doing a bottom-up traversal, it will first attempt to resolve d and fail as b has not
+   * been resolved yet. If `throws` is false, this function will handle the exception by
+   * returning the original attribute. In this case `d` will be resolved in subsequent passes
+   * after `b` is resolved.
+   */
+  protected[sql] def resolveExpressionBottomUp(
       expr: Expression,
       plan: LogicalPlan,
       throws: Boolean = false): Expression = {
@@ -1073,11 +1105,14 @@ class Analyzer(
       expr transformUp {
         case GetColumnByOrdinal(ordinal, _) => plan.output(ordinal)
         case u @ UnresolvedAttribute(nameParts) =>
-          withPosition(u) {
-            plan.resolve(nameParts, resolver)
-              .orElse(resolveLiteralFunction(nameParts, u, plan))
-              .getOrElse(u)
-          }
+          val result =
+            withPosition(u) {
+              plan.resolve(nameParts, resolver)
+                .orElse(resolveLiteralFunction(nameParts, u, plan))
+                .getOrElse(u)
+            }
+          logDebug(s"Resolving $u to $result")
+          result
         case UnresolvedExtractValue(child, fieldName) if child.resolved =>
           ExtractValue(child, fieldName, resolver)
       }
@@ -1223,7 +1258,7 @@ class Analyzer(
         plan match {
           case p: Project =>
             // Resolving expressions against current plan.
-            val maybeResolvedExprs = exprs.map(resolveExpression(_, p))
+            val maybeResolvedExprs = exprs.map(resolveExpressionBottomUp(_, p))
             // Recursively resolving expressions on the child of current plan.
             val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, p.child)
             // If some attributes used by expressions are resolvable only on the rewritten child
@@ -1232,7 +1267,7 @@ class Analyzer(
             (newExprs, Project(p.projectList ++ missingAttrs, newChild))
 
           case a @ Aggregate(groupExprs, aggExprs, child) =>
-            val maybeResolvedExprs = exprs.map(resolveExpression(_, a))
+            val maybeResolvedExprs = exprs.map(resolveExpressionBottomUp(_, a))
             val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, child)
             val missingAttrs = (AttributeSet(newExprs) -- a.outputSet).intersect(newChild.outputSet)
             if (missingAttrs.forall(attr => groupExprs.exists(_.semanticEquals(attr)))) {
@@ -1244,20 +1279,20 @@ class Analyzer(
             }
 
           case g: Generate =>
-            val maybeResolvedExprs = exprs.map(resolveExpression(_, g))
+            val maybeResolvedExprs = exprs.map(resolveExpressionBottomUp(_, g))
             val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, g.child)
             (newExprs, g.copy(unrequiredChildIndex = Nil, child = newChild))
 
           // For `Distinct` and `SubqueryAlias`, we can't recursively resolve and add attributes
           // via its children.
           case u: UnaryNode if !u.isInstanceOf[Distinct] && !u.isInstanceOf[SubqueryAlias] =>
-            val maybeResolvedExprs = exprs.map(resolveExpression(_, u))
+            val maybeResolvedExprs = exprs.map(resolveExpressionBottomUp(_, u))
             val (newExprs, newChild) = resolveExprsAndAddMissingAttrs(maybeResolvedExprs, u.child)
             (newExprs, u.withNewChildren(Seq(newChild)))
 
           // For other operators, we can't recursively resolve and add attributes via its children.
           case other =>
-            (exprs.map(resolveExpression(_, other)), other)
+            (exprs.map(resolveExpressionBottomUp(_, other)), other)
         }
       }
     }
@@ -2387,7 +2422,7 @@ class Analyzer(
           }
 
           validateTopLevelTupleFields(deserializer, inputs)
-          val resolved = resolveExpression(
+          val resolved = resolveExpressionBottomUp(
             deserializer, LocalRelation(inputs), throws = true)
           val result = resolved transformDown {
             case UnresolvedMapObjects(func, inputData, cls) if inputData.resolved =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org