You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2017/09/21 17:20:24 UTC

spark git commit: [SPARK-21928][CORE] Set classloader on SerializerManager's private kryo

Repository: spark
Updated Branches:
  refs/heads/master f10cbf17d -> b75bd1777


[SPARK-21928][CORE] Set classloader on SerializerManager's private kryo

## What changes were proposed in this pull request?

We have to make sure that SerializerManager's private instance of
kryo also uses the right classloader, regardless of the current thread
classloader.  In particular, this fixes serde during remote cache
fetches, as those occur in netty threads.

## How was this patch tested?

Manual tests & existing suite via jenkins.  I haven't been able to reproduce this in a unit test, because when a remote RDD partition can't be fetched, there is a warning message and then the partition is just recomputed locally.  I manually verified the warning message is no longer present.

Author: Imran Rashid <ir...@cloudera.com>

Closes #19280 from squito/SPARK-21928_ser_classloader.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b75bd177
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b75bd177
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b75bd177

Branch: refs/heads/master
Commit: b75bd1777496ce0354458bf85603a8087a6a0ff8
Parents: f10cbf1
Author: Imran Rashid <ir...@cloudera.com>
Authored: Thu Sep 21 10:20:19 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Thu Sep 21 10:20:19 2017 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/executor/Executor.scala     | 3 +++
 .../scala/org/apache/spark/serializer/SerializerManager.scala    | 4 ++++
 .../src/test/scala/org/apache/spark/executor/ExecutorSuite.scala | 3 ++-
 3 files changed, 9 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b75bd177/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 3f55d01..2ecbb74 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -131,6 +131,9 @@ private[spark] class Executor(
 
   // Set the classloader for serializer
   env.serializer.setDefaultClassLoader(replClassLoader)
+  // SPARK-21928.  SerializerManager's internal instance of Kryo might get used in netty threads
+  // for fetching remote cached RDD blocks, so need to make sure it uses the right classloader too.
+  env.serializerManager.setDefaultClassLoader(replClassLoader)
 
   // Max size of direct result. If task result is bigger than this, we use the block manager
   // to send the result back.

http://git-wip-us.apache.org/repos/asf/spark/blob/b75bd177/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index bb7ed87..311383e 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -41,6 +41,10 @@ private[spark] class SerializerManager(
 
   private[this] val kryoSerializer = new KryoSerializer(conf)
 
+  def setDefaultClassLoader(classLoader: ClassLoader): Unit = {
+    kryoSerializer.setDefaultClassLoader(classLoader)
+  }
+
   private[this] val stringClassTag: ClassTag[String] = implicitly[ClassTag[String]]
   private[this] val primitiveAndPrimitiveArrayClassTags: Set[ClassTag[_]] = {
     val primitiveClassTags = Set[ClassTag[_]](

http://git-wip-us.apache.org/repos/asf/spark/blob/b75bd177/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
index 884a275..105a178 100644
--- a/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/ExecutorSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.scheduler.{FakeTask, ResultTask, TaskDescription}
-import org.apache.spark.serializer.JavaSerializer
+import org.apache.spark.serializer.{JavaSerializer, SerializerManager}
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.UninterruptibleThread
@@ -234,6 +234,7 @@ class ExecutorSuite extends SparkFunSuite with LocalSparkContext with MockitoSug
     val mockMemoryManager = mock[MemoryManager]
     when(mockEnv.conf).thenReturn(conf)
     when(mockEnv.serializer).thenReturn(serializer)
+    when(mockEnv.serializerManager).thenReturn(mock[SerializerManager])
     when(mockEnv.rpcEnv).thenReturn(mockRpcEnv)
     when(mockEnv.metricsSystem).thenReturn(mockMetricsSystem)
     when(mockEnv.memoryManager).thenReturn(mockMemoryManager)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org