Posted to commits@spark.apache.org by an...@apache.org on 2015/06/19 19:52:33 UTC
spark git commit: [SPARK-7180] [SPARK-8090] [SPARK-8091] Fix a number of SerializationDebugger bugs and limitations
Repository: spark
Updated Branches:
refs/heads/master a9858036b -> 866816eb9
[SPARK-7180] [SPARK-8090] [SPARK-8091] Fix a number of SerializationDebugger bugs and limitations
This PR solves three SerializationDebugger issues.
* SPARK-7180 - SerializationDebugger fails with ArrayOutOfBoundsException
* SPARK-8090 - SerializationDebugger does not handle classes with writeReplace correctly
* SPARK-8091 - SerializationDebugger does not handle classes with writeObject method
The solutions for each are explained as follows:
* SPARK-7180 - The wrong slot descriptor was used when getting the values of the fields in the object being tested.
* SPARK-8090 - Test the type of the replaced object.
* SPARK-8091 - Use a dummy ObjectOutputStream to collect all the objects written by the writeObject() method, and then test those objects as usual (see the sketch below).
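To make the two cases concrete, here is a condensed, illustrative sketch of the class shapes involved (the names are made up for illustration; the test suite below adds very similar classes):

    class NotSerializable

    // SPARK-8090: writeReplace() swaps in another object at serialization time,
    // so the debugger must re-test whatever object is returned here.
    class WithWriteReplace(@transient replacement: Object) extends Serializable {
      private def writeReplace(): Object = replacement
    }

    // SPARK-8091: writeObject() decides what actually gets written, so the debugger
    // collects everything it writes through a dummy ObjectOutputStream and tests that.
    class WithWriteObject(val objectField: Object) extends Serializable {
      @throws(classOf[java.io.IOException])
      private def writeObject(oos: java.io.ObjectOutputStream): Unit = {
        oos.defaultWriteObject()
      }
    }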
I also added more tests to the test suite to increase code coverage; for example, tests for cases where there are no serializability issues.
Author: Tathagata Das <ta...@gmail.com>
Closes #6625 from tdas/SPARK-7180 and squashes the following commits:
c7cb046 [Tathagata Das] Addressed comments on docs
ae212c8 [Tathagata Das] Improved docs
304c97b [Tathagata Das] Fixed build error
26b5179 [Tathagata Das] more tests.....92% line coverage
7e2fdcf [Tathagata Das] Added more tests
d1967fb [Tathagata Das] Added comments.
da75d34 [Tathagata Das] Removed unnecessary lines.
50a608d [Tathagata Das] Fixed bugs and added support for writeObject
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/866816eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/866816eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/866816eb
Branch: refs/heads/master
Commit: 866816eb97002863ec205d854e1397982aecbc5e
Parents: a985803
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Jun 19 10:52:30 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Jun 19 10:52:30 2015 -0700
----------------------------------------------------------------------
.../serializer/SerializationDebugger.scala | 112 ++++++++++++++++-
.../serializer/SerializationDebuggerSuite.scala | 119 ++++++++++++++++++-
.../spark/streaming/StreamingContext.scala | 4 +-
3 files changed, 223 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/866816eb/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
index bb5db54..cc2f050 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializationDebugger.scala
@@ -17,7 +17,7 @@
package org.apache.spark.serializer
-import java.io.{NotSerializableException, ObjectOutput, ObjectStreamClass, ObjectStreamField}
+import java.io._
import java.lang.reflect.{Field, Method}
import java.security.AccessController
@@ -62,7 +62,7 @@ private[spark] object SerializationDebugger extends Logging {
*
* It does not yet handle writeObject override, but that shouldn't be too hard to do either.
*/
- def find(obj: Any): List[String] = {
+ private[serializer] def find(obj: Any): List[String] = {
new SerializationDebugger().visit(obj, List.empty)
}
@@ -125,6 +125,12 @@ private[spark] object SerializationDebugger extends Logging {
return List.empty
}
+ /**
+ * Visit an externalizable object.
+ * Since writeExternal() can choose to add arbitrary objects at the time of serialization,
+ * the only way to capture all the objects it will serialize is by using a
+ * dummy ObjectOutput that collects all the relevant objects for further testing.
+ */
private def visitExternalizable(o: java.io.Externalizable, stack: List[String]): List[String] =
{
val fieldList = new ListObjectOutput
@@ -145,17 +151,50 @@ private[spark] object SerializationDebugger extends Logging {
// An object contains multiple slots in serialization.
// Get the slots and visit fields in all of them.
val (finalObj, desc) = findObjectAndDescriptor(o)
+
+ // If the object has been replaced using writeReplace(),
+ // then call visit() on the replacement to test its type.
+ if (!finalObj.eq(o)) {
+ return visit(finalObj, s"writeReplace data (class: ${finalObj.getClass.getName})" :: stack)
+ }
+
+ // Every class is associated with one or more "slots"; each slot refers to this class
+ // or one of its parent classes. These slots are used by the ObjectOutputStream
+ // serialization code to recursively serialize the fields of an object and
+ // its parent classes. For example, consider the following classes:
+ //
+ // class ParentClass(parentField: Int)
+ // class ChildClass(childField: Int) extends ParentClass(1)
+ //
+ // Then serializing an object Obj of type ChildClass requires first serializing the fields
+ // of ParentClass (that is, parentField), and then serializing the fields of ChildClass
+ // (that is, childField). Correspondingly, there will be two slots related to this object:
+ //
+ // 1. ParentClass slot, which will be used to serialize parentField of Obj
+ // 2. ChildClass slot, which will be used to serialize the childField field of Obj
+ //
+ // The following code uses the description of each slot to find the fields in the
+ // corresponding object to visit.
+ //
val slotDescs = desc.getSlotDescs
var i = 0
while (i < slotDescs.length) {
val slotDesc = slotDescs(i)
if (slotDesc.hasWriteObjectMethod) {
- // TODO: Handle classes that specify writeObject method.
+ // If the class type corresponding to the current slot has writeObject() defined,
+ // then it's not obvious which fields of the class will be serialized, as writeObject()
+ // can choose arbitrary fields for serialization. This case is handled separately.
+ val elem = s"writeObject data (class: ${slotDesc.getName})"
+ val childStack = visitSerializableWithWriteObjectMethod(finalObj, elem :: stack)
+ if (childStack.nonEmpty) {
+ return childStack
+ }
} else {
+ // Visit all the field objects of the class corresponding to the current slot.
val fields: Array[ObjectStreamField] = slotDesc.getFields
val objFieldValues: Array[Object] = new Array[Object](slotDesc.getNumObjFields)
val numPrims = fields.length - objFieldValues.length
- desc.getObjFieldValues(finalObj, objFieldValues)
+ slotDesc.getObjFieldValues(finalObj, objFieldValues)
var j = 0
while (j < objFieldValues.length) {
@@ -169,18 +208,54 @@ private[spark] object SerializationDebugger extends Logging {
}
j += 1
}
-
}
i += 1
}
return List.empty
}
+
+ /**
+ * Visit a serializable object which has writeObject() defined.
+ * Since writeObject() can choose to add arbitrary objects at the time of serialization,
+ * the only way to capture all the objects it will serialize is by using a
+ * dummy ObjectOutputStream that collects all the relevant fields for further testing.
+ * This is similar to how externalizable objects are visited.
+ */
+ private def visitSerializableWithWriteObjectMethod(
+ o: Object, stack: List[String]): List[String] = {
+ val innerObjectsCatcher = new ListObjectOutputStream
+ var notSerializableFound = false
+ try {
+ innerObjectsCatcher.writeObject(o)
+ } catch {
+ case io: IOException =>
+ notSerializableFound = true
+ }
+
+ // If something was not serializable, then visit the captured objects.
+ // Otherwise, all the captured objects are safely serializable, so no need to visit them.
+ // As an optimization, just add them to the visited list.
+ if (notSerializableFound) {
+ val innerObjects = innerObjectsCatcher.outputArray
+ var k = 0
+ while (k < innerObjects.length) {
+ val childStack = visit(innerObjects(k), stack)
+ if (childStack.nonEmpty) {
+ return childStack
+ }
+ k += 1
+ }
+ } else {
+ visited ++= innerObjectsCatcher.outputArray
+ }
+ return List.empty
+ }
}
/**
* Find the object to serialize and the associated [[ObjectStreamClass]]. This method handles
* writeReplace in Serializable. It starts with the object itself, and keeps calling the
- * writeReplace method until there is no more
+ * writeReplace method until there is no more.
*/
@tailrec
private def findObjectAndDescriptor(o: Object): (Object, ObjectStreamClass) = {
@@ -220,6 +295,31 @@ private[spark] object SerializationDebugger extends Logging {
override def writeByte(i: Int): Unit = {}
}
+ /** An output stream that emulates /dev/null */
+ private class NullOutputStream extends OutputStream {
+ override def write(b: Int) { }
+ }
+
+ /**
+ * A dummy [[ObjectOutputStream]] that saves the list of objects written to it and returns
+ * them through `outputArray`. This works by using the [[ObjectOutputStream]]'s `replaceObject()`
+ * method, which gets called on every object only if replacing is enabled. So this subclass
+ * of [[ObjectOutputStream]] enables replacing, and uses replaceObject() to get the objects that
+ * are being serialized. The serialized bytes are ignored by sending them to a
+ * [[NullOutputStream]], which acts like /dev/null.
+ */
+ private class ListObjectOutputStream extends ObjectOutputStream(new NullOutputStream) {
+ private val output = new mutable.ArrayBuffer[Any]
+ this.enableReplaceObject(true)
+
+ def outputArray: Array[Any] = output.toArray
+
+ override def replaceObject(obj: Object): Object = {
+ output += obj
+ obj
+ }
+ }
+
/** An implicit class that allows us to call private methods of ObjectStreamClass. */
implicit class ObjectStreamClassMethods(val desc: ObjectStreamClass) extends AnyVal {
def getSlotDescs: Array[ObjectStreamClass] = {
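For reference, the capture technique used by ListObjectOutputStream above can be sketched as a standalone snippet (class and member names here are illustrative, not part of the patch): an ObjectOutputStream subclass enables object replacement, records every object handed to replaceObject(), and throws the serialized bytes away.

    import java.io.{ObjectOutputStream, OutputStream}
    import scala.collection.mutable

    // Discards all bytes, like /dev/null.
    class DevNullStream extends OutputStream {
      override def write(b: Int): Unit = {}
    }

    // Records every object the serializer touches via replaceObject(), while the
    // actual serialized bytes are discarded by the underlying DevNullStream.
    class CapturingObjectOutputStream extends ObjectOutputStream(new DevNullStream) {
      private val captured = mutable.ArrayBuffer[Any]()
      enableReplaceObject(true)

      def capturedObjects: Seq[Any] = captured.toSeq

      override def replaceObject(obj: Object): Object = {
        captured += obj
        obj
      }
    }

    // Usage: write an object and inspect everything the serializer pulled in.
    // val out = new CapturingObjectOutputStream
    // out.writeObject(someSerializableValue)
    // out.capturedObjects.foreach(println)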
http://git-wip-us.apache.org/repos/asf/spark/blob/866816eb/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
index 2707bb5..2d5e9d6 100644
--- a/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/SerializationDebuggerSuite.scala
@@ -17,7 +17,7 @@
package org.apache.spark.serializer
-import java.io.{ObjectOutput, ObjectInput}
+import java.io._
import org.scalatest.BeforeAndAfterEach
@@ -98,7 +98,7 @@ class SerializationDebuggerSuite extends SparkFunSuite with BeforeAndAfterEach {
}
test("externalizable class writing out not serializable object") {
- val s = find(new ExternalizableClass)
+ val s = find(new ExternalizableClass(new SerializableClass2(new NotSerializable)))
assert(s.size === 5)
assert(s(0).contains("NotSerializable"))
assert(s(1).contains("objectField"))
@@ -106,6 +106,93 @@ class SerializationDebuggerSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(s(3).contains("writeExternal"))
assert(s(4).contains("ExternalizableClass"))
}
+
+ test("externalizable class writing out serializable objects") {
+ assert(find(new ExternalizableClass(new SerializableClass1)).isEmpty)
+ }
+
+ test("object containing writeReplace() which returns not serializable object") {
+ val s = find(new SerializableClassWithWriteReplace(new NotSerializable))
+ assert(s.size === 3)
+ assert(s(0).contains("NotSerializable"))
+ assert(s(1).contains("writeReplace"))
+ assert(s(2).contains("SerializableClassWithWriteReplace"))
+ }
+
+ test("object containing writeReplace() which returns serializable object") {
+ assert(find(new SerializableClassWithWriteReplace(new SerializableClass1)).isEmpty)
+ }
+
+ test("object containing writeObject() and not serializable field") {
+ val s = find(new SerializableClassWithWriteObject(new NotSerializable))
+ assert(s.size === 3)
+ assert(s(0).contains("NotSerializable"))
+ assert(s(1).contains("writeObject data"))
+ assert(s(2).contains("SerializableClassWithWriteObject"))
+ }
+
+ test("object containing writeObject() and serializable field") {
+ assert(find(new SerializableClassWithWriteObject(new SerializableClass1)).isEmpty)
+ }
+
+ test("object of serializable subclass with more fields than superclass (SPARK-7180)") {
+ // This should not throw ArrayOutOfBoundsException
+ find(new SerializableSubclass(new SerializableClass1))
+ }
+
+ test("crazy nested objects") {
+
+ def findAndAssert(shouldSerialize: Boolean, obj: Any): Unit = {
+ val s = find(obj)
+ if (shouldSerialize) {
+ assert(s.isEmpty)
+ } else {
+ assert(s.nonEmpty)
+ assert(s.head.contains("NotSerializable"))
+ }
+ }
+
+ findAndAssert(false,
+ new SerializableClassWithWriteReplace(new ExternalizableClass(new SerializableSubclass(
+ new SerializableArray(
+ Array(new SerializableClass1, new SerializableClass2(new NotSerializable))
+ )
+ )))
+ )
+
+ findAndAssert(true,
+ new SerializableClassWithWriteReplace(new ExternalizableClass(new SerializableSubclass(
+ new SerializableArray(
+ Array(new SerializableClass1, new SerializableClass2(new SerializableClass1))
+ )
+ )))
+ )
+ }
+
+ test("improveException") {
+ val e = SerializationDebugger.improveException(
+ new SerializableClass2(new NotSerializable), new NotSerializableException("someClass"))
+ assert(e.getMessage.contains("someClass")) // original exception message should be present
+ assert(e.getMessage.contains("SerializableClass2")) // found debug trace should be present
+ }
+
+ test("improveException with error in debugger") {
+ // Object that throws exception in the SerializationDebugger
+ val o = new SerializableClass1 {
+ private def writeReplace(): Object = {
+ throw new Exception()
+ }
+ }
+ withClue("requirement: SerializationDebugger should fail trying to debug this object") {
+ intercept[Exception] {
+ SerializationDebugger.find(o)
+ }
+ }
+
+ val originalException = new NotSerializableException("someClass")
+ // verify that the original exception is returned on failure
+ assert(SerializationDebugger.improveException(o, originalException).eq(originalException))
+ }
}
@@ -118,10 +205,34 @@ class SerializableClass2(val objectField: Object) extends Serializable
class SerializableArray(val arrayField: Array[Object]) extends Serializable
-class ExternalizableClass extends java.io.Externalizable {
+class SerializableSubclass(val objectField: Object) extends SerializableClass1
+
+
+class SerializableClassWithWriteObject(val objectField: Object) extends Serializable {
+ val serializableObjectField = new SerializableClass1
+
+ @throws(classOf[IOException])
+ private def writeObject(oos: ObjectOutputStream): Unit = {
+ oos.defaultWriteObject()
+ }
+}
+
+
+class SerializableClassWithWriteReplace(@transient replacementFieldObject: Object)
+ extends Serializable {
+ private def writeReplace(): Object = {
+ replacementFieldObject
+ }
+}
+
+
+class ExternalizableClass(objectField: Object) extends java.io.Externalizable {
+ val serializableObjectField = new SerializableClass1
+
override def writeExternal(out: ObjectOutput): Unit = {
out.writeInt(1)
- out.writeObject(new SerializableClass2(new NotSerializable))
+ out.writeObject(serializableObjectField)
+ out.writeObject(objectField)
}
override def readExternal(in: ObjectInput): Unit = {}
http://git-wip-us.apache.org/repos/asf/spark/blob/866816eb/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
index 9cd9684..1708f30 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala
@@ -549,8 +549,8 @@ class StreamingContext private[streaming] (
case e: NotSerializableException =>
throw new NotSerializableException(
"DStream checkpointing has been enabled but the DStreams with their functions " +
- "are not serializable\nSerialization stack:\n" +
- SerializationDebugger.find(checkpoint).map("\t- " + _).mkString("\n")
+ "are not serializable\n" +
+ SerializationDebugger.improveException(checkpoint, e).getMessage()
)
}
}
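Outside of streaming, the same wrap-and-rethrow pattern can be sketched roughly as follows. The package, object, and method names are hypothetical, and the code would have to live under the org.apache.spark package, since SerializationDebugger is private[spark]:

    package org.apache.spark.myapp // hypothetical subpackage, needed for private[spark] access

    import java.io.NotSerializableException
    import org.apache.spark.serializer.SerializationDebugger

    object SerializationHelper {
      // Hypothetical wrapper, not part of the patch: run a serialization attempt and,
      // if it fails, rethrow with the debugger's serialization trace added to the message.
      def serializeOrExplain(obj: Any)(attempt: => Unit): Unit = {
        try {
          attempt
        } catch {
          case e: NotSerializableException =>
            throw SerializationDebugger.improveException(obj, e)
        }
      }
    }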