You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2016/03/09 11:23:30 UTC

spark git commit: [SPARK-13640][SQL] Synchronize ScalaReflection.mirror method.

Repository: spark
Updated Branches:
  refs/heads/master f3201aeeb -> 2c5af7d4d


[SPARK-13640][SQL] Synchronize ScalaReflection.mirror method.

## What changes were proposed in this pull request?

`ScalaReflection.mirror` method should be synchronized when scala version is `2.10` because `universe.runtimeMirror` is not thread safe.

## How was this patch tested?

I added a test to check thread safety of `ScalaRefection.mirror` method in `ScalaReflectionSuite`, which will throw the following Exception in Scala `2.10` without this patch:

```
[info] - thread safety of mirror *** FAILED *** (49 milliseconds)
[info]   java.lang.UnsupportedOperationException: tail of empty list
[info]   at scala.collection.immutable.Nil$.tail(List.scala:339)
[info]   at scala.collection.immutable.Nil$.tail(List.scala:334)
[info]   at scala.reflect.internal.SymbolTable.popPhase(SymbolTable.scala:172)
[info]   at scala.reflect.internal.Symbols$Symbol.unsafeTypeParams(Symbols.scala:1477)
[info]   at scala.reflect.internal.Symbols$TypeSymbol.tpe(Symbols.scala:2777)
[info]   at scala.reflect.internal.Mirrors$RootsBase.init(Mirrors.scala:235)
[info]   at scala.reflect.runtime.JavaMirrors$class.createMirror(JavaMirrors.scala:34)
[info]   at scala.reflect.runtime.JavaMirrors$class.runtimeMirror(JavaMirrors.scala:61)
[info]   at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:12)
[info]   at scala.reflect.runtime.JavaUniverse.runtimeMirror(JavaUniverse.scala:12)
[info]   at org.apache.spark.sql.catalyst.ScalaReflection$.mirror(ScalaReflection.scala:36)
[info]   at org.apache.spark.sql.catalyst.ScalaReflectionSuite$$anonfun$12$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$apply$2.apply(ScalaReflectionSuite.scala:256)
[info]   at org.apache.spark.sql.catalyst.ScalaReflectionSuite$$anonfun$12$$anonfun$apply$mcV$sp$1$$anonfun$apply$1$$anonfun$apply$2.apply(ScalaReflectionSuite.scala:252)
[info]   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
[info]   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
[info]   at scala.concurrent.impl.ExecutionContextImpl$$anon$3.exec(ExecutionContextImpl.scala:107)
[info]   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
[info]   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
[info]   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
[info]   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
```

Notice that the test will pass when Scala version is `2.11`.

Author: Takuya UESHIN <ue...@happy-camper.st>

Closes #11487 from ueshin/issues/SPARK-13640.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c5af7d4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c5af7d4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c5af7d4

Branch: refs/heads/master
Commit: 2c5af7d4d939e18a749d33b5de2e5113aa3eff08
Parents: f3201ae
Author: Takuya UESHIN <ue...@happy-camper.st>
Authored: Wed Mar 9 10:23:27 2016 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Mar 9 10:23:27 2016 +0000

----------------------------------------------------------------------
 .../spark/sql/catalyst/ScalaReflection.scala    |  7 +++-
 .../sql/catalyst/ScalaReflectionSuite.scala     | 41 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2c5af7d4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 02cb2d9..c12b5c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -32,8 +32,10 @@ object ScalaReflection extends ScalaReflection {
   // Since we are creating a runtime mirror usign the class loader of current thread,
   // we need to use def at here. So, every time we call mirror, it is using the
   // class loader of the current thread.
-  override def mirror: universe.Mirror =
+  // SPARK-13640: Synchronize this because universe.runtimeMirror is not thread-safe in Scala 2.10.
+  override def mirror: universe.Mirror = ScalaReflectionLock.synchronized {
     universe.runtimeMirror(Thread.currentThread().getContextClassLoader)
+  }
 
   import universe._
 
@@ -665,7 +667,8 @@ trait ScalaReflection {
    *
    * @see SPARK-5281
    */
-  def localTypeOf[T: TypeTag]: `Type` = {
+  // SPARK-13640: Synchronize this because TypeTag.tpe is not thread-safe in Scala 2.10.
+  def localTypeOf[T: TypeTag]: `Type` = ScalaReflectionLock.synchronized {
     val tag = implicitly[TypeTag[T]]
     tag.in(mirror).tpe.normalize
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2c5af7d4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 5fe09b1..dd31050 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -17,10 +17,15 @@
 
 package org.apache.spark.sql.catalyst
 
+import java.net.URLClassLoader
 import java.sql.{Date, Timestamp}
 
+import scala.reflect.runtime.universe.typeOf
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.expressions.BoundReference
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 case class PrimitiveData(
     intField: Int,
@@ -236,4 +241,40 @@ class ScalaReflectionSuite extends SparkFunSuite {
     assert(anyTypes.forall(!_.isPrimitive))
     assert(anyTypes === Seq(classOf[java.lang.Object], classOf[java.lang.Object]))
   }
+
+  private val dataTypeForComplexData = dataTypeFor[ComplexData]
+  private val typeOfComplexData = typeOf[ComplexData]
+
+  Seq(
+    ("mirror", () => mirror),
+    ("dataTypeFor", () => dataTypeFor[ComplexData]),
+    ("constructorFor", () => constructorFor[ComplexData]),
+    ("extractorsFor", {
+      val inputObject = BoundReference(0, dataTypeForComplexData, nullable = false)
+      () => extractorsFor[ComplexData](inputObject)
+    }),
+    ("getConstructorParameters(cls)", () => getConstructorParameters(classOf[ComplexData])),
+    ("getConstructorParameterNames", () => getConstructorParameterNames(classOf[ComplexData])),
+    ("getClassFromType", () => getClassFromType(typeOfComplexData)),
+    ("schemaFor", () => schemaFor[ComplexData]),
+    ("localTypeOf", () => localTypeOf[ComplexData]),
+    ("getClassNameFromType", () => getClassNameFromType(typeOfComplexData)),
+    ("getParameterTypes", () => getParameterTypes(() => ())),
+    ("getConstructorParameters(tpe)", () => getClassNameFromType(typeOfComplexData))).foreach {
+      case (name, exec) =>
+        test(s"SPARK-13640: thread safety of ${name}") {
+          (0 until 100).foreach { _ =>
+            val loader = new URLClassLoader(Array.empty, Utils.getContextOrSparkClassLoader)
+            (0 until 10).par.foreach { _ =>
+              val cl = Thread.currentThread.getContextClassLoader
+              try {
+                Thread.currentThread.setContextClassLoader(loader)
+                exec()
+              } finally {
+                Thread.currentThread.setContextClassLoader(cl)
+              }
+            }
+          }
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org