You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/16 02:58:32 UTC

git commit: [SPARK-2498] [SQL] Synchronize on a lock when using scala reflection inside data type objects.

Repository: spark
Updated Branches:
  refs/heads/master 502f90782 -> c2048a516


[SPARK-2498] [SQL] Synchronize on a lock when using scala reflection inside data type objects.

JIRA ticket: https://issues.apache.org/jira/browse/SPARK-2498

Author: Zongheng Yang <zo...@gmail.com>

Closes #1423 from concretevitamin/scala-ref-catalyst and squashes the following commits:

325a149 [Zongheng Yang] Synchronize on a lock when initializing data type objects in Catalyst.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c2048a51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c2048a51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c2048a51

Branch: refs/heads/master
Commit: c2048a5165b270f5baf2003fdfef7bc6c5875715
Parents: 502f907
Author: Zongheng Yang <zo...@gmail.com>
Authored: Tue Jul 15 17:58:28 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Jul 15 17:58:28 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/types/dataTypes.scala    | 34 +++++++++++---------
 1 file changed, 19 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c2048a51/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index bb77bcc..cd4b5e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -19,17 +19,20 @@ package org.apache.spark.sql.catalyst.types
 
 import java.sql.Timestamp
 
-import scala.util.parsing.combinator.RegexParsers
-
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.{typeTag, TypeTag, runtimeMirror}
+import scala.util.parsing.combinator.RegexParsers
 
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
 import org.apache.spark.util.Utils
 
 /**
- *
+ * A JVM-global lock that should be held while calling anything in scala.reflect.*, to prevent
+ * thread-safety issues.  Note that the Scala Reflection API is thread-safe as of 2.11, but not
+ * in 2.10.* builds.  See SI-6240 for more details.
  */
+protected[catalyst] object ScalaReflectionLock
+
 object DataType extends RegexParsers {
   protected lazy val primitiveType: Parser[DataType] =
     "StringType" ^^^ StringType |
@@ -62,7 +65,6 @@ object DataType extends RegexParsers {
     "true" ^^^ true |
     "false" ^^^ false
 
-
   protected lazy val structType: Parser[DataType] =
     "StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
       case fields => new StructType(fields)
@@ -106,7 +108,7 @@ abstract class NativeType extends DataType {
   @transient val tag: TypeTag[JvmType]
   val ordering: Ordering[JvmType]
 
-  @transient val classTag = {
+  @transient val classTag = ScalaReflectionLock.synchronized {
     val mirror = runtimeMirror(Utils.getSparkClassLoader)
     ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
   }
@@ -114,22 +116,24 @@ abstract class NativeType extends DataType {
 
 case object StringType extends NativeType with PrimitiveType {
   type JvmType = String
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   val ordering = implicitly[Ordering[JvmType]]
 }
+
 case object BinaryType extends DataType with PrimitiveType {
   type JvmType = Array[Byte]
 }
+
 case object BooleanType extends NativeType with PrimitiveType {
   type JvmType = Boolean
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   val ordering = implicitly[Ordering[JvmType]]
 }
 
 case object TimestampType extends NativeType {
   type JvmType = Timestamp
 
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
 
   val ordering = new Ordering[JvmType] {
     def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
@@ -159,7 +163,7 @@ abstract class IntegralType extends NumericType {
 
 case object LongType extends IntegralType {
   type JvmType = Long
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   val numeric = implicitly[Numeric[Long]]
   val integral = implicitly[Integral[Long]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -167,7 +171,7 @@ case object LongType extends IntegralType {
 
 case object IntegerType extends IntegralType {
   type JvmType = Int
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   val numeric = implicitly[Numeric[Int]]
   val integral = implicitly[Integral[Int]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -175,7 +179,7 @@ case object IntegerType extends IntegralType {
 
 case object ShortType extends IntegralType {
   type JvmType = Short
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   val numeric = implicitly[Numeric[Short]]
   val integral = implicitly[Integral[Short]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -183,7 +187,7 @@ case object ShortType extends IntegralType {
 
 case object ByteType extends IntegralType {
   type JvmType = Byte
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   val numeric = implicitly[Numeric[Byte]]
   val integral = implicitly[Integral[Byte]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -202,7 +206,7 @@ abstract class FractionalType extends NumericType {
 
 case object DecimalType extends FractionalType {
   type JvmType = BigDecimal
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   val numeric = implicitly[Numeric[BigDecimal]]
   val fractional = implicitly[Fractional[BigDecimal]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -210,7 +214,7 @@ case object DecimalType extends FractionalType {
 
 case object DoubleType extends FractionalType {
   type JvmType = Double
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   val numeric = implicitly[Numeric[Double]]
   val fractional = implicitly[Fractional[Double]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -218,7 +222,7 @@ case object DoubleType extends FractionalType {
 
 case object FloatType extends FractionalType {
   type JvmType = Float
-  @transient lazy val tag = typeTag[JvmType]
+  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
   val numeric = implicitly[Numeric[Float]]
   val fractional = implicitly[Fractional[Float]]
   val ordering = implicitly[Ordering[JvmType]]