Posted to commits@spark.apache.org by rx...@apache.org on 2014/05/13 22:45:28 UTC

git commit: SPARK-571: forbid return statements in cleaned closures

Repository: spark
Updated Branches:
  refs/heads/master 52d905296 -> 16ffadcc4


SPARK-571: forbid return statements in cleaned closures

This patch checks top-level closure arguments to `ClosureCleaner.clean` for `return` statements and raises an exception if it finds any.  This is mainly a user-friendliness addition, since programs with return statements in closure arguments will currently fail upon RDD actions with a less-than-intuitive error message.
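As a purely illustrative sketch (the object and method names below are hypothetical and not part of this patch), driver code like the following would now fail fast with a SparkException when `nums.map` cleans its closure, instead of failing later during the RDD action with a less-than-intuitive error:

    // Hypothetical example, not from this patch, showing the new check in action.
    import org.apache.spark.SparkContext

    object ReturnInClosureExample {
      def firstDoubled(sc: SparkContext): Int = {
        val nums = sc.parallelize(Array(1, 2, 3, 4))
        nums.map { x =>
          // Scala compiles this `return` into `throw new NonLocalReturnControl(...)`
          // inside the closure's apply method, because it tries to exit
          // firstDoubled() rather than just the closure. ClosureCleaner.clean now
          // spots the NEW of scala/runtime/NonLocalReturnControl there and throws
          // SparkException("Return statements aren't allowed in Spark closures")
          // before the job ever runs.
          if (x > 2) return x * 2
          x * 2
        }.first()
      }
    }

Returns from named functions defined inside a closure are still permitted; the new TestObjectWithNestedReturns case in ClosureCleanerSuite covers that distinction.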

Author: William Benton <wi...@redhat.com>

Closes #717 from willb/spark-571 and squashes the following commits:

c41eb7d [William Benton] Another test case for SPARK-571
30c42f4 [William Benton] Stylistic cleanups
559b16b [William Benton] Stylistic cleanups from review
de13b79 [William Benton] Style fixes
295b6a5 [William Benton] Forbid return statements in closure arguments.
b017c47 [William Benton] Added a test for SPARK-571


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16ffadcc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16ffadcc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16ffadcc

Branch: refs/heads/master
Commit: 16ffadcc4af21430b5079dc555bcd9d8cf1fa1fa
Parents: 52d9052
Author: William Benton <wi...@redhat.com>
Authored: Tue May 13 13:45:23 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue May 13 13:45:23 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/ClosureCleaner.scala  | 23 +++++++++++-
 .../apache/spark/util/ClosureCleanerSuite.scala | 39 +++++++++++++++++++-
 2 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16ffadcc/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index 2d05e09..4916d9b 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -25,7 +25,7 @@ import scala.collection.mutable.Set
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.{ClassReader, ClassVisitor, MethodVisitor, Type}
 import com.esotericsoftware.reflectasm.shaded.org.objectweb.asm.Opcodes._
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkException}
 
 private[spark] object ClosureCleaner extends Logging {
   // Get an ASM class reader for a given class from the JAR that loaded it
@@ -108,6 +108,9 @@ private[spark] object ClosureCleaner extends Logging {
     val outerObjects = getOuterObjects(func)
 
     val accessedFields = Map[Class[_], Set[String]]()
+    
+    getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)
+    
     for (cls <- outerClasses)
       accessedFields(cls) = Set[String]()
     for (cls <- func.getClass :: innerClasses)
@@ -181,6 +184,24 @@ private[spark] object ClosureCleaner extends Logging {
 }
 
 private[spark]
+class ReturnStatementFinder extends ClassVisitor(ASM4) {
+  override def visitMethod(access: Int, name: String, desc: String,
+      sig: String, exceptions: Array[String]): MethodVisitor = {
+    if (name.contains("apply")) {
+      new MethodVisitor(ASM4) {
+        override def visitTypeInsn(op: Int, tp: String) {
+          if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
+            throw new SparkException("Return statements aren't allowed in Spark closures")
+          }
+        }
+      }
+    } else {
+      new MethodVisitor(ASM4) {}
+    }
+  }
+}
+
+private[spark]
 class FieldAccessFinder(output: Map[Class[_], Set[String]]) extends ClassVisitor(ASM4) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {

http://git-wip-us.apache.org/repos/asf/spark/blob/16ffadcc/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index d7e48e6..054ef54 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import org.scalatest.FunSuite
 
 import org.apache.spark.LocalSparkContext._
-import org.apache.spark.SparkContext
+import org.apache.spark.{SparkContext, SparkException}
 
 class ClosureCleanerSuite extends FunSuite {
   test("closures inside an object") {
@@ -50,6 +50,19 @@ class ClosureCleanerSuite extends FunSuite {
     val obj = new TestClassWithNesting(1)
     assert(obj.run() === 96) // 4 * (1+2+3+4) + 4 * (1+2+3+4) + 16 * 1
   }
+  
+  test("toplevel return statements in closures are identified at cleaning time") {
+    val ex = intercept[SparkException] {
+      TestObjectWithBogusReturns.run()
+    }
+    
+    assert(ex.getMessage.contains("Return statements aren't allowed in Spark closures"))
+  }
+
+  test("return statements from named functions nested in closures don't raise exceptions") {
+    val result = TestObjectWithNestedReturns.run()
+    assert(result == 1)
+  }
 }
 
 // A non-serializable class we create in closures to make sure that we aren't
@@ -108,6 +121,30 @@ class TestClassWithoutFieldAccess {
   }
 }
 
+object TestObjectWithBogusReturns {
+  def run(): Int = {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      // this return is invalid since it will transfer control outside the closure
+      nums.map {x => return 1 ; x * 2}
+      1
+    }
+  }
+}
+
+object TestObjectWithNestedReturns {
+  def run(): Int = {
+    withSpark(new SparkContext("local", "test")) { sc =>
+      val nums = sc.parallelize(Array(1, 2, 3, 4))
+      nums.map {x => 
+        // this return is fine since it will not transfer control outside the closure
+        def foo(): Int = { return 5; 1 }
+        foo()
+      }
+      1
+    }
+  }
+}
 
 object TestObjectWithNesting {
   def run(): Int = {