You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/02/12 00:26:34 UTC

spark git commit: [SPARK-3688][SQL] More inline comments for LogicalPlan.

Repository: spark
Updated Branches:
  refs/heads/master 44b2311d9 -> fa6bdc6e8


[SPARK-3688][SQL] More inline comments for LogicalPlan.

As a follow-up to https://github.com/apache/spark/pull/4524

Author: Reynold Xin <rx...@databricks.com>

Closes #4539 from rxin/SPARK-3688 and squashes the following commits:

5ac56c7 [Reynold Xin] exists
da8eea4 [Reynold Xin] [SPARK-3688][SQL] More inline comments for LogicalPlan.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa6bdc6e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa6bdc6e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa6bdc6e

Branch: refs/heads/master
Commit: fa6bdc6e819f9338248b952ec578bcd791ddbf6d
Parents: 44b2311
Author: Reynold Xin <rx...@databricks.com>
Authored: Wed Feb 11 15:26:31 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Wed Feb 11 15:26:31 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/package.scala   |   4 +-
 .../catalyst/expressions/namedExpressions.scala |  11 ++
 .../spark/sql/catalyst/plans/QueryPlan.scala    |   5 +
 .../catalyst/plans/logical/LogicalPlan.scala    | 102 +++++++++++--------
 .../sql/catalyst/plans/logical/Statistics.scala |  35 +++++++
 5 files changed, 115 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fa6bdc6e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
index 3f672a3..5dc9d0e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/package.scala
@@ -25,8 +25,8 @@ package org.apache.spark.sql.catalyst
 package object analysis {
 
   /**
-   * Responsible for resolving which identifiers refer to the same entity.  For example, by using
-   * case insensitive equality.
+   * Resolver should return true if the first string refers to the same entity as the second string.
+   * For example, by using case insensitive equality.
    */
   type Resolver = (String, String) => Boolean
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fa6bdc6e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index 7f122e9..f77c563 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -40,6 +40,17 @@ abstract class NamedExpression extends Expression {
 
   def name: String
   def exprId: ExprId
+
+  /**
+   * All possible qualifiers for the expression.
+   *
+   * For now, since we do not allow using original table name to qualify a column name once the
+   * table is aliased, this can only be:
+   *
+   * 1. Empty Seq: when an attribute doesn't have a qualifier,
+   *    e.g. top level attributes aliased in the SELECT clause, or column from a LocalRelation.
+   * 2. Single element: either the table name or the alias name of the table.
+   */
   def qualifiers: Seq[String]
 
   def toAttribute: Attribute

http://git-wip-us.apache.org/repos/asf/spark/blob/fa6bdc6e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 619f428..17a88e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -152,6 +152,11 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
   /** Prints out the schema in the tree format */
   def printSchema(): Unit = println(schemaString)
 
+  /**
+   * A prefix string used when printing the plan.
+   *
+   * We use "!" to indicate an invalid plan, and "'" to indicate an unresolved plan.
+   */
   protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
 
   override def simpleString = statePrefix + super.simpleString

http://git-wip-us.apache.org/repos/asf/spark/blob/fa6bdc6e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index b23f8d0..8c4f09b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -18,41 +18,29 @@
 package org.apache.spark.sql.catalyst.plans.logical
 
 import org.apache.spark.Logging
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{UnresolvedGetField, Resolver}
-import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
-import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.catalyst.trees
 
-/**
- * Estimates of various statistics.  The default estimation logic simply lazily multiplies the
- * corresponding statistic produced by the children.  To override this behavior, override
- * `statistics` and assign it an overridden version of `Statistics`.
- *
- * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
- * performance of the implementations.  The reason is that estimations might get triggered in
- * performance-critical processes, such as query plan planning.
- *
- * Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
- * cardinality estimation (e.g. cartesian joins).
- *
- * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
- *                    defaults to the product of children's `sizeInBytes`.
- */
-private[sql] case class Statistics(sizeInBytes: BigInt)
 
 abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   self: Product =>
 
+  /**
+   * Computes [[Statistics]] for this plan. The default implementation assumes the output
+   * cardinality is the product of all child plans' cardinalities, i.e. it applies in the case
+   * of cartesian joins.
+   *
+   * [[LeafNode]]s must override this.
+   */
   def statistics: Statistics = {
     if (children.size == 0) {
       throw new UnsupportedOperationException(s"LeafNode $nodeName must implement statistics.")
     }
-
-    Statistics(
-      sizeInBytes = children.map(_.statistics).map(_.sizeInBytes).product)
+    Statistics(sizeInBytes = children.map(_.statistics.sizeInBytes).product)
   }
 
   /**
@@ -128,26 +116,41 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   def resolve(name: String, resolver: Resolver): Option[NamedExpression] =
     resolve(name, output, resolver)
 
-  def resolveAsTableColumn(
+  /**
+   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
+   *
+   * This assumes `name` has multiple parts, where the 1st part is a qualifier
+   * (i.e. table name, alias, or subquery alias).
+   * See the comment above `candidates` variable in resolve() for semantics of the returned data.
+   */
+  private def resolveAsTableColumn(
       nameParts: Array[String],
       resolver: Resolver,
-      attribute: Attribute): List[(Attribute, List[String])] = {
-    if (attribute.qualifiers.find(resolver(_, nameParts.head)).nonEmpty && nameParts.size > 1) {
-      val remainingParts = nameParts.drop(1)
+      attribute: Attribute): Option[(Attribute, List[String])] = {
+    assert(nameParts.length > 1)
+    if (attribute.qualifiers.exists(resolver(_, nameParts.head))) {
+      // At least one qualifier matches. See if remaining parts match.
+      val remainingParts = nameParts.tail
       resolveAsColumn(remainingParts, resolver, attribute)
     } else {
-      Nil
+      None
     }
   }
 
-  def resolveAsColumn(
+  /**
+   * Resolve the given `name` string against the given attribute, returning either 0 or 1 match.
+   *
+   * Different from resolveAsTableColumn, this assumes `name` does NOT start with a qualifier.
+   * See the comment above `candidates` variable in resolve() for semantics of the returned data.
+   */
+  private def resolveAsColumn(
       nameParts: Array[String],
       resolver: Resolver,
-      attribute: Attribute): List[(Attribute, List[String])] = {
+      attribute: Attribute): Option[(Attribute, List[String])] = {
     if (resolver(attribute.name, nameParts.head)) {
-      (attribute.withName(nameParts.head), nameParts.tail.toList) :: Nil
+      Option((attribute.withName(nameParts.head), nameParts.tail.toList))
     } else {
-      Nil
+      None
     }
   }
 
@@ -159,25 +162,44 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
 
     val parts = name.split("\\.")
 
-    // We will try to resolve this name as `table.column` pattern first.
-    var options = input.flatMap { option =>
-      resolveAsTableColumn(parts, resolver, option)
+    // A sequence of possible candidate matches.
+    // Each candidate is a tuple. The first element is a resolved attribute, followed by a list
+    // of parts that are to be resolved.
+    // For example, consider the case where "a" is the table name, "b" is the column name,
+    // and "c" is the struct field name, i.e. "a.b.c". In this case, Attribute will be "a.b",
+    // and the second element will be List("c").
+    var candidates: Seq[(Attribute, List[String])] = {
+      // If the name has 2 or more parts, try to resolve it as `table.column` first.
+      if (parts.length > 1) {
+        input.flatMap { option =>
+          resolveAsTableColumn(parts, resolver, option)
+        }
+      } else {
+        Seq.empty
+      }
     }
 
     // If none of attributes match `table.column` pattern, we try to resolve it as a column.
-    if(options.isEmpty) {
-      options = input.flatMap { option =>
-        resolveAsColumn(parts, resolver, option)
+    if (candidates.isEmpty) {
+      candidates = input.flatMap { candidate =>
+        resolveAsColumn(parts, resolver, candidate)
       }
     }
 
-    options.distinct match {
+    candidates.distinct match {
       // One match, no nested fields, use it.
       case Seq((a, Nil)) => Some(a)
 
       // One match, but we also need to extract the requested nested field.
       case Seq((a, nestedFields)) =>
-        Some(Alias(nestedFields.foldLeft(a: Expression)(UnresolvedGetField), nestedFields.last)())
+        // The foldLeft adds an UnresolvedGetField for every remaining part of the name,
+        // and the result is aliased with the last part of the name.
+        // For example, consider name "a.b.c", where "a" is resolved to an existing attribute.
+        // Then this will add UnresolvedGetField("b") and UnresolvedGetField("c"), and alias
+        // the final expression as "c".
+        val fieldExprs = nestedFields.foldLeft(a: Expression)(UnresolvedGetField)
+        val aliasName = nestedFields.last
+        Some(Alias(fieldExprs, aliasName)())
 
       // No matches.
       case Seq() =>
@@ -186,8 +208,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
 
       // More than one match.
       case ambiguousReferences =>
-        throw new TreeNodeException(
-          this, s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
+        throw new AnalysisException(
+          s"Ambiguous references to $name: ${ambiguousReferences.mkString(",")}")
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fa6bdc6e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
new file mode 100644
index 0000000..9ac4c3a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.plans.logical
+
+/**
+ * Estimates of various statistics.  The default estimation logic simply lazily multiplies the
+ * corresponding statistic produced by the children.  To override this behavior, override
+ * `statistics` and assign it an overridden version of `Statistics`.
+ *
+ * '''NOTE''': concrete and/or overridden versions of statistics fields should pay attention to the
+ * performance of the implementations.  The reason is that estimations might get triggered in
+ * performance-critical processes, such as query plan planning.
+ *
+ * Note that we are using a BigInt here since it is easy to overflow a 64-bit integer in
+ * cardinality estimation (e.g. cartesian joins).
+ *
+ * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
+ *                    defaults to the product of children's `sizeInBytes`.
+ */
+private[sql] case class Statistics(sizeInBytes: BigInt)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org