You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/11/10 22:14:50 UTC

spark git commit: [SPARK-19644][SQL] Clean up Scala reflection garbage after creating Encoder (branch-2.2)

Repository: spark
Updated Branches:
  refs/heads/branch-2.2 6b4ec22e3 -> 8b7f72ed3


[SPARK-19644][SQL] Clean up Scala reflection garbage after creating Encoder (branch-2.2)

## What changes were proposed in this pull request?

Backport #19687 to branch-2.2. The major difference is that, in this PR, `cleanUpReflectionObjects` is protected by `ScalaReflectionLock.synchronized` for Scala 2.10.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <zs...@gmail.com>

Closes #19718 from zsxwing/SPARK-19644-2.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b7f72ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b7f72ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b7f72ed

Branch: refs/heads/branch-2.2
Commit: 8b7f72ed37dac0daf5158a7f96b38fb1eab1d676
Parents: 6b4ec22
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Fri Nov 10 14:14:47 2017 -0800
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Fri Nov 10 14:14:47 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/ScalaReflection.scala    | 28 ++++++++++++----
 .../encoders/ExpressionEncoderSuite.scala       | 35 ++++++++++++++++++--
 2 files changed, 54 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8b7f72ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 3b3d566..ad21842 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -62,7 +62,7 @@ object ScalaReflection extends ScalaReflection {
    */
   def dataTypeFor[T : TypeTag]: DataType = dataTypeFor(localTypeOf[T])
 
-  private def dataTypeFor(tpe: `Type`): DataType = ScalaReflectionLock.synchronized {
+  private def dataTypeFor(tpe: `Type`): DataType = cleanUpReflectionObjects {
     tpe match {
       case t if t <:< definitions.IntTpe => IntegerType
       case t if t <:< definitions.LongTpe => LongType
@@ -92,7 +92,7 @@ object ScalaReflection extends ScalaReflection {
    * Array[T].  Special handling is performed for primitive types to map them back to their raw
    * JVM form instead of the Scala Array that handles auto boxing.
    */
-  private def arrayClassFor(tpe: `Type`): ObjectType = ScalaReflectionLock.synchronized {
+  private def arrayClassFor(tpe: `Type`): ObjectType = cleanUpReflectionObjects {
     val cls = tpe match {
       case t if t <:< definitions.IntTpe => classOf[Array[Int]]
       case t if t <:< definitions.LongTpe => classOf[Array[Long]]
@@ -145,7 +145,7 @@ object ScalaReflection extends ScalaReflection {
   private def deserializerFor(
       tpe: `Type`,
       path: Option[Expression],
-      walkedTypePath: Seq[String]): Expression = ScalaReflectionLock.synchronized {
+      walkedTypePath: Seq[String]): Expression = cleanUpReflectionObjects {
 
     /** Returns the current path with a sub-field extracted. */
     def addToPath(part: String, dataType: DataType, walkedTypePath: Seq[String]): Expression = {
@@ -452,7 +452,7 @@ object ScalaReflection extends ScalaReflection {
       inputObject: Expression,
       tpe: `Type`,
       walkedTypePath: Seq[String],
-      seenTypeSet: Set[`Type`] = Set.empty): Expression = ScalaReflectionLock.synchronized {
+      seenTypeSet: Set[`Type`] = Set.empty): Expression = cleanUpReflectionObjects {
 
     def toCatalystArray(input: Expression, elementType: `Type`): Expression = {
       dataTypeFor(elementType) match {
@@ -638,7 +638,7 @@ object ScalaReflection extends ScalaReflection {
    * Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
    * we also treat [[DefinedByConstructorParams]] as product type.
    */
-  def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
+  def optionOfProductType(tpe: `Type`): Boolean = cleanUpReflectionObjects {
     tpe match {
       case t if t <:< localTypeOf[Option[_]] =>
         val TypeRef(_, _, Seq(optType)) = t
@@ -700,7 +700,7 @@ object ScalaReflection extends ScalaReflection {
   def schemaFor[T: TypeTag]: Schema = schemaFor(localTypeOf[T])
 
   /** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
-  def schemaFor(tpe: `Type`): Schema = ScalaReflectionLock.synchronized {
+  def schemaFor(tpe: `Type`): Schema = cleanUpReflectionObjects {
     tpe match {
       case t if t.typeSymbol.annotations.exists(_.tpe =:= typeOf[SQLUserDefinedType]) =>
         val udt = getClassFromType(t).getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance()
@@ -766,7 +766,7 @@ object ScalaReflection extends ScalaReflection {
   /**
    * Whether the fields of the given type is defined entirely by its constructor parameters.
    */
-  def definedByConstructorParams(tpe: Type): Boolean = {
+  def definedByConstructorParams(tpe: Type): Boolean = cleanUpReflectionObjects {
     tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams]
   }
 
@@ -796,6 +796,20 @@ trait ScalaReflection {
   import scala.collection.Map
 
   /**
+   * Any code calling `scala.reflect.api.Types.TypeApi.<:<` should be wrapped by this method to
+   * clean up the Scala reflection garbage automatically. Otherwise, it will leak some objects to
+   * `scala.reflect.runtime.JavaUniverse.undoLog`.
+   *
+   * This method will also wrap `func` with `ScalaReflectionLock.synchronized` so the caller doesn't
+   * need to call it again.
+   *
+   * @see https://github.com/scala/bug/issues/8302
+   */
+  def cleanUpReflectionObjects[T](func: => T): T = ScalaReflectionLock.synchronized {
+    universe.asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.undo(func)
+  }
+
+  /**
    * Return the Scala Type for `T` in the current classloader mirror.
    *
    * Use this method instead of the convenience method `universe.typeOf`, which

http://git-wip-us.apache.org/repos/asf/spark/blob/8b7f72ed/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index bb1955a..e6d09bd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.ClosureCleaner
 
 case class RepeatedStruct(s: Seq[PrimitiveData])
 
@@ -114,7 +115,9 @@ object ReferenceValueClass {
 class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
   OuterScopes.addOuterScope(this)
 
-  implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = ExpressionEncoder()
+  implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = verifyNotLeakingReflectionObjects {
+    ExpressionEncoder()
+  }
 
   // test flat encoders
   encodeDecodeTest(false, "primitive boolean")
@@ -370,8 +373,12 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
   private def encodeDecodeTest[T : ExpressionEncoder](
       input: T,
       testName: String): Unit = {
-    test(s"encode/decode for $testName: $input") {
+    testAndVerifyNotLeakingReflectionObjects(s"encode/decode for $testName: $input") {
       val encoder = implicitly[ExpressionEncoder[T]]
+
+      // Make sure encoder is serializable.
+      ClosureCleaner.clean((s: String) => encoder.getClass.getName)
+
       val row = encoder.toRow(input)
       val schema = encoder.schema.toAttributes
       val boundEncoder = encoder.resolveAndBind()
@@ -441,4 +448,28 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
       }
     }
   }
+
+  /**
+   * Verify the size of scala.reflect.runtime.JavaUniverse.undoLog before and after `func` to
+   * ensure we don't leak Scala reflection garbage.
+   *
+   * @see org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects
+   */
+  private def verifyNotLeakingReflectionObjects[T](func: => T): T = {
+    def undoLogSize: Int = {
+      scala.reflect.runtime.universe
+        .asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.log.size
+    }
+
+    val previousUndoLogSize = undoLogSize
+    val r = func
+    assert(previousUndoLogSize == undoLogSize)
+    r
+  }
+
+  private def testAndVerifyNotLeakingReflectionObjects(testName: String)(testFun: => Any) {
+    test(testName) {
+      verifyNotLeakingReflectionObjects(testFun)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org