You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2015/10/27 09:31:41 UTC

spark git commit: [SPARK-5569][STREAMING] fix ObjectInputStreamWithLoader for supporting load array classes.

Repository: spark
Updated Branches:
  refs/heads/master 8f888eea1 -> 17f499920


[SPARK-5569][STREAMING] fix ObjectInputStreamWithLoader for supporting load array classes.

When the Kafka DirectStream API is used to create a checkpoint and the saved checkpoint is restored on restart,
a ClassNotFoundException occurs.

The cause of this error is that ObjectInputStreamWithLoader extends ObjectInputStream and overrides its resolveClass method. However, instead of using Class.forName(desc, false, loader), Spark uses loader.loadClass(desc) to load the class, which does not work for array classes.

For example:
Class.forName("[Lorg.apache.spark.streaming.kafka.OffsetRange;", false, loader) works, while loader.loadClass("[Lorg.apache.spark.streaming.kafka.OffsetRange;") throws a ClassNotFoundException.

Details of the difference between Class.forName and ClassLoader.loadClass can be found here:
http://bugs.java.com/view_bug.do?bug_id=6446627

Author: maxwell <ma...@gmail.com>
Author: DEMING ZHU <de...@linecorp.com>

Closes #8955 from maxwellzdm/master.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17f49992
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17f49992
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17f49992

Branch: refs/heads/master
Commit: 17f499920776e0e995434cfa300ff2ff38658fa8
Parents: 8f888ee
Author: maxwell <ma...@gmail.com>
Authored: Tue Oct 27 01:31:28 2015 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Tue Oct 27 01:31:28 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/streaming/Checkpoint.scala |  4 ++-
 .../spark/streaming/CheckpointSuite.scala       | 35 ++++++++++++++++++--
 2 files changed, 36 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/17f49992/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index 8a6050f..b7de6dd 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -352,7 +352,9 @@ class ObjectInputStreamWithLoader(inputStream_ : InputStream, loader: ClassLoade
 
   override def resolveClass(desc: ObjectStreamClass): Class[_] = {
     try {
-      return loader.loadClass(desc.getName())
+      // scalastyle:off classforname
+      return Class.forName(desc.getName(), false, loader)
+      // scalastyle:on classforname
     } catch {
       case e: Exception =>
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/17f49992/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
----------------------------------------------------------------------
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
index a695653..84f5294 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/CheckpointSuite.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.streaming
 
-import java.io.File
+import java.io.{ObjectOutputStream, ByteArrayOutputStream, ByteArrayInputStream, File}
+import org.apache.spark.TestUtils
 
 import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 import scala.reflect.ClassTag
@@ -34,7 +35,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.streaming.dstream.{DStream, FileInputDStream}
 import org.apache.spark.streaming.scheduler.{ConstantEstimator, RateTestInputDStream, RateTestReceiver}
-import org.apache.spark.util.{Clock, ManualClock, Utils}
+import org.apache.spark.util.{MutableURLClassLoader, Clock, ManualClock, Utils}
 
 /**
  * This test suites tests the checkpointing functionality of DStreams -
@@ -579,6 +580,36 @@ class CheckpointSuite extends TestSuiteBase {
     }
   }
 
+  // This tests whether spark can deserialize array object
+  // refer to SPARK-5569
+  test("recovery from checkpoint contains array object") {
+    // create a class which is invisible to app class loader
+    val jar = TestUtils.createJarWithClasses(
+      classNames = Seq("testClz"),
+      toStringValue = "testStringValue"
+      )
+
+    // invisible to current class loader
+    val appClassLoader = getClass.getClassLoader
+    intercept[ClassNotFoundException](appClassLoader.loadClass("testClz"))
+
+    // visible to mutableURLClassLoader
+    val loader = new MutableURLClassLoader(
+      Array(jar), appClassLoader)
+    assert(loader.loadClass("testClz").newInstance().toString == "testStringValue")
+
+    // create and serialize Array[testClz]
+    // scalastyle:off classforname
+    val arrayObj = Class.forName("[LtestClz;", false, loader)
+    // scalastyle:on classforname
+    val bos = new ByteArrayOutputStream()
+    new ObjectOutputStream(bos).writeObject(arrayObj)
+
+    // deserialize the Array[testClz]
+    val ois = new ObjectInputStreamWithLoader(
+      new ByteArrayInputStream(bos.toByteArray), loader)
+    assert(ois.readObject().asInstanceOf[Class[_]].getName == "[LtestClz;")
+  }
 
   /**
    * Tests a streaming operation under checkpointing, by restarting the operation


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org