Posted to commits@spark.apache.org by sr...@apache.org on 2017/11/08 10:24:44 UTC

spark git commit: [SPARK-14540][BUILD] Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner (step 0)

Repository: spark
Updated Branches:
  refs/heads/master 11eea1a4c -> 51debf8b1


[SPARK-14540][BUILD] Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner (step 0)

## What changes were proposed in this pull request?

Preliminary changes to make ClosureCleaner work with Scala 2.12 closures and Java 8 lambdas. These changes make many usages work, but not all; this does _not_ fully resolve the JIRA.
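
For context, a minimal sketch (not part of this commit) of the compilation difference that motivates the change: in Scala 2.12, function literals compile to Java 8-style lambdas backed by synthetic `$anonfun$` methods and runtime-generated classes, rather than to dedicated anonymous `FunctionN` subclasses as in 2.11, so the bytecode-walking ClosureCleaner no longer finds one `.class` file per closure.

    object LambdaExample {
      def main(args: Array[String]): Unit = {
        val addOne: Int => Int = x => x + 1
        // Under 2.12 this prints a runtime-generated name such as
        // LambdaExample$$$Lambda$1/1234..., because the function literal
        // compiles to an invokedynamic lambda; under 2.11 it would print
        // the name of a compiled anonymous class such as
        // LambdaExample$$anonfun$1.
        println(addOne.getClass.getName)
      }
    }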

## How was this patch tested?

Existing tests

Author: Sean Owen <so...@cloudera.com>

Closes #19675 from srowen/SPARK-14540.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51debf8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51debf8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51debf8b

Branch: refs/heads/master
Commit: 51debf8b1f4d479bc7f81e2759ba28e526367d70
Parents: 11eea1a
Author: Sean Owen <so...@cloudera.com>
Authored: Wed Nov 8 10:24:40 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Nov 8 10:24:40 2017 +0000

----------------------------------------------------------------------
 .../org/apache/spark/util/ClosureCleaner.scala  | 28 +++++++++++---------
 .../spark/util/ClosureCleanerSuite2.scala       | 10 ++++---
 .../apache/spark/graphx/lib/ShortestPaths.scala |  2 +-
 .../spark/streaming/BasicOperationsSuite.scala  |  4 +--
 4 files changed, 26 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/51debf8b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index dfece5d..4061642 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -38,12 +38,13 @@ private[spark] object ClosureCleaner extends Logging {
     // Copy data over, before delegating to ClassReader - else we can run out of open file handles.
     val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
     val resourceStream = cls.getResourceAsStream(className)
-    // todo: Fixme - continuing with earlier behavior ...
-    if (resourceStream == null) return new ClassReader(resourceStream)
-
-    val baos = new ByteArrayOutputStream(128)
-    Utils.copyStream(resourceStream, baos, true)
-    new ClassReader(new ByteArrayInputStream(baos.toByteArray))
+    if (resourceStream == null) {
+      null
+    } else {
+      val baos = new ByteArrayOutputStream(128)
+      Utils.copyStream(resourceStream, baos, true)
+      new ClassReader(new ByteArrayInputStream(baos.toByteArray))
+    }
   }
 
   // Check whether a class represents a Scala closure
@@ -81,11 +82,13 @@ private[spark] object ClosureCleaner extends Logging {
     val stack = Stack[Class[_]](obj.getClass)
     while (!stack.isEmpty) {
       val cr = getClassReader(stack.pop())
-      val set = Set.empty[Class[_]]
-      cr.accept(new InnerClosureFinder(set), 0)
-      for (cls <- set -- seen) {
-        seen += cls
-        stack.push(cls)
+      if (cr != null) {
+        val set = Set.empty[Class[_]]
+        cr.accept(new InnerClosureFinder(set), 0)
+        for (cls <- set -- seen) {
+          seen += cls
+          stack.push(cls)
+        }
       }
     }
     (seen - obj.getClass).toList
@@ -366,7 +369,8 @@ private[spark] class ReturnStatementInClosureException
 private class ReturnStatementFinder extends ClassVisitor(ASM5) {
   override def visitMethod(access: Int, name: String, desc: String,
       sig: String, exceptions: Array[String]): MethodVisitor = {
-    if (name.contains("apply")) {
+    // $anonfun$ covers Java 8 lambdas
+    if (name.contains("apply") || name.contains("$anonfun$")) {
       new MethodVisitor(ASM5) {
         override def visitTypeInsn(op: Int, tp: String) {
           if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
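
Why `resourceStream` can be null here at all: classes for Java 8 lambdas are generated at runtime by `LambdaMetafactory`, so no `.class` resource exists for them on the classpath, and `getClassReader` must now report that case as null rather than handing a null stream to `ClassReader`. A self-contained sketch of the symptom (class names are illustrative):

    object LambdaResourceDemo {
      def main(args: Array[String]): Unit = {
        val f: Int => Int = _ + 1                // a 2.12 lambda
        val cls = f.getClass                     // runtime-generated class
        val name = cls.getName.replaceFirst("^.*\\.", "") + ".class"
        // Non-null for an ordinary class; null for a lambda class, since
        // no .class file for it exists on the classpath.
        println(cls.getResourceAsStream(name))
      }
    }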

http://git-wip-us.apache.org/repos/asf/spark/blob/51debf8b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index 934385f..278fada 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -117,9 +117,13 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
       findTransitively: Boolean): Map[Class[_], Set[String]] = {
     val fields = new mutable.HashMap[Class[_], mutable.Set[String]]
     outerClasses.foreach { c => fields(c) = new mutable.HashSet[String] }
-    ClosureCleaner.getClassReader(closure.getClass)
-      .accept(new FieldAccessFinder(fields, findTransitively), 0)
-    fields.mapValues(_.toSet).toMap
+    val cr = ClosureCleaner.getClassReader(closure.getClass)
+    if (cr == null) {
+      Map.empty
+    } else {
+      cr.accept(new FieldAccessFinder(fields, findTransitively), 0)
+      fields.mapValues(_.toSet).toMap
+    }
   }
 
   // Accessors for private methods
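
The same guard could be written with `Option` to make the null flow explicit; a sketch only (the committed code keeps the plain null check, and the identifiers below are those of the surrounding test helper):

    Option(ClosureCleaner.getClassReader(closure.getClass)) match {
      case None => Map.empty
      case Some(cr) =>
        cr.accept(new FieldAccessFinder(fields, findTransitively), 0)
        fields.mapValues(_.toSet).toMap
    }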

http://git-wip-us.apache.org/repos/asf/spark/blob/51debf8b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
index 4cac633..aff0b93 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -25,7 +25,7 @@ import org.apache.spark.graphx._
  * Computes shortest paths to the given set of landmark vertices, returning a graph where each
  * vertex attribute is a map containing the shortest-path distance to each reachable landmark.
  */
-object ShortestPaths {
+object ShortestPaths extends Serializable {
   /** Stores a map from the vertex id of a landmark to the distance to that landmark. */
   type SPMap = Map[VertexId, Int]
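
Why `ShortestPaths` now extends Serializable: the closures it passes to Pregel call the object's own helper methods, so under the 2.12 lambda encoding they capture the object itself, and this step of the work does not yet teach the cleaner to null that reference out. A self-contained sketch of the capture (names hypothetical):

    import java.io.{ByteArrayOutputStream, ObjectOutputStream}

    object Helpers {  // add `extends Serializable` to fix the demo below
      private def inc(x: Int): Int = x + 1
      // This lambda calls an instance method of Helpers, so in 2.12 it
      // captures the Helpers singleton as one of its arguments.
      val f: Int => Int = x => inc(x)
    }

    object CaptureDemo {
      def main(args: Array[String]): Unit = {
        val out = new ObjectOutputStream(new ByteArrayOutputStream())
        // Throws java.io.NotSerializableException: Helpers$ unless Helpers
        // extends Serializable, because serializing the lambda serializes
        // its captured enclosing instance too.
        out.writeObject(Helpers.f)
      }
    }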
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51debf8b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 6f62c7a..0a764f6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -596,8 +596,6 @@ class BasicOperationsSuite extends TestSuiteBase {
       )
 
     val updateStateOperation = (s: DStream[String]) => {
-      class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable
-
       // updateFunc clears a state when a StateObject is seen without new values twice in a row
       val updateFunc = (values: Seq[Int], state: Option[StateObject]) => {
         val stateObj = state.getOrElse(new StateObject)
@@ -817,3 +815,5 @@ class BasicOperationsSuite extends TestSuiteBase {
     }
   }
 }
+
+class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable
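
Why `StateObject` moves out of the closure to the top level: a class declared inside a method or closure body can capture values from its enclosing scope as hidden fields, which the cleaner could scrub from 2.11 anonymous-class closures but cannot yet scrub from 2.12 lambdas; a top-level class carries no such baggage. A small runnable sketch of the difference (names hypothetical):

    class TopLevel(var n: Int = 0) extends Serializable  // no hidden captures

    object LocalClassDemo {
      def makeLocal(): AnyRef = {
        val label = "local-"
        // Local refers to `label`, so the compiler stores it in a hidden field.
        class Local(var n: Int = 0) extends Serializable {
          override def toString: String = label + n
        }
        new Local
      }

      def main(args: Array[String]): Unit = {
        // Prints just "n" for the top-level class...
        println(new TopLevel().getClass.getDeclaredFields.map(_.getName).mkString(", "))
        // ...but "n" plus a synthetic captured field (e.g. label$1) here.
        println(makeLocal().getClass.getDeclaredFields.map(_.getName).mkString(", "))
      }
    }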


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org