You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/28 06:17:26 UTC

git commit: [SPARK-1210] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executo...

Repository: spark
Updated Branches:
  refs/heads/master 6f986f0b8 -> 3d89043b7


[SPARK-1210] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executor.

The constructor of `org.apache.spark.executor.Executor` should not set the context class loader of the current thread, which is the backend Actor's thread.

To reproduce the problem, run the following code in the local-mode REPL.

```
scala> case class Foo(i: Int)
scala> val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
```

This causes errors as follows:

```
ERROR actor.OneForOneStrategy: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo;
java.lang.ArrayStoreException: [L$line5.$read$$iwC$$iwC$$iwC$$iwC$Foo;
     at scala.runtime.ScalaRunTime$.array_update(ScalaRunTime.scala:88)
     at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870)
     at org.apache.spark.SparkContext$$anonfun$runJob$3.apply(SparkContext.scala:870)
     at org.apache.spark.scheduler.JobWaiter.taskSucceeded(JobWaiter.scala:56)
     at org.apache.spark.scheduler.DAGScheduler.handleTaskCompletion(DAGScheduler.scala:859)
     at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:616)
     at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

This is because the class loader used to deserialize the resulting `Foo` instances might differ from the backend Actor's, whereas the Actor's class loader should be the same as the Driver's.

Author: Takuya UESHIN <ue...@happy-camper.st>

Closes #15 from ueshin/wip/wrongcontextclassloader and squashes the following commits:

d79e8c0 [Takuya UESHIN] Change a parent class loader of ExecutorURLClassLoader.
c6c09b6 [Takuya UESHIN] Add a test to collect objects of class defined in repl.
43e0feb [Takuya UESHIN] Prevent ContextClassLoader of Actor from becoming ClassLoader of Executor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d89043b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d89043b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d89043b

Branch: refs/heads/master
Commit: 3d89043b7ed13bc1bb703f6eb7c00e46b936de1e
Parents: 6f986f0
Author: Takuya UESHIN <ue...@happy-camper.st>
Authored: Thu Mar 27 22:17:15 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Mar 27 22:17:15 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/executor/Executor.scala  |  5 ++---
 .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 11 +++++++++++
 2 files changed, 13 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d89043b/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 8fe9b84..13e2e29 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -112,11 +112,10 @@ private[spark] class Executor(
     }
   }
 
-  // Create our ClassLoader and set it on this thread
+  // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager
   private val urlClassLoader = createClassLoader()
   private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
-  Thread.currentThread.setContextClassLoader(replClassLoader)
 
   // Akka's message frame size. If task result is bigger than this, we use the block manager
   // to send the result back.
@@ -294,7 +293,7 @@ private[spark] class Executor(
    * created by the interpreter to the search path
    */
   private def createClassLoader(): ExecutorURLClassLoader = {
-    val loader = this.getClass.getClassLoader
+    val loader = Thread.currentThread().getContextClassLoader
 
     // For each of the jars in the jarSet, add them to the class loader.
     // We assume each of the files has already been fetched.

http://git-wip-us.apache.org/repos/asf/spark/blob/3d89043b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
----------------------------------------------------------------------
diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 8203b8f..4155007 100644
--- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -242,4 +242,15 @@ class ReplSuite extends FunSuite {
       assertContains("res4: Array[Int] = Array(0, 0, 0, 0, 0)", output)
     }
   }
+
+  test("collecting objects of class defined in repl") {
+    val output = runInterpreter("local[2]",
+      """
+        |case class Foo(i: Int)
+        |val ret = sc.parallelize((1 to 100).map(Foo), 10).collect
+      """.stripMargin)
+    assertDoesNotContain("error:", output)
+    assertDoesNotContain("Exception", output)
+    assertContains("ret: Array[Foo] = Array(Foo(1),", output)
+  }
 }