Posted to commits@spark.apache.org by an...@apache.org on 2016/04/14 19:58:20 UTC

spark git commit: [SPARK-14558][CORE] In ClosureCleaner, clean the outer pointer if it's a REPL line object

Repository: spark
Updated Branches:
  refs/heads/master a46f98d3f -> 1d04c86fc


[SPARK-14558][CORE] In ClosureCleaner, clean the outer pointer if it's a REPL line object

## What changes were proposed in this pull request?

When we clean a closure, if its outermost enclosing object is not a closure, we do not clone and clean it, because cloning a user's objects is dangerous. However, if the outermost object is a REPL line object, which may carry a lot of unnecessary references (like the Hadoop conf, the Spark conf, etc.), we should clone and clean it, since it is not a user object.

This PR improves the check for user objects so that REPL line objects are excluded and cleaned as well.
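As a rough illustration of the new behaviour, here is a minimal sketch of the resulting check; the object and method names below are made up for the example, and the real logic lives in `ClosureCleaner#clean` (see the diff below):

```scala
// Minimal sketch (illustrative names, not the exact Spark internals) of how the
// outermost enclosing object is classified after this change.
object OutermostCheckSketch {
  // Scala REPL wrapper classes are compiled with names like "$line3.$read$...",
  // which is what the prefix check below relies on.
  def classify(outermostClass: Class[_], isClosure: Class[_] => Boolean): String = {
    if (isClosure(outermostClass)) {
      "closure: clone and clean it"
    } else if (outermostClass.getName.startsWith("$line")) {
      // SPARK-14558: a REPL line object is not a user object, so it is safe to
      // clone and clean, dropping heavy references such as the Hadoop/Spark conf.
      "REPL line object: clone and clean it"
    } else {
      // A genuine user object: keep it, but do not clone it.
      "user object: keep without cloning"
    }
  }
}
```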

## How was this patch tested?

Existing tests, plus a new test in `ReplSuite` that verifies the REPL line object is cloned and cleaned.

Author: Wenchen Fan <we...@databricks.com>

Closes #12327 from cloud-fan/closure.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d04c86f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d04c86f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d04c86f

Branch: refs/heads/master
Commit: 1d04c86fc575470e15f6667076377cea102552d7
Parents: a46f98d
Author: Wenchen Fan <we...@databricks.com>
Authored: Thu Apr 14 10:58:06 2016 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Apr 14 10:58:06 2016 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/ClosureCleaner.scala  | 53 +++++++++-----------
 .../scala/org/apache/spark/repl/ReplSuite.scala | 27 ++++++++++
 2 files changed, 50 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1d04c86f/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 2f6924f..489688c 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -19,7 +19,8 @@ package org.apache.spark.util
 
 import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
 
-import scala.collection.mutable.{Map, Set}
+import scala.collection.mutable.{Map, Set, Stack}
+import scala.language.existentials
 
 import org.apache.xbean.asm5.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import org.apache.xbean.asm5.Opcodes._
@@ -77,35 +78,19 @@ private[spark] object ClosureCleaner extends Logging {
    */
   private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = {
     val seen = Set[Class[_]](obj.getClass)
-    var stack = List[Class[_]](obj.getClass)
+    val stack = Stack[Class[_]](obj.getClass)
     while (!stack.isEmpty) {
-      val cr = getClassReader(stack.head)
-      stack = stack.tail
+      val cr = getClassReader(stack.pop())
       val set = Set[Class[_]]()
       cr.accept(new InnerClosureFinder(set), 0)
       for (cls <- set -- seen) {
         seen += cls
-        stack = cls :: stack
+        stack.push(cls)
       }
     }
     (seen - obj.getClass).toList
   }
 
-  private def createNullValue(cls: Class[_]): AnyRef = {
-    if (cls.isPrimitive) {
-      cls match {
-        case java.lang.Boolean.TYPE => new java.lang.Boolean(false)
-        case java.lang.Character.TYPE => new java.lang.Character('\u0000')
-        case java.lang.Void.TYPE =>
-          // This should not happen because `Foo(void x) {}` does not compile.
-          throw new IllegalStateException("Unexpected void parameter in constructor")
-        case _ => new java.lang.Byte(0: Byte)
-      }
-    } else {
-      null
-    }
-  }
-
   /**
    * Clean the given closure in place.
    *
@@ -233,16 +218,24 @@ private[spark] object ClosureCleaner extends Logging {
     // Note that all outer objects but the outermost one (first one in this list) must be closures
     var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
     var parent: AnyRef = null
-    if (outerPairs.size > 0 && !isClosure(outerPairs.head._1)) {
-      // The closure is ultimately nested inside a class; keep the object of that
-      // class without cloning it since we don't want to clone the user's objects.
-      // Note that we still need to keep around the outermost object itself because
-      // we need it to clone its child closure later (see below).
-      logDebug(s" + outermost object is not a closure, so do not clone it: ${outerPairs.head}")
-      parent = outerPairs.head._2 // e.g. SparkContext
-      outerPairs = outerPairs.tail
-    } else if (outerPairs.size > 0) {
-      logDebug(s" + outermost object is a closure, so we just keep it: ${outerPairs.head}")
+    if (outerPairs.size > 0) {
+      val (outermostClass, outermostObject) = outerPairs.head
+      if (isClosure(outermostClass)) {
+        logDebug(s" + outermost object is a closure, so we clone it: ${outerPairs.head}")
+      } else if (outermostClass.getName.startsWith("$line")) {
+        // SPARK-14558: if the outermost object is a REPL line object, we should clone and clean it
+        // as it may carry a lot of unnecessary information, e.g. hadoop conf, spark conf, etc.
+        logDebug(s" + outermost object is a REPL line object, so we clone it: ${outerPairs.head}")
+      } else {
+        // The closure is ultimately nested inside a class; keep the object of that
+        // class without cloning it since we don't want to clone the user's objects.
+        // Note that we still need to keep around the outermost object itself because
+        // we need it to clone its child closure later (see below).
+        logDebug(" + outermost object is not a closure or REPL line object, so do not clone it: " +
+          outerPairs.head)
+        parent = outermostObject // e.g. SparkContext
+        outerPairs = outerPairs.tail
+      }
     } else {
       logDebug(" + there are no enclosing objects!")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/1d04c86f/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 7e10f15..d3dafe9 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -373,4 +373,31 @@ class ReplSuite extends SparkFunSuite {
     assertDoesNotContain("error:", output)
     assertDoesNotContain("Exception", output)
   }
+
+  test("should clone and clean line object in ClosureCleaner") {
+    val output = runInterpreterInPasteMode("local-cluster[1,4,4096]",
+      """
+        |import org.apache.spark.rdd.RDD
+        |
+        |val lines = sc.textFile("pom.xml")
+        |case class Data(s: String)
+        |val dataRDD = lines.map(line => Data(line.take(3)))
+        |dataRDD.cache.count
+        |val repartitioned = dataRDD.repartition(dataRDD.partitions.size)
+        |repartitioned.cache.count
+        |
+        |def getCacheSize(rdd: RDD[_]) = {
+        |  sc.getRDDStorageInfo.filter(_.id == rdd.id).map(_.memSize).sum
+        |}
+        |val cacheSize1 = getCacheSize(dataRDD)
+        |val cacheSize2 = getCacheSize(repartitioned)
+        |
+        |// The cache size of dataRDD and the repartitioned one should be similar.
+        |val deviation = math.abs(cacheSize2 - cacheSize1).toDouble / cacheSize1
+        |assert(deviation < 0.2,
+        |  s"deviation too large: $deviation, first size: $cacheSize1, second size: $cacheSize2")
+      """.stripMargin)
+    assertDoesNotContain("AssertionError", output)
+    assertDoesNotContain("Exception", output)
+  }
 }

