You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/11/10 22:14:50 UTC
spark git commit: [SPARK-19644][SQL] Clean up Scala reflection
garbage after creating Encoder (branch-2.2)
Repository: spark
Updated Branches:
refs/heads/branch-2.2 6b4ec22e3 -> 8b7f72ed3
[SPARK-19644][SQL] Clean up Scala reflection garbage after creating Encoder (branch-2.2)
## What changes were proposed in this pull request?
Backport #19687 to branch-2.2. The major difference is `cleanUpReflectionObjects` is protected by `ScalaReflectionLock.synchronized` in this PR for Scala 2.10.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <zs...@gmail.com>
Closes #19718 from zsxwing/SPARK-19644-2.2.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8b7f72ed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8b7f72ed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8b7f72ed
Branch: refs/heads/branch-2.2
Commit: 8b7f72ed37dac0daf5158a7f96b38fb1eab1d676
Parents: 6b4ec22
Author: Shixiong Zhu <zs...@gmail.com>
Authored: Fri Nov 10 14:14:47 2017 -0800
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Fri Nov 10 14:14:47 2017 -0800
----------------------------------------------------------------------
.../spark/sql/catalyst/ScalaReflection.scala | 28 ++++++++++++----
.../encoders/ExpressionEncoderSuite.scala | 35 ++++++++++++++++++--
2 files changed, 54 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8b7f72ed/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 3b3d566..ad21842 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -62,7 +62,7 @@ object ScalaReflection extends ScalaReflection {
*/
def dataTypeFor[T : TypeTag]: DataType = dataTypeFor(localTypeOf[T])
- private def dataTypeFor(tpe: `Type`): DataType = ScalaReflectionLock.synchronized {
+ private def dataTypeFor(tpe: `Type`): DataType = cleanUpReflectionObjects {
tpe match {
case t if t <:< definitions.IntTpe => IntegerType
case t if t <:< definitions.LongTpe => LongType
@@ -92,7 +92,7 @@ object ScalaReflection extends ScalaReflection {
* Array[T]. Special handling is performed for primitive types to map them back to their raw
* JVM form instead of the Scala Array that handles auto boxing.
*/
- private def arrayClassFor(tpe: `Type`): ObjectType = ScalaReflectionLock.synchronized {
+ private def arrayClassFor(tpe: `Type`): ObjectType = cleanUpReflectionObjects {
val cls = tpe match {
case t if t <:< definitions.IntTpe => classOf[Array[Int]]
case t if t <:< definitions.LongTpe => classOf[Array[Long]]
@@ -145,7 +145,7 @@ object ScalaReflection extends ScalaReflection {
private def deserializerFor(
tpe: `Type`,
path: Option[Expression],
- walkedTypePath: Seq[String]): Expression = ScalaReflectionLock.synchronized {
+ walkedTypePath: Seq[String]): Expression = cleanUpReflectionObjects {
/** Returns the current path with a sub-field extracted. */
def addToPath(part: String, dataType: DataType, walkedTypePath: Seq[String]): Expression = {
@@ -452,7 +452,7 @@ object ScalaReflection extends ScalaReflection {
inputObject: Expression,
tpe: `Type`,
walkedTypePath: Seq[String],
- seenTypeSet: Set[`Type`] = Set.empty): Expression = ScalaReflectionLock.synchronized {
+ seenTypeSet: Set[`Type`] = Set.empty): Expression = cleanUpReflectionObjects {
def toCatalystArray(input: Expression, elementType: `Type`): Expression = {
dataTypeFor(elementType) match {
@@ -638,7 +638,7 @@ object ScalaReflection extends ScalaReflection {
* Returns true if the given type is option of product type, e.g. `Option[Tuple2]`. Note that,
* we also treat [[DefinedByConstructorParams]] as product type.
*/
- def optionOfProductType(tpe: `Type`): Boolean = ScalaReflectionLock.synchronized {
+ def optionOfProductType(tpe: `Type`): Boolean = cleanUpReflectionObjects {
tpe match {
case t if t <:< localTypeOf[Option[_]] =>
val TypeRef(_, _, Seq(optType)) = t
@@ -700,7 +700,7 @@ object ScalaReflection extends ScalaReflection {
def schemaFor[T: TypeTag]: Schema = schemaFor(localTypeOf[T])
/** Returns a catalyst DataType and its nullability for the given Scala Type using reflection. */
- def schemaFor(tpe: `Type`): Schema = ScalaReflectionLock.synchronized {
+ def schemaFor(tpe: `Type`): Schema = cleanUpReflectionObjects {
tpe match {
case t if t.typeSymbol.annotations.exists(_.tpe =:= typeOf[SQLUserDefinedType]) =>
val udt = getClassFromType(t).getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance()
@@ -766,7 +766,7 @@ object ScalaReflection extends ScalaReflection {
/**
* Whether the fields of the given type is defined entirely by its constructor parameters.
*/
- def definedByConstructorParams(tpe: Type): Boolean = {
+ def definedByConstructorParams(tpe: Type): Boolean = cleanUpReflectionObjects {
tpe <:< localTypeOf[Product] || tpe <:< localTypeOf[DefinedByConstructorParams]
}
@@ -796,6 +796,20 @@ trait ScalaReflection {
import scala.collection.Map
/**
+ * Any code calling `scala.reflect.api.Types.TypeApi.<:<` should be wrapped by this method to
+ * clean up the Scala reflection garbage automatically. Otherwise, it will leak some objects to
+ * `scala.reflect.runtime.JavaUniverse.undoLog`.
+ *
+ * This method will also wrap `func` with `ScalaReflectionLock.synchronized` so the caller doesn't
+ * need to call it again.
+ *
+ * @see https://github.com/scala/bug/issues/8302
+ */
+ def cleanUpReflectionObjects[T](func: => T): T = ScalaReflectionLock.synchronized {
+ universe.asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.undo(func)
+ }
+
+ /**
* Return the Scala Type for `T` in the current classloader mirror.
*
* Use this method instead of the convenience method `universe.typeOf`, which
http://git-wip-us.apache.org/repos/asf/spark/blob/8b7f72ed/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
index bb1955a..e6d09bd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoderSuite.scala
@@ -34,6 +34,7 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
+import org.apache.spark.util.ClosureCleaner
case class RepeatedStruct(s: Seq[PrimitiveData])
@@ -114,7 +115,9 @@ object ReferenceValueClass {
class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
OuterScopes.addOuterScope(this)
- implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = ExpressionEncoder()
+ implicit def encoder[T : TypeTag]: ExpressionEncoder[T] = verifyNotLeakingReflectionObjects {
+ ExpressionEncoder()
+ }
// test flat encoders
encodeDecodeTest(false, "primitive boolean")
@@ -370,8 +373,12 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
private def encodeDecodeTest[T : ExpressionEncoder](
input: T,
testName: String): Unit = {
- test(s"encode/decode for $testName: $input") {
+ testAndVerifyNotLeakingReflectionObjects(s"encode/decode for $testName: $input") {
val encoder = implicitly[ExpressionEncoder[T]]
+
+ // Make sure encoder is serializable.
+ ClosureCleaner.clean((s: String) => encoder.getClass.getName)
+
val row = encoder.toRow(input)
val schema = encoder.schema.toAttributes
val boundEncoder = encoder.resolveAndBind()
@@ -441,4 +448,28 @@ class ExpressionEncoderSuite extends PlanTest with AnalysisTest {
}
}
}
+
+ /**
+ * Verify the size of scala.reflect.runtime.JavaUniverse.undoLog before and after `func` to
+ * ensure we don't leak Scala reflection garbage.
+ *
+ * @see org.apache.spark.sql.catalyst.ScalaReflection.cleanUpReflectionObjects
+ */
+ private def verifyNotLeakingReflectionObjects[T](func: => T): T = {
+ def undoLogSize: Int = {
+ scala.reflect.runtime.universe
+ .asInstanceOf[scala.reflect.runtime.JavaUniverse].undoLog.log.size
+ }
+
+ val previousUndoLogSize = undoLogSize
+ val r = func
+ assert(previousUndoLogSize == undoLogSize)
+ r
+ }
+
+ private def testAndVerifyNotLeakingReflectionObjects(testName: String)(testFun: => Any) {
+ test(testName) {
+ verifyNotLeakingReflectionObjects(testFun)
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org