Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/13 22:46:42 UTC

git commit: [SQL] Add type checking debugging functions

Repository: spark
Updated Branches:
  refs/heads/master e10d71e7e -> 371321cad


[SQL] Add type checking debugging functions

Adds some functions that proved very useful when tracking down the bug from #2656.  This change also updates the tree output for query plans: unresolved nodes are now prefixed with `'`, and nodes that refer to non-existent attributes are prefixed with `!`.
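
For illustration only (a hypothetical plan with made-up attribute names, not taken from this patch), an invalid tree might now render as:

    'Project ['badCol]            <-  ' : node is still unresolved
     !Filter (missing#2 = 1)      <-  ! : references an attribute no child provides
      LocalRelation [a#0, b#1]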

Author: Michael Armbrust <mi...@databricks.com>

Closes #2657 from marmbrus/debugging and squashes the following commits:

654b926 [Michael Armbrust] Clean-up, add tests
763af15 [Michael Armbrust] Add typeChecking debugging functions
8c69303 [Michael Armbrust] Add inputSet, references to QueryPlan. Improve tree string with a prefix to denote invalid or unresolved nodes.
fbeab54 [Michael Armbrust] Better toString, factories for AttributeSet.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/371321ca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/371321ca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/371321ca

Branch: refs/heads/master
Commit: 371321cadee7df39258bd374eb59c1e32451d96b
Parents: e10d71e
Author: Michael Armbrust <mi...@databricks.com>
Authored: Mon Oct 13 13:46:34 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Oct 13 13:46:34 2014 -0700

----------------------------------------------------------------------
 .../sql/catalyst/expressions/AttributeSet.scala | 23 ++++--
 .../sql/catalyst/expressions/Projection.scala   |  2 +
 .../catalyst/expressions/namedExpressions.scala |  4 +-
 .../spark/sql/catalyst/plans/QueryPlan.scala    | 23 ++++++
 .../catalyst/plans/logical/LogicalPlan.scala    |  8 +-
 .../catalyst/plans/logical/basicOperators.scala |  5 --
 .../spark/sql/execution/debug/package.scala     | 85 ++++++++++++++++++++
 .../sql/execution/debug/DebuggingSuite.scala    | 33 ++++++++
 8 files changed, 163 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/371321ca/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index c3a08bb..2b4969b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -17,19 +17,26 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst.analysis.Star
+
 protected class AttributeEquals(val a: Attribute) {
   override def hashCode() = a.exprId.hashCode()
-  override def equals(other: Any) = other match {
-    case otherReference: AttributeEquals => a.exprId == otherReference.a.exprId
-    case otherAttribute => false
+  override def equals(other: Any) = (a, other.asInstanceOf[AttributeEquals].a) match {
+    case (a1: AttributeReference, a2: AttributeReference) => a1.exprId == a2.exprId
+    case (a1, a2) => a1 == a2
   }
 }
 
 object AttributeSet {
-  /** Constructs a new [[AttributeSet]] given a sequence of [[Attribute Attributes]]. */
-  def apply(baseSet: Seq[Attribute]) = {
-    new AttributeSet(baseSet.map(new AttributeEquals(_)).toSet)
-  }
+  def apply(a: Attribute) =
+    new AttributeSet(Set(new AttributeEquals(a)))
+
+  /** Constructs a new [[AttributeSet]] given a sequence of [[Expression Expressions]]. */
+  def apply(baseSet: Seq[Expression]) =
+    new AttributeSet(
+      baseSet
+        .flatMap(_.references)
+        .map(new AttributeEquals(_)).toSet)
 }
 
 /**
@@ -103,4 +110,6 @@ class AttributeSet private (val baseSet: Set[AttributeEquals])
   // We must force toSeq to be strict; otherwise we end up with a [[Stream]] that captures all
   // sorts of things in its closure.
   override def toSeq: Seq[Attribute] = baseSet.map(_.a).toArray.toSeq
+
+  override def toString = "{" + baseSet.map(_.a).mkString(", ") + "}"
 }
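
A rough usage sketch of the new factories and toString (attribute names and expression ids below are hypothetical):

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.types.IntegerType

    val a = AttributeReference("a", IntegerType, nullable = true)()
    val b = AttributeReference("b", IntegerType, nullable = true)()

    AttributeSet(a)                 // singleton set built from one Attribute
    AttributeSet(Add(a, b) :: Nil)  // collects references from arbitrary Expressions
    println(AttributeSet(a :: b :: Nil))  // the new toString prints e.g. {a#0, b#1}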

http://git-wip-us.apache.org/repos/asf/spark/blob/371321ca/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
index 204904e..e7e81a2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Projection.scala
@@ -39,6 +39,8 @@ class InterpretedProjection(expressions: Seq[Expression]) extends Projection {
     }
     new GenericRow(outputArray)
   }
+
+  override def toString = s"Row => [${exprArray.mkString(",")}]"
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/371321ca/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
index e5a958d..d023db4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala
@@ -57,6 +57,8 @@ abstract class NamedExpression extends Expression {
 abstract class Attribute extends NamedExpression {
   self: Product =>
 
+  override def references = AttributeSet(this)
+
   def withNullability(newNullability: Boolean): Attribute
   def withQualifiers(newQualifiers: Seq[String]): Attribute
   def withName(newName: String): Attribute
@@ -116,8 +118,6 @@ case class AttributeReference(name: String, dataType: DataType, nullable: Boolea
     (val exprId: ExprId = NamedExpression.newExprId, val qualifiers: Seq[String] = Nil)
   extends Attribute with trees.LeafNode[Expression] {
 
-  override def references = AttributeSet(this :: Nil)
-
   override def equals(other: Any) = other match {
     case ar: AttributeReference => exprId == ar.exprId && dataType == ar.dataType
     case _ => false
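
With `references` moved up to the `Attribute` base class, every attribute (not only AttributeReference) now reports itself as its sole reference; roughly, reusing the imports and hypothetical ids from the sketch above:

    val a = AttributeReference("a", IntegerType, nullable = true)()
    a.references  // an AttributeSet containing only a, printed as {a#0}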

http://git-wip-us.apache.org/repos/asf/spark/blob/371321ca/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index af9e4d8..dcbbb62 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -32,6 +32,25 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
   def outputSet: AttributeSet = AttributeSet(output)
 
   /**
+   * All Attributes that appear in expressions from this operator.  Note that this set does not
+   * include attributes that are implicitly referenced by being passed through to the output tuple.
+   */
+  def references: AttributeSet = AttributeSet(expressions.flatMap(_.references))
+
+  /**
+   * The set of all attributes that are input to this operator by its children.
+   */
+  def inputSet: AttributeSet =
+    AttributeSet(children.flatMap(_.asInstanceOf[QueryPlan[PlanType]].output))
+
+  /**
+   * Attributes that are referenced by expressions but not provided by this node's children.
+   * Subclasses should override this method if they produce attributes internally, as it is used by
+   * assertions designed to prevent the construction of invalid plans.
+   */
+  def missingInput: AttributeSet = references -- inputSet
+
+  /**
    * Runs [[transform]] with `rule` on all expressions present in this query operator.
    * Users should not expect a specific directionality. If a specific directionality is needed,
    * transformExpressionsDown or transformExpressionsUp should be used.
@@ -132,4 +151,8 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
 
   /** Prints out the schema in the tree format */
   def printSchema(): Unit = println(schemaString)
+
+  protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""
+
+  override def simpleString = statePrefix + super.simpleString
 }
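
A hedged sketch of how the new helpers relate on a deliberately broken plan (attribute names are hypothetical, and LocalRelation stands in for any leaf operator):

    import org.apache.spark.sql.catalyst.expressions._
    import org.apache.spark.sql.catalyst.plans.logical._
    import org.apache.spark.sql.catalyst.types.IntegerType

    val a = AttributeReference("a", IntegerType, nullable = true)()
    val b = AttributeReference("b", IntegerType, nullable = true)()

    val child  = LocalRelation(a :: Nil)   // outputs only a#0
    val broken = Project(b :: Nil, child)  // but the Project references b#1

    broken.inputSet      // {a#0}: the union of the children's output
    broken.references    // {b#1}: attributes mentioned by this node's expressions
    broken.missingInput  // {b#1}: referenced but never provided, so the tree prints !Project ...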

http://git-wip-us.apache.org/repos/asf/spark/blob/371321ca/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 4f8ad8a..882e9c6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -54,12 +54,6 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
   }
 
   /**
-   * Returns the set of attributes that this node takes as
-   * input from its children.
-   */
-  lazy val inputSet: AttributeSet = AttributeSet(children.flatMap(_.output))
-
-  /**
    * Returns true if this expression and all its children have been resolved to a specific schema
    * and false if it still contains any unresolved placeholders. Implementations of LogicalPlan
    * can override this (e.g.
@@ -68,6 +62,8 @@ abstract class LogicalPlan extends QueryPlan[LogicalPlan] with Logging {
    */
   lazy val resolved: Boolean = !expressions.exists(!_.resolved) && childrenResolved
 
+  override protected def statePrefix = if (!resolved) "'" else super.statePrefix
+
   /**
    * Returns true if all children of this query plan have been resolved.
    */

http://git-wip-us.apache.org/repos/asf/spark/blob/371321ca/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index f8e9930..14b03c7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -138,11 +138,6 @@ case class Aggregate(
     child: LogicalPlan)
   extends UnaryNode {
 
-  /** The set of all AttributeReferences required for this aggregation. */
-  def references =
-    AttributeSet(
-      groupingExpressions.flatMap(_.references) ++ aggregateExpressions.flatMap(_.references))
-
   override def output = aggregateExpressions.map(_.toAttribute)
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/371321ca/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
index a9535a7..61be5ed 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -24,6 +24,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.SparkContext._
 import org.apache.spark.sql.{SchemaRDD, Row}
 import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.catalyst.types._
 
 /**
  * :: DeveloperApi ::
@@ -56,6 +57,23 @@ package object debug {
         case _ =>
       }
     }
+
+    def typeCheck(): Unit = {
+      val plan = query.queryExecution.executedPlan
+      val visited = new collection.mutable.HashSet[TreeNodeRef]()
+      val debugPlan = plan transform {
+        case s: SparkPlan if !visited.contains(new TreeNodeRef(s)) =>
+          visited += new TreeNodeRef(s)
+          TypeCheck(s)
+      }
+      try {
+        println(s"Results returned: ${debugPlan.execute().count()}")
+      } catch {
+        case e: Exception =>
+          def unwrap(e: Throwable): Throwable = if (e.getCause == null) e else unwrap(e.getCause)
+          println(s"Deepest Error: ${unwrap(e)}")
+      }
+    }
   }
 
   private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
@@ -115,4 +133,71 @@ package object debug {
       }
     }
   }
+
+  /**
+   * :: DeveloperApi ::
+   * Helper functions for checking that runtime types match a given schema.
+   */
+  @DeveloperApi
+  object TypeCheck {
+    def typeCheck(data: Any, schema: DataType): Unit = (data, schema) match {
+      case (null, _) =>
+
+      case (row: Row, StructType(fields)) =>
+        row.zip(fields.map(_.dataType)).foreach { case (d, t) => typeCheck(d, t) }
+      case (s: Seq[_], ArrayType(elemType, _)) =>
+        s.foreach(typeCheck(_, elemType))
+      case (m: Map[_, _], MapType(keyType, valueType, _)) =>
+        m.keys.foreach(typeCheck(_, keyType))
+        m.values.foreach(typeCheck(_, valueType))
+
+      case (_: Long, LongType) =>
+      case (_: Int, IntegerType) =>
+      case (_: String, StringType) =>
+      case (_: Float, FloatType) =>
+      case (_: Byte, ByteType) =>
+      case (_: Short, ShortType) =>
+      case (_: Boolean, BooleanType) =>
+      case (_: Double, DoubleType) =>
+
+      case (d, t) => sys.error(s"Invalid data found: got $d (${d.getClass}), expected $t")
+    }
+  }
+
+  /**
+   * :: DeveloperApi ::
+   * A SparkPlan that checks the runtime types of tuples against the schema of its child.
+   */
+  @DeveloperApi
+  private[sql] case class TypeCheck(child: SparkPlan) extends SparkPlan {
+    import TypeCheck._
+
+    override def nodeName = ""
+
+    /* Only required when defining this class in a REPL.
+    override def makeCopy(args: Array[Object]): this.type =
+      TypeCheck(args(0).asInstanceOf[SparkPlan]).asInstanceOf[this.type]
+    */
+
+    def output = child.output
+
+    def children = child :: Nil
+
+    def execute() = {
+      child.execute().map { row =>
+        try typeCheck(row, child.schema) catch {
+          case e: Exception =>
+            sys.error(
+              s"""
+                  |ERROR WHEN TYPE CHECKING QUERY
+                  |==============================
+                  |$e
+                  |======== BAD TREE ============
+                  |$child
+             """.stripMargin)
+        }
+        row
+      }
+    }
+  }
 }
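
A hedged sketch of exercising the new method from user code (table and column names are hypothetical; the import brings the implicit debug methods into scope, mirroring the suite below):

    import org.apache.spark.sql.execution.debug._

    val result = sqlContext.sql("SELECT key, value FROM src")  // any SchemaRDD
    result.typeCheck()
    // On success this prints the row count, e.g.:  Results returned: 500
    // On a mismatch it prints the deepest cause, e.g.:
    //   Deepest Error: java.lang.RuntimeException: Invalid data found:
    //     got 1.0 (class java.lang.Double), expected IntegerType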

http://git-wip-us.apache.org/repos/asf/spark/blob/371321ca/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
new file mode 100644
index 0000000..87c28c3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/debug/DebuggingSuite.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.debug
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.TestData._
+import org.apache.spark.sql.test.TestSQLContext._
+
+class DebuggingSuite extends FunSuite {
+  test("SchemaRDD.debug()") {
+    testData.debug()
+  }
+
+  test("SchemaRDD.typeCheck()") {
+    testData.typeCheck()
+  }
+}
\ No newline at end of file

