You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/11/08 10:24:44 UTC
spark git commit: [SPARK-14540][BUILD] Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner (step 0)
Repository: spark
Updated Branches:
refs/heads/master 11eea1a4c -> 51debf8b1
[SPARK-14540][BUILD] Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner (step 0)
## What changes were proposed in this pull request?
Preliminary changes to get ClosureCleaner to work with Scala 2.12. These changes make many, but not all, usages work as-is. This does _not_ resolve the JIRA.
## How was this patch tested?
Existing tests
Author: Sean Owen <so...@cloudera.com>
Closes #19675 from srowen/SPARK-14540.0.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51debf8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51debf8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51debf8b
Branch: refs/heads/master
Commit: 51debf8b1f4d479bc7f81e2759ba28e526367d70
Parents: 11eea1a
Author: Sean Owen <so...@cloudera.com>
Authored: Wed Nov 8 10:24:40 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Nov 8 10:24:40 2017 +0000
----------------------------------------------------------------------
.../org/apache/spark/util/ClosureCleaner.scala | 28 +++++++++++---------
.../spark/util/ClosureCleanerSuite2.scala | 10 ++++---
.../apache/spark/graphx/lib/ShortestPaths.scala | 2 +-
.../spark/streaming/BasicOperationsSuite.scala | 4 +--
4 files changed, 26 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/51debf8b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index dfece5d..4061642 100644
--- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -38,12 +38,13 @@ private[spark] object ClosureCleaner extends Logging {
// Copy data over, before delegating to ClassReader - else we can run out of open file handles.
val className = cls.getName.replaceFirst("^.*\\.", "") + ".class"
val resourceStream = cls.getResourceAsStream(className)
- // todo: Fixme - continuing with earlier behavior ...
- if (resourceStream == null) return new ClassReader(resourceStream)
-
- val baos = new ByteArrayOutputStream(128)
- Utils.copyStream(resourceStream, baos, true)
- new ClassReader(new ByteArrayInputStream(baos.toByteArray))
+ if (resourceStream == null) {
+ null
+ } else {
+ val baos = new ByteArrayOutputStream(128)
+ Utils.copyStream(resourceStream, baos, true)
+ new ClassReader(new ByteArrayInputStream(baos.toByteArray))
+ }
}
// Check whether a class represents a Scala closure
@@ -81,11 +82,13 @@ private[spark] object ClosureCleaner extends Logging {
val stack = Stack[Class[_]](obj.getClass)
while (!stack.isEmpty) {
val cr = getClassReader(stack.pop())
- val set = Set.empty[Class[_]]
- cr.accept(new InnerClosureFinder(set), 0)
- for (cls <- set -- seen) {
- seen += cls
- stack.push(cls)
+ if (cr != null) {
+ val set = Set.empty[Class[_]]
+ cr.accept(new InnerClosureFinder(set), 0)
+ for (cls <- set -- seen) {
+ seen += cls
+ stack.push(cls)
+ }
}
}
(seen - obj.getClass).toList
@@ -366,7 +369,8 @@ private[spark] class ReturnStatementInClosureException
private class ReturnStatementFinder extends ClassVisitor(ASM5) {
override def visitMethod(access: Int, name: String, desc: String,
sig: String, exceptions: Array[String]): MethodVisitor = {
- if (name.contains("apply")) {
+ // $anonfun$ covers Java 8 lambdas
+ if (name.contains("apply") || name.contains("$anonfun$")) {
new MethodVisitor(ASM5) {
override def visitTypeInsn(op: Int, tp: String) {
if (op == NEW && tp.contains("scala/runtime/NonLocalReturnControl")) {
http://git-wip-us.apache.org/repos/asf/spark/blob/51debf8b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
index 934385f..278fada 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala
@@ -117,9 +117,13 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri
findTransitively: Boolean): Map[Class[_], Set[String]] = {
val fields = new mutable.HashMap[Class[_], mutable.Set[String]]
outerClasses.foreach { c => fields(c) = new mutable.HashSet[String] }
- ClosureCleaner.getClassReader(closure.getClass)
- .accept(new FieldAccessFinder(fields, findTransitively), 0)
- fields.mapValues(_.toSet).toMap
+ val cr = ClosureCleaner.getClassReader(closure.getClass)
+ if (cr == null) {
+ Map.empty
+ } else {
+ cr.accept(new FieldAccessFinder(fields, findTransitively), 0)
+ fields.mapValues(_.toSet).toMap
+ }
}
// Accessors for private methods
http://git-wip-us.apache.org/repos/asf/spark/blob/51debf8b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
----------------------------------------------------------------------
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
index 4cac633..aff0b93 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -25,7 +25,7 @@ import org.apache.spark.graphx._
* Computes shortest paths to the given set of landmark vertices, returning a graph where each
* vertex attribute is a map containing the shortest-path distance to each reachable landmark.
*/
-object ShortestPaths {
+object ShortestPaths extends Serializable {
/** Stores a map from the vertex id of a landmark to the distance to that landmark. */
type SPMap = Map[VertexId, Int]
http://git-wip-us.apache.org/repos/asf/spark/blob/51debf8b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
index 6f62c7a..0a764f6 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/BasicOperationsSuite.scala
@@ -596,8 +596,6 @@ class BasicOperationsSuite extends TestSuiteBase {
)
val updateStateOperation = (s: DStream[String]) => {
- class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable
-
// updateFunc clears a state when a StateObject is seen without new values twice in a row
val updateFunc = (values: Seq[Int], state: Option[StateObject]) => {
val stateObj = state.getOrElse(new StateObject)
@@ -817,3 +815,5 @@ class BasicOperationsSuite extends TestSuiteBase {
}
}
}
+
+class StateObject(var counter: Int = 0, var expireCounter: Int = 0) extends Serializable
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org