Posted to commits@spark.apache.org by rx...@apache.org on 2014/06/09 23:24:23 UTC

git commit: [SQL] Simple framework for debugging query execution

Repository: spark
Updated Branches:
  refs/heads/master e27344768 -> c6e041d17


[SQL] Simple framework for debugging query execution

Right now it only records the number of tuples and the set of unique data types output by each operator.

Example:
```scala
scala> import org.apache.spark.sql.execution.debug._
scala> hql("SELECT value FROM src WHERE key > 10").debug(sparkContext)

Results returned: 489
== Project [value#1:0] ==
Tuples output: 489
 value StringType: {java.lang.String}
== Filter (key#0:1 > 10) ==
Tuples output: 489
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
== HiveTableScan [value#1,key#0], (MetastoreRelation default, src, None), None ==
Tuples output: 500
 value StringType: {java.lang.String}
 key IntegerType: {java.lang.Integer}
```
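
For context before the diff below: the pattern the new `debug` method applies is to transform the physical plan so an instrumented node sits above every operator, execute the instrumented plan, and then walk it to print per-operator counters. Here is a tiny self-contained sketch of that idea in plain Scala (no Spark dependencies; `Op`, `Scan`, `FilterOp`, and `CountingOp` are made-up stand-ins for SparkPlan, HiveTableScan, Filter, and DebugNode):

```scala
object DebugSketch {
  sealed trait Op {
    def children: Seq[Op]
    def execute(): Iterator[Int]
  }

  case class Scan(data: Seq[Int]) extends Op {
    def children = Nil
    def execute() = data.iterator
  }

  case class FilterOp(pred: Int => Boolean, child: Op) extends Op {
    def children = Seq(child)
    def execute() = child.execute().filter(pred)
  }

  // The instrumented wrapper: forwards rows unchanged while counting them,
  // the role DebugNode plays in the patch below.
  case class CountingOp(child: Op) extends Op {
    var tuples = 0L
    def children = Seq(child)
    def execute() = child.execute().map { row => tuples += 1; row }
  }

  // Interpose a CountingOp above every operator, mirroring the
  // `plan transform { case s => DebugNode(sc, s) }` step in the patch.
  def instrument(op: Op): Op = op match {
    case FilterOp(p, c) => CountingOp(FilterOp(p, instrument(c)))
    case s: Scan        => CountingOp(s)
    case other          => other
  }

  // Walk the instrumented tree and print per-operator counters, like dumpStats().
  def dumpStats(op: Op): Unit = op match {
    case c @ CountingOp(child) =>
      println(s"== ${child.getClass.getSimpleName} ==")
      println(s"Tuples output: ${c.tuples}")
      child.children.foreach(dumpStats)
    case other =>
      other.children.foreach(dumpStats)
  }

  def main(args: Array[String]): Unit = {
    val plan = FilterOp(_ > 10, Scan(1 to 500))
    val debugPlan = instrument(plan)
    println(s"Results returned: ${debugPlan.execute().size}")
    dumpStats(debugPlan)
  }
}
```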

Author: Michael Armbrust <mi...@databricks.com>

Closes #1005 from marmbrus/debug and squashes the following commits:

dcc3ca6 [Michael Armbrust] Add comments.
c9dded2 [Michael Armbrust] Simple framework for debugging query execution


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6e041d1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6e041d1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6e041d1

Branch: refs/heads/master
Commit: c6e041d171e3d9882ab15e2bd7a7217dc19647f6
Parents: e273447
Author: Michael Armbrust <mi...@databricks.com>
Authored: Mon Jun 9 14:24:19 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Jun 9 14:24:19 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLContext.scala |   5 -
 .../org/apache/spark/sql/execution/debug.scala  |  45 -------
 .../spark/sql/execution/debug/package.scala     | 119 +++++++++++++++++++
 3 files changed, 119 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6e041d1/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e371c82..5626f0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -285,11 +285,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
          |== Physical Plan ==
          |${stringOrError(executedPlan)}
       """.stripMargin.trim
-
-    /**
-     * Runs the query after interposing operators that print the result of each intermediate step.
-     */
-    def debugExec() = DebugQuery(executedPlan).execute().collect()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/c6e041d1/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala
deleted file mode 100644
index a0d2910..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug.scala
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution
-
-private[sql] object DebugQuery {
-  def apply(plan: SparkPlan): SparkPlan = {
-    val visited = new collection.mutable.HashSet[Long]()
-    plan transform {
-      case s: SparkPlan if !visited.contains(s.id) =>
-        visited += s.id
-        DebugNode(s)
-    }
-  }
-}
-
-private[sql] case class DebugNode(child: SparkPlan) extends UnaryNode {
-  def references = Set.empty
-  def output = child.output
-  def execute() = {
-    val childRdd = child.execute()
-    println(
-      s"""
-        |=========================
-        |${child.simpleString}
-        |=========================
-      """.stripMargin)
-    childRdd.foreach(println(_))
-    childRdd
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c6e041d1/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
new file mode 100644
index 0000000..c6fbd6d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/debug/package.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import scala.collection.mutable.HashSet
+
+import org.apache.spark.{AccumulatorParam, Accumulator, SparkContext}
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.SparkContext._
+import org.apache.spark.sql.{SchemaRDD, Row}
+
+/**
+ * :: DeveloperApi ::
+ * Contains methods for debugging query execution.
+ *
+ * Usage:
+ * {{{
+ *   sql("SELECT key FROM src").debug
+ * }}}
+ */
+package object debug {
+
+  /**
+   * :: DeveloperApi ::
+   * Augments SchemaRDDs with debug methods.
+   */
+  @DeveloperApi
+  implicit class DebugQuery(query: SchemaRDD) {
+    def debug(implicit sc: SparkContext): Unit = {
+      val plan = query.queryExecution.executedPlan
+      val visited = new HashSet[Long]()
+      val debugPlan = plan transform {
+        case s: SparkPlan if !visited.contains(s.id) =>
+          visited += s.id
+          DebugNode(sc, s)
+      }
+      println(s"Results returned: ${debugPlan.execute().count()}")
+      debugPlan.foreach {
+        case d: DebugNode => d.dumpStats()
+        case _ =>
+      }
+    }
+  }
+
+  private[sql] case class DebugNode(
+      @transient sparkContext: SparkContext,
+      child: SparkPlan) extends UnaryNode {
+    def references = Set.empty
+
+    def output = child.output
+
+    implicit object SetAccumulatorParam extends AccumulatorParam[HashSet[String]] {
+      def zero(initialValue: HashSet[String]): HashSet[String] = {
+        initialValue.clear()
+        initialValue
+      }
+
+      def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
+        v1 ++= v2
+        v1
+      }
+    }
+
+    /**
+     * A collection of stats for each column of output.
+     * @param elementTypes the actual runtime types for the output.  Useful when there are bugs
+     *        causing the wrong data to be projected.
+     */
+    case class ColumnStat(
+        elementTypes: Accumulator[HashSet[String]] = sparkContext.accumulator(HashSet.empty))
+    val tupleCount = sparkContext.accumulator[Int](0)
+
+    val numColumns = child.output.size
+    val columnStats = Array.fill(numColumns)(new ColumnStat())
+
+    def dumpStats(): Unit = {
+      println(s"== ${child.simpleString} ==")
+      println(s"Tuples output: ${tupleCount.value}")
+      child.output.zip(columnStats).foreach { case (attr, stat) =>
+        val actualDataTypes = stat.elementTypes.value.mkString("{", ",", "}")
+        println(s" ${attr.name} ${attr.dataType}: $actualDataTypes")
+      }
+    }
+
+    def execute() = {
+      child.execute().mapPartitions { iter =>
+        new Iterator[Row] {
+          def hasNext = iter.hasNext
+          def next() = {
+            val currentRow = iter.next()
+            tupleCount += 1
+            var i = 0
+            while (i < numColumns) {
+              val value = currentRow(i)
+              columnStats(i).elementTypes += HashSet(value.getClass.getName)
+              i += 1
+            }
+            currentRow
+          }
+        }
+      }
+    }
+  }
+}
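
One detail worth calling out in the new package.scala above is the custom `AccumulatorParam[HashSet[String]]`: each task adds the runtime class names it observes to a local set, and the driver merges the per-partition sets with `addInPlace`. A minimal plain-Scala sketch of those merge semantics, with invented partition contents (no cluster needed):

```scala
import scala.collection.mutable.HashSet

// Sketch of the merge contract behind SetAccumulatorParam:
// `zero` is the empty set and `addInPlace` is a destructive union.
object SetMergeSketch extends App {
  def zero: HashSet[String] = HashSet.empty[String]

  def addInPlace(v1: HashSet[String], v2: HashSet[String]): HashSet[String] = {
    v1 ++= v2
    v1
  }

  // Pretend these are the element types three partitions observed for one column.
  val partitionSets = Seq(
    HashSet("java.lang.String"),
    HashSet("java.lang.String"),
    HashSet("java.lang.String", "java.lang.Integer") // a bad projection shows up here
  )

  // The driver-side fold that stands in for Spark merging task results.
  val merged = partitionSets.foldLeft(zero)(addInPlace)
  println(merged.mkString("{", ",", "}"))
  // More than one class name for a single column is the smell dumpStats surfaces.
}
```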