You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/07/22 03:55:07 UTC

git commit: [SPARK-2086] Improve output of toDebugString to make shuffle boundaries more clear

Repository: spark
Updated Branches:
  refs/heads/master 511a73140 -> c3462c656


[SPARK-2086] Improve output of toDebugString to make shuffle boundaries more clear

Changes RDD.toDebugString() to show hierarchy and shuffle transformations more clearly

New output:

```
(3) FlatMappedValuesRDD[325] at apply at Transformer.scala:22
 |  MappedValuesRDD[324] at apply at Transformer.scala:22
 |  CoGroupedRDD[323] at apply at Transformer.scala:22
 +-(5) MappedRDD[320] at apply at Transformer.scala:22
 |  |  MappedRDD[319] at apply at Transformer.scala:22
 |  |  MappedValuesRDD[318] at apply at Transformer.scala:22
 |  |  MapPartitionsRDD[317] at apply at Transformer.scala:22
 |  |  ShuffledRDD[316] at apply at Transformer.scala:22
 |  +-(10) MappedRDD[315] at apply at Transformer.scala:22
 |     |   ParallelCollectionRDD[314] at apply at Transformer.scala:22
 +-(100) MappedRDD[322] at apply at Transformer.scala:22
     |   ParallelCollectionRDD[321] at apply at Transformer.scala:22
```

Author: Gregory Owen <gr...@gmail.com>

Closes #1364 from GregOwen/to-debug-string and squashes the following commits:

08f5c78 [Gregory Owen] toDebugString: prettier debug printing to show shuffles and joins more clearly
1603f7b [Gregory Owen] toDebugString: prettier debug printing to show shuffles and joins more clearly


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c3462c65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c3462c65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c3462c65

Branch: refs/heads/master
Commit: c3462c65684885299cf037d56c88bd53c08c6348
Parents: 511a731
Author: Gregory Owen <gr...@gmail.com>
Authored: Mon Jul 21 18:55:01 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jul 21 18:55:01 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   | 52 ++++++++++++++++++--
 project/MimaExcludes.scala                      |  8 +++
 2 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c3462c65/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 88a918a..a1f2827 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1269,11 +1269,55 @@ abstract class RDD[T: ClassTag](
 
   /** A description of this RDD and its recursive dependencies for debugging. */
   def toDebugString: String = {
-    def debugString(rdd: RDD[_], prefix: String = ""): Seq[String] = {
-      Seq(prefix + rdd + " (" + rdd.partitions.size + " partitions)") ++
-        rdd.dependencies.flatMap(d => debugString(d.rdd, prefix + "  "))
+    // Apply a different rule to the last child
+    def debugChildren(rdd: RDD[_], prefix: String): Seq[String] = {
+      val len = rdd.dependencies.length
+      len match {
+        case 0 => Seq.empty
+        case 1 =>
+          val d = rdd.dependencies.head
+          debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]], true)
+        case _ =>
+          val frontDeps = rdd.dependencies.take(len - 1)
+          val frontDepStrings = frontDeps.flatMap(
+            d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]]))
+
+          val lastDep = rdd.dependencies.last
+          val lastDepStrings =
+            debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_,_,_]], true)
+
+          (frontDepStrings ++ lastDepStrings)
+      }
+    }
+    // The first RDD in the dependency stack has no parents, so no need for a +-
+    def firstDebugString(rdd: RDD[_]): Seq[String] = {
+      val partitionStr = "(" + rdd.partitions.size + ")"
+      val leftOffset = (partitionStr.length - 1) / 2
+      val nextPrefix = (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset))
+      Seq(partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+    }
+    def shuffleDebugString(rdd: RDD[_], prefix: String = "", isLastChild: Boolean): Seq[String] = {
+      val partitionStr = "(" + rdd.partitions.size + ")"
+      val leftOffset = (partitionStr.length - 1) / 2
+      val thisPrefix = prefix.replaceAll("\\|\\s+$", "")
+      val nextPrefix = (
+        thisPrefix
+        + (if (isLastChild) "  " else "| ")
+        + (" " * leftOffset) + "|" + (" " * (partitionStr.length - leftOffset)))
+      Seq(thisPrefix + "+-" + partitionStr + " " + rdd) ++ debugChildren(rdd, nextPrefix)
+    }
+    def debugString(rdd: RDD[_],
+                    prefix: String = "",
+                    isShuffle: Boolean = true,
+                    isLastChild: Boolean = false): Seq[String] = {
+      if (isShuffle) {
+        shuffleDebugString(rdd, prefix, isLastChild)
+      }
+      else {
+        Seq(prefix + rdd) ++ debugChildren(rdd, prefix)
+      }
     }
-    debugString(this).mkString("\n")
+    firstDebugString(this).mkString("\n")
   }
 
   override def toString: String = "%s%s[%d] at %s".format(

http://git-wip-us.apache.org/repos/asf/spark/blob/c3462c65/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index e0f433b..4d86e1a 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -62,6 +62,14 @@ object MimaExcludes {
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.storage.MemoryStore.Entry"),
         ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$debugChildren$1"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$firstDebugString$1"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$shuffleDebugString$1"),
+        ProblemFilters.exclude[MissingMethodProblem](
+          "org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$debugString$1"),
+        ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.rdd.PairRDDFunctions.org$apache$spark$rdd$PairRDDFunctions$$"
             + "createZero$1")
       ) ++