You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/01/14 02:16:52 UTC

[4/5] spark git commit: [SPARK-5123][SQL] Reconcile Java/Scala API for data types.

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/decimal/Decimal.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/decimal/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/decimal/Decimal.scala
deleted file mode 100644
index 708362a..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/decimal/Decimal.scala
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.types.decimal
-
-import org.apache.spark.annotation.DeveloperApi
-
-/**
- * A mutable implementation of BigDecimal that can hold a Long if values are small enough.
- *
- * The semantics of the fields are as follows:
- * - _precision and _scale represent the SQL precision and scale we are looking for
- * - If decimalVal is set, it represents the whole decimal value
- * - Otherwise, the decimal value is longVal / (10 ** _scale)
- */
-final class Decimal extends Ordered[Decimal] with Serializable {
-  import Decimal.{MAX_LONG_DIGITS, POW_10, ROUNDING_MODE, BIG_DEC_ZERO}
-
-  // Expanded representation; null while the value is held compactly in longVal.
-  private var decimalVal: BigDecimal = null
-  // Compact representation: the unscaled value, only meaningful while decimalVal is null.
-  private var longVal: Long = 0L
-  private var _precision: Int = 1
-  private var _scale: Int = 0
-
-  /** The SQL precision currently associated with this value. */
-  def precision: Int = _precision
-
-  /** The SQL scale currently associated with this value. */
-  def scale: Int = _scale
-
-  /**
-   * Set this Decimal to the given Long. Will have precision 20 and scale 0.
-   */
-  def set(longVal: Long): Decimal = {
-    if (longVal <= -POW_10(MAX_LONG_DIGITS) || longVal >= POW_10(MAX_LONG_DIGITS)) {
-      // We can't represent this compactly as a long without risking overflow
-      this.decimalVal = BigDecimal(longVal)
-      this.longVal = 0L
-    } else {
-      this.decimalVal = null
-      this.longVal = longVal
-    }
-    this._precision = 20
-    this._scale = 0
-    this
-  }
-
-  /**
-   * Set this Decimal to the given Int. Will have precision 10 and scale 0.
-   */
-  def set(intVal: Int): Decimal = {
-    this.decimalVal = null
-    this.longVal = intVal
-    this._precision = 10
-    this._scale = 0
-    this
-  }
-
-  /**
-   * Set this Decimal to the given unscaled Long, with a given precision and scale.
-   *
-   * @throws IllegalArgumentException if the unscaled value does not fit in the precision
-   */
-  def set(unscaled: Long, precision: Int, scale: Int): Decimal = {
-    if (setOrNull(unscaled, precision, scale) == null) {
-      throw new IllegalArgumentException("Unscaled value too large for precision")
-    }
-    this
-  }
-
-  /**
-   * Set this Decimal to the given unscaled Long, with a given precision and scale,
-   * and return it, or return null if it cannot be set due to overflow.
-   */
-  def setOrNull(unscaled: Long, precision: Int, scale: Int): Decimal = {
-    if (unscaled <= -POW_10(MAX_LONG_DIGITS) || unscaled >= POW_10(MAX_LONG_DIGITS)) {
-      // We can't represent this compactly as a long without risking overflow
-      if (precision < 19) {
-        return null  // Requested precision is too low to represent this value
-      }
-      // Build the expanded value from the argument (not from the stale longVal field) and
-      // keep the requested scale so that the value equals unscaled / (10 ** scale).
-      this.decimalVal = BigDecimal(unscaled, scale)
-      this.longVal = 0L
-    } else {
-      val p = POW_10(math.min(precision, MAX_LONG_DIGITS))
-      if (unscaled <= -p || unscaled >= p) {
-        return null  // Requested precision is too low to represent this value
-      }
-      this.decimalVal = null
-      this.longVal = unscaled
-    }
-    this._precision = precision
-    this._scale = scale
-    this
-  }
-
-  /**
-   * Set this Decimal to the given BigDecimal value, with a given precision and scale.
-   * The value is rounded to `scale` first; if the rounded value needs more digits than
-   * `precision` an IllegalArgumentException is raised and this Decimal is left unchanged.
-   */
-  def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
-    // Validate on a local copy so that a failed require does not leave a half-updated object.
-    val rescaled = decimal.setScale(scale, ROUNDING_MODE)
-    require(rescaled.precision <= precision, "Overflowed precision")
-    this.decimalVal = rescaled
-    this.longVal = 0L
-    this._precision = precision
-    this._scale = scale
-    this
-  }
-
-  /**
-   * Set this Decimal to the given BigDecimal value, inheriting its precision and scale.
-   */
-  def set(decimal: BigDecimal): Decimal = {
-    this.decimalVal = decimal
-    this.longVal = 0L
-    this._precision = decimal.precision
-    this._scale = decimal.scale
-    this
-  }
-
-  /**
-   * Set this Decimal to the given Decimal value.
-   */
-  def set(decimal: Decimal): Decimal = {
-    this.decimalVal = decimal.decimalVal
-    this.longVal = decimal.longVal
-    this._precision = decimal._precision
-    this._scale = decimal._scale
-    this
-  }
-
-  /** Returns the value as a BigDecimal, building one from the compact form if necessary. */
-  def toBigDecimal: BigDecimal = {
-    if (decimalVal.ne(null)) {
-      decimalVal
-    } else {
-      BigDecimal(longVal, _scale)
-    }
-  }
-
-  /** Returns the unscaled value as a Long (narrowed via longValue() in the expanded form). */
-  def toUnscaledLong: Long = {
-    if (decimalVal.ne(null)) {
-      decimalVal.underlying().unscaledValue().longValue()
-    } else {
-      longVal
-    }
-  }
-
-  override def toString: String = toBigDecimal.toString()
-
-  /** Describes which representation is in use together with the value, precision and scale. */
-  @DeveloperApi
-  def toDebugString: String = {
-    if (decimalVal.ne(null)) {
-      s"Decimal(expanded,$decimalVal,$precision,$scale)"
-    } else {
-      s"Decimal(compact,$longVal,$precision,$scale)"
-    }
-  }
-
-  def toDouble: Double = toBigDecimal.doubleValue()
-
-  def toFloat: Float = toBigDecimal.floatValue()
-
-  /** Returns the integral part of the value; the fractional digits are truncated. */
-  def toLong: Long = {
-    if (decimalVal.eq(null)) {
-      longVal / POW_10(_scale)
-    } else {
-      decimalVal.longValue()
-    }
-  }
-
-  def toInt: Int = toLong.toInt
-
-  def toShort: Short = toLong.toShort
-
-  def toByte: Byte = toLong.toByte
-
-  /**
-   * Update precision and scale while keeping our value the same, and return true if successful.
-   * When false is returned this Decimal is left exactly as it was before the call.
-   *
-   * @return true if successful, false if overflow would occur
-   */
-  def changePrecision(precision: Int, scale: Int): Boolean = {
-    // Work on local copies and only commit them on success, so that a failed conversion
-    // cannot leave a partially rescaled value behind.
-    var lv = longVal
-    var dv = decimalVal
-
-    // First, update our long value if we can, or transfer over to using a BigDecimal
-    if (dv.eq(null)) {
-      if (scale < _scale) {
-        // Easier case: we just need to divide our scale down
-        val diff = _scale - scale
-        val droppedDigits = lv % POW_10(diff)
-        lv /= POW_10(diff)
-        if (math.abs(droppedDigits) * 2 >= POW_10(diff)) {
-          // Round half away from zero. The sign must be taken from the dropped digits: the
-          // truncated quotient may already be 0 for a negative input such as -0.5.
-          lv += (if (droppedDigits < 0) -1L else 1L)
-        }
-      } else if (scale > _scale) {
-        // We might be able to multiply the long by a power of 10 and not overflow, but if not,
-        // switch to using a BigDecimal
-        val diff = scale - _scale
-        val p = POW_10(math.max(MAX_LONG_DIGITS - diff, 0))
-        if (diff <= MAX_LONG_DIGITS && lv > -p && lv < p) {
-          // Multiplying by POW_10(diff) will still keep it below MAX_LONG_DIGITS
-          lv *= POW_10(diff)
-        } else {
-          // Give up on using Longs; switch to BigDecimal, which we'll modify below
-          dv = BigDecimal(lv, _scale)
-        }
-      }
-      // In both cases, we will check whether our precision is okay below
-    }
-
-    if (dv.ne(null)) {
-      // We get here if either we started with a BigDecimal, or we switched to one because we would
-      // have overflowed our Long; in either case we must rescale the value to the new scale.
-      val newVal = dv.setScale(scale, ROUNDING_MODE)
-      if (newVal.precision > precision) {
-        return false
-      }
-      dv = newVal
-    } else {
-      // We're still using Longs; check the value against the *requested* precision
-      // (not the precision we had before the call).
-      val p = POW_10(math.min(precision, MAX_LONG_DIGITS))
-      if (lv <= -p || lv >= p) {
-        // Note that we shouldn't have been able to fix this by switching to BigDecimal
-        return false
-      }
-    }
-
-    decimalVal = dv
-    longVal = lv
-    _precision = precision
-    _scale = scale
-    true
-  }
-
-  override def clone(): Decimal = new Decimal().set(this)
-
-  /** Compares numerically; compact values with equal scales are compared without boxing. */
-  override def compare(other: Decimal): Int = {
-    if (decimalVal.eq(null) && other.decimalVal.eq(null) && _scale == other._scale) {
-      if (longVal < other.longVal) -1 else if (longVal == other.longVal) 0 else 1
-    } else {
-      toBigDecimal.compare(other.toBigDecimal)
-    }
-  }
-
-  /** Two Decimals are equal when they compare as numerically equal. */
-  override def equals(other: Any): Boolean = other match {
-    case d: Decimal =>
-      compare(d) == 0
-    case _ =>
-      false
-  }
-
-  override def hashCode(): Int = toBigDecimal.hashCode()
-
-  def isZero: Boolean = if (decimalVal.ne(null)) decimalVal == BIG_DEC_ZERO else longVal == 0
-
-  def + (that: Decimal): Decimal = Decimal(toBigDecimal + that.toBigDecimal)
-
-  def - (that: Decimal): Decimal = Decimal(toBigDecimal - that.toBigDecimal)
-
-  def * (that: Decimal): Decimal = Decimal(toBigDecimal * that.toBigDecimal)
-
-  /** Returns the quotient, or null when `that` is zero. */
-  def / (that: Decimal): Decimal =
-    if (that.isZero) null else Decimal(toBigDecimal / that.toBigDecimal)
-
-  /** Returns the remainder, or null when `that` is zero. */
-  def % (that: Decimal): Decimal =
-    if (that.isZero) null else Decimal(toBigDecimal % that.toBigDecimal)
-
-  def remainder(that: Decimal): Decimal = this % that
-
-  def unary_- : Decimal = {
-    if (decimalVal.ne(null)) {
-      Decimal(-decimalVal)
-    } else {
-      Decimal(-longVal, precision, scale)
-    }
-  }
-}
-
-object Decimal {
-  /** Rounding mode applied whenever a value is rescaled. */
-  private val ROUNDING_MODE = BigDecimal.RoundingMode.HALF_UP
-
-  /** Maximum number of decimal digits a Long can represent */
-  val MAX_LONG_DIGITS = 18
-
-  /** Lookup table of powers of ten: POW_10(i) is 10^i for 0 <= i <= MAX_LONG_DIGITS. */
-  private val POW_10 = Array.iterate(1L, MAX_LONG_DIGITS + 1)(_ * 10L)
-
-  private val BIG_DEC_ZERO = BigDecimal(0)
-
-  // Factory methods: each one allocates a fresh Decimal and forwards to the matching `set`.
-
-  def apply(value: Double): Decimal = {
-    val result = new Decimal()
-    result.set(value)
-  }
-
-  def apply(value: Long): Decimal = {
-    val result = new Decimal()
-    result.set(value)
-  }
-
-  def apply(value: Int): Decimal = {
-    val result = new Decimal()
-    result.set(value)
-  }
-
-  def apply(value: BigDecimal): Decimal = {
-    val result = new Decimal()
-    result.set(value)
-  }
-
-  def apply(value: BigDecimal, precision: Int, scale: Int): Decimal = {
-    val result = new Decimal()
-    result.set(value, precision, scale)
-  }
-
-  def apply(unscaled: Long, precision: Int, scale: Int): Decimal = {
-    val result = new Decimal()
-    result.set(unscaled, precision, scale)
-  }
-
-  /** Parses the string with BigDecimal and wraps the result. */
-  def apply(value: String): Decimal = {
-    val parsed = BigDecimal(value)
-    new Decimal().set(parsed)
-  }
-
-  // Decimal can be used either as a Fractional or as an Integral. Both of those traits define
-  // mkNumericOps, so the shared operations live in a common parent trait and two separate
-  // evidence objects extend it.
-  // See scala.math's Numeric.scala for examples for Scala's built-in types.
-
-  /** Operations shared by both Decimal evidence parameters. */
-  trait DecimalIsConflicted extends Numeric[Decimal] {
-    override def compare(x: Decimal, y: Decimal): Int = x.compare(y)
-    override def fromInt(x: Int): Decimal = new Decimal().set(x)
-    override def negate(x: Decimal): Decimal = -x
-    override def plus(x: Decimal, y: Decimal): Decimal = x + y
-    override def minus(x: Decimal, y: Decimal): Decimal = x - y
-    override def times(x: Decimal, y: Decimal): Decimal = x * y
-    override def toInt(x: Decimal): Int = x.toInt
-    override def toLong(x: Decimal): Long = x.toLong
-    override def toFloat(x: Decimal): Float = x.toFloat
-    override def toDouble(x: Decimal): Double = x.toDouble
-  }
-
-  /** Evidence that lets Decimal be used where a [[scala.math.Fractional]] is required. */
-  object DecimalIsFractional extends DecimalIsConflicted with Fractional[Decimal] {
-    override def div(x: Decimal, y: Decimal): Decimal = x / y
-  }
-
-  /** Evidence that lets Decimal be used where a [[scala.math.Integral]] is required. */
-  object DecimalAsIfIntegral extends DecimalIsConflicted with Integral[Decimal] {
-    override def quot(x: Decimal, y: Decimal): Decimal = x / y
-    override def rem(x: Decimal, y: Decimal): Decimal = x % y
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala
deleted file mode 100644
index de24449..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/package.scala
+++ /dev/null
@@ -1,23 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst
-/**
- * Contains a type system for attributes produced by relations, including complex types like
- * structs, arrays and maps.
- */
-package object types

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
deleted file mode 100755
index 8172733..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/Metadata.scala
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst.util
-
-import scala.collection.mutable
-
-import org.json4s._
-import org.json4s.jackson.JsonMethods._
-
-/**
- * Metadata is a wrapper over Map[String, Any] that limits the value type to simple ones: Boolean,
- * Long, Double, String, Metadata, Array[Boolean], Array[Long], Array[Double], Array[String], and
- * Array[Metadata]. JSON is used for serialization.
- *
- * The default constructor is private. Users should use either [[MetadataBuilder]] or
- * [[Metadata$#fromJson]] to create Metadata instances.
- *
- * @param map an immutable map that stores the data
- */
-sealed class Metadata private[util] (private[util] val map: Map[String, Any]) extends Serializable {
-
-  /** Tests whether this Metadata contains a binding for a key. */
-  def contains(key: String): Boolean = map.contains(key)
-
-  // The typed getters below all go through `get`, which looks the key up in the underlying
-  // map and casts the stored value to the requested type.
-
-  /** Gets a Long. */
-  def getLong(key: String): Long = get(key)
-
-  /** Gets a Double. */
-  def getDouble(key: String): Double = get(key)
-
-  /** Gets a Boolean. */
-  def getBoolean(key: String): Boolean = get(key)
-
-  /** Gets a String. */
-  def getString(key: String): String = get(key)
-
-  /** Gets a Metadata. */
-  def getMetadata(key: String): Metadata = get(key)
-
-  /** Gets a Long array. */
-  def getLongArray(key: String): Array[Long] = get(key)
-
-  /** Gets a Double array. */
-  def getDoubleArray(key: String): Array[Double] = get(key)
-
-  /** Gets a Boolean array. */
-  def getBooleanArray(key: String): Array[Boolean] = get(key)
-
-  /** Gets a String array. */
-  def getStringArray(key: String): Array[String] = get(key)
-
-  /** Gets a Metadata array. */
-  def getMetadataArray(key: String): Array[Metadata] = get(key)
-
-  /** Converts to its JSON representation. */
-  def json: String = compact(render(jsonValue))
-
-  override def toString: String = json
-
-  /**
-   * Two Metadata instances are equal when they have the same key set and every key maps to
-   * equal values. Array values are compared element by element rather than by reference.
-   */
-  override def equals(obj: Any): Boolean = {
-    obj match {
-      case that: Metadata =>
-        map.keySet == that.map.keySet && map.keys.forall { k =>
-          (map(k), that.map(k)) match {
-            case (v0: Array[_], v1: Array[_]) =>
-              // Compare as Seqs: this is element-wise, mirrors what Metadata.hash does with
-              // arrays, and does not rely on how collection views implement equality.
-              v0.toSeq == v1.toSeq
-            case (v0, v1) =>
-              v0 == v1
-          }
-        }
-      case _ =>
-        false
-    }
-  }
-
-  override def hashCode: Int = Metadata.hash(this)
-
-  /** Looks up `key` in the underlying map and casts the value to `T` (unchecked). */
-  private def get[T](key: String): T = {
-    map(key).asInstanceOf[T]
-  }
-
-  private[sql] def jsonValue: JValue = Metadata.toJsonValue(this)
-}
-
-object Metadata {
-
-  /** Returns an empty Metadata. */
-  def empty: Metadata = new Metadata(Map.empty)
-
-  /** Creates a Metadata instance from JSON. The top-level JSON value is cast to a JObject. */
-  def fromJson(json: String): Metadata = {
-    fromJObject(parse(json).asInstanceOf[JObject])
-  }
-
-  /**
-   * Creates a Metadata instance from JSON AST.
-   *
-   * The element type of a non-empty array is decided by its first element only. A field or
-   * array element of any other JSON type results in a RuntimeException.
-   */
-  private[sql] def fromJObject(jObj: JObject): Metadata = {
-    val builder = new MetadataBuilder
-    jObj.obj.foreach {
-      case (key, JInt(value)) =>
-        builder.putLong(key, value.toLong)
-      case (key, JDouble(value)) =>
-        builder.putDouble(key, value)
-      case (key, JBool(value)) =>
-        builder.putBoolean(key, value)
-      case (key, JString(value)) =>
-        builder.putString(key, value)
-      case (key, o: JObject) =>
-        builder.putMetadata(key, fromJObject(o))
-      case (key, JArray(value)) =>
-        if (value.isEmpty) {
-          // If it is an empty array, we cannot infer its element type. We put an empty Array[Long].
-          builder.putLongArray(key, Array.empty)
-        } else {
-          value.head match {
-            case _: JInt =>
-              builder.putLongArray(key, value.asInstanceOf[List[JInt]].map(_.num.toLong).toArray)
-            case _: JDouble =>
-              builder.putDoubleArray(key, value.asInstanceOf[List[JDouble]].map(_.num).toArray)
-            case _: JBool =>
-              builder.putBooleanArray(key, value.asInstanceOf[List[JBool]].map(_.value).toArray)
-            case _: JString =>
-              builder.putStringArray(key, value.asInstanceOf[List[JString]].map(_.s).toArray)
-            case _: JObject =>
-              builder.putMetadataArray(
-                key, value.asInstanceOf[List[JObject]].map(fromJObject).toArray)
-            case other =>
-              throw new RuntimeException(s"Do not support array of type ${other.getClass}.")
-          }
-        }
-      case (_, other) =>
-        // Destructure the field so the message names the unsupported JSON value type instead
-        // of the Tuple2 that wraps every (key, value) pair.
-        throw new RuntimeException(s"Do not support type ${other.getClass}.")
-    }
-    builder.build()
-  }
-
-  /** Converts to JSON AST. Throws a RuntimeException for a value of an unsupported type. */
-  private def toJsonValue(obj: Any): JValue = {
-    obj match {
-      case map: Map[_, _] =>
-        val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) }
-        JObject(fields)
-      case arr: Array[_] =>
-        val values = arr.toList.map(toJsonValue)
-        JArray(values)
-      case x: Long =>
-        JInt(x)
-      case x: Double =>
-        JDouble(x)
-      case x: Boolean =>
-        JBool(x)
-      case x: String =>
-        JString(x)
-      case x: Metadata =>
-        toJsonValue(x.map)
-      case other =>
-        throw new RuntimeException(s"Do not support type ${other.getClass}.")
-    }
-  }
-
-  /** Computes the hash code for the types we support. */
-  private def hash(obj: Any): Int = {
-    obj match {
-      case map: Map[_, _] =>
-        map.mapValues(hash).##
-      case arr: Array[_] =>
-        // Seq.empty[T] has the same hashCode regardless of T.
-        arr.toSeq.map(hash).##
-      case x: Long =>
-        x.##
-      case x: Double =>
-        x.##
-      case x: Boolean =>
-        x.##
-      case x: String =>
-        x.##
-      case x: Metadata =>
-        hash(x.map)
-      case other =>
-        throw new RuntimeException(s"Do not support type ${other.getClass}.")
-    }
-  }
-}
-
-/**
- * Builder for [[Metadata]]. If there is a key collision, the latter will overwrite the former.
- */
-class MetadataBuilder {
-
-  // Entries collected so far; a later put for the same key replaces the earlier value.
-  private val map: mutable.Map[String, Any] = mutable.Map.empty
-
-  /** Returns an immutable snapshot of the entries collected so far. Used for Java interop. */
-  protected def getMap: Map[String, Any] = map.toMap
-
-  /** Copies every entry of an existing [[Metadata]] instance into this builder. */
-  def withMetadata(metadata: Metadata): this.type = {
-    metadata.map.foreach { case (key, value) => map(key) = value }
-    this
-  }
-
-  /** Stores a Long under `key`. */
-  def putLong(key: String, value: Long): this.type =
-    put(key, value)
-
-  /** Stores a Double under `key`. */
-  def putDouble(key: String, value: Double): this.type =
-    put(key, value)
-
-  /** Stores a Boolean under `key`. */
-  def putBoolean(key: String, value: Boolean): this.type =
-    put(key, value)
-
-  /** Stores a String under `key`. */
-  def putString(key: String, value: String): this.type =
-    put(key, value)
-
-  /** Stores a nested [[Metadata]] under `key`. */
-  def putMetadata(key: String, value: Metadata): this.type =
-    put(key, value)
-
-  /** Stores a Long array under `key`. */
-  def putLongArray(key: String, value: Array[Long]): this.type =
-    put(key, value)
-
-  /** Stores a Double array under `key`. */
-  def putDoubleArray(key: String, value: Array[Double]): this.type =
-    put(key, value)
-
-  /** Stores a Boolean array under `key`. */
-  def putBooleanArray(key: String, value: Array[Boolean]): this.type =
-    put(key, value)
-
-  /** Stores a String array under `key`. */
-  def putStringArray(key: String, value: Array[String]): this.type =
-    put(key, value)
-
-  /** Stores a [[Metadata]] array under `key`. */
-  def putMetadataArray(key: String, value: Array[Metadata]): this.type =
-    put(key, value)
-
-  /** Creates a [[Metadata]] from an immutable copy of the current entries. */
-  def build(): Metadata = new Metadata(map.toMap)
-
-  /** Single write path shared by all typed put methods. */
-  private def put(key: String, value: Any): this.type = {
-    map(key) = value
-    this
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala
new file mode 100644
index 0000000..2a8914c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypeConversions.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import java.text.SimpleDateFormat
+
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.types.decimal.Decimal
+
+
+protected[sql] object DataTypeConversions {
+
+  /**
+   * Parses a date/time string.
+   *
+   * Strings without a 'T' are handed to java.sql.Timestamp.valueOf when they contain a space
+   * and to java.sql.Date.valueOf otherwise. Strings with a 'T' are rewritten step by step
+   * (trailing "Z" replaced by "GMT-00:00", "GMT" inserted before the last six characters,
+   * ".0" added when the seconds carry no fraction) and finally parsed with the pattern
+   * yyyy-MM-dd'T'HH:mm:ss.SSSz.
+   */
+  def stringToTime(s: String): java.util.Date = s match {
+    case jdbc if !jdbc.contains('T') =>
+      // JDBC escape string
+      if (jdbc.contains(' ')) java.sql.Timestamp.valueOf(jdbc) else java.sql.Date.valueOf(jdbc)
+
+    case zulu if zulu.endsWith("Z") =>
+      // this is zero timezone of ISO8601
+      val withoutZ = zulu.substring(0, zulu.length - 1)
+      stringToTime(withoutZ + "GMT-00:00")
+
+    case offsetForm if !offsetForm.contains("GMT") =>
+      // timezone with ISO8601: the zone offset is taken to be the last six characters
+      val offsetLength = "+00.00".length
+      val splitAt = offsetForm.length - offsetLength
+      val dateTimePart = offsetForm.substring(0, splitAt)
+      val offsetPart = offsetForm.substring(splitAt, offsetForm.length)
+      val secondsField = dateTimePart.substring(dateTimePart.lastIndexOf(':'))
+      // The pattern used below requires fractional seconds, so add ".0" when they are absent.
+      val separator = if (secondsField.contains('.')) "GMT" else ".0GMT"
+      stringToTime(dateTimePart + separator + offsetPart)
+
+    case gmtForm =>
+      // ISO8601 with GMT insert
+      val iso8601Gmt: SimpleDateFormat = new SimpleDateFormat( "yyyy-MM-dd'T'HH:mm:ss.SSSz" )
+      iso8601Gmt.parse(gmtForm)
+  }
+
+  /**
+   * Converts a Java object to its catalyst representation: values of a user defined type go
+   * through ScalaReflection.convertToCatalyst, java.math.BigDecimal becomes a Decimal, and
+   * anything else is returned unchanged.
+   */
+  def convertJavaToCatalyst(a: Any, dataType: DataType): Any = dataType match {
+    case udt: UserDefinedType[_] =>
+      ScalaReflection.convertToCatalyst(a, udt) // Scala type
+    case _ =>
+      a match {
+        case javaDecimal: java.math.BigDecimal => Decimal(BigDecimal(javaDecimal))
+        case unchanged => unchanged
+      }
+  }
+
+  /**
+   * Converts a catalyst value to a Java object: scala.math.BigDecimal is unwrapped to its
+   * underlying java.math.BigDecimal, and anything else is returned unchanged.
+   */
+  def convertCatalystToJava(a: Any): Any = a match {
+    case scalaDecimal: scala.math.BigDecimal => scalaDecimal.underlying()
+    case unchanged => unchanged
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypes.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypes.java b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypes.java
new file mode 100644
index 0000000..e457542
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataTypes.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types;
+
+import java.util.*;
+
+/**
+ * To get/create specific data type, users should use singleton objects and factory methods
+ * provided by this class.
+ */
+public class DataTypes {
+  /**
+   * Gets the StringType object.
+   */
+  public static final DataType StringType = StringType$.MODULE$;
+
+  /**
+   * Gets the BinaryType object.
+   */
+  public static final DataType BinaryType = BinaryType$.MODULE$;
+
+  /**
+   * Gets the BooleanType object.
+   */
+  public static final DataType BooleanType = BooleanType$.MODULE$;
+
+  /**
+   * Gets the DateType object.
+   */
+  public static final DataType DateType = DateType$.MODULE$;
+
+  /**
+   * Gets the TimestampType object.
+   */
+  public static final DataType TimestampType = TimestampType$.MODULE$;
+
+  /**
+   * Gets the DoubleType object.
+   */
+  public static final DataType DoubleType = DoubleType$.MODULE$;
+
+  /**
+   * Gets the FloatType object.
+   */
+  public static final DataType FloatType = FloatType$.MODULE$;
+
+  /**
+   * Gets the ByteType object.
+   */
+  public static final DataType ByteType = ByteType$.MODULE$;
+
+  /**
+   * Gets the IntegerType object.
+   */
+  public static final DataType IntegerType = IntegerType$.MODULE$;
+
+  /**
+   * Gets the LongType object.
+   */
+  public static final DataType LongType = LongType$.MODULE$;
+
+  /**
+   * Gets the ShortType object.
+   */
+  public static final DataType ShortType = ShortType$.MODULE$;
+
+  /**
+   * Gets the NullType object.
+   */
+  public static final DataType NullType = NullType$.MODULE$;
+
+  /**
+   * Creates an ArrayType by specifying the data type of elements ({@code elementType}).
+   * The field of {@code containsNull} is set to {@code true}.
+   *
+   * @throws IllegalArgumentException if {@code elementType} is null
+   */
+  public static ArrayType createArrayType(DataType elementType) {
+    return createArrayType(elementType, true);
+  }
+
+  /**
+   * Creates an ArrayType by specifying the data type of elements ({@code elementType}) and
+   * whether the array contains null values ({@code containsNull}).
+   *
+   * @throws IllegalArgumentException if {@code elementType} is null
+   */
+  public static ArrayType createArrayType(DataType elementType, boolean containsNull) {
+    if (elementType == null) {
+      throw new IllegalArgumentException("elementType should not be null.");
+    }
+    return new ArrayType(elementType, containsNull);
+  }
+
+  /**
+   * Creates a DecimalType by specifying the precision ({@code precision}) and the scale
+   * ({@code scale}).
+   */
+  public static DecimalType createDecimalType(int precision, int scale) {
+    return DecimalType$.MODULE$.apply(precision, scale);
+  }
+
+  /**
+   * Gets the DecimalType exposed by the Scala API as {@code DecimalType.Unlimited}.
+   */
+  public static DecimalType createDecimalType() {
+    return DecimalType$.MODULE$.Unlimited();
+  }
+
+  /**
+   * Creates a MapType by specifying the data type of keys ({@code keyType}) and values
+   * ({@code valueType}). The field of {@code valueContainsNull} is set to {@code true}.
+   *
+   * @throws IllegalArgumentException if {@code keyType} or {@code valueType} is null
+   */
+  public static MapType createMapType(DataType keyType, DataType valueType) {
+    return createMapType(keyType, valueType, true);
+  }
+
+  /**
+   * Creates a MapType by specifying the data type of keys ({@code keyType}), the data type of
+   * values ({@code valueType}), and whether values contain any null value
+   * ({@code valueContainsNull}).
+   *
+   * @throws IllegalArgumentException if {@code keyType} or {@code valueType} is null
+   */
+  public static MapType createMapType(
+      DataType keyType,
+      DataType valueType,
+      boolean valueContainsNull) {
+    if (keyType == null) {
+      throw new IllegalArgumentException("keyType should not be null.");
+    }
+    if (valueType == null) {
+      throw new IllegalArgumentException("valueType should not be null.");
+    }
+    return new MapType(keyType, valueType, valueContainsNull);
+  }
+
+  /**
+   * Creates a StructField by specifying the name ({@code name}), data type ({@code dataType}),
+   * whether values of this field can be null values ({@code nullable}), and the metadata
+   * attached to the field ({@code metadata}).
+   *
+   * @throws IllegalArgumentException if {@code name}, {@code dataType} or {@code metadata}
+   *         is null
+   */
+  public static StructField createStructField(
+      String name,
+      DataType dataType,
+      boolean nullable,
+      Metadata metadata) {
+    if (name == null) {
+      throw new IllegalArgumentException("name should not be null.");
+    }
+    if (dataType == null) {
+      throw new IllegalArgumentException("dataType should not be null.");
+    }
+    if (metadata == null) {
+      throw new IllegalArgumentException("metadata should not be null.");
+    }
+    return new StructField(name, dataType, nullable, metadata);
+  }
+
+  /**
+   * Creates a StructField with empty metadata.
+   *
+   * @see #createStructField(String, DataType, boolean, Metadata)
+   */
+  public static StructField createStructField(String name, DataType dataType, boolean nullable) {
+    return createStructField(name, dataType, nullable, (new MetadataBuilder()).build());
+  }
+
+  /**
+   * Creates a StructType with the given list of StructFields ({@code fields}).
+   *
+   * @throws IllegalArgumentException if {@code fields} is null, contains a null element, or
+   *         contains two fields with the same name
+   */
+  public static StructType createStructType(List<StructField> fields) {
+    // Reject null here so that callers get the same IllegalArgumentException as the array
+    // overload instead of a bare NullPointerException from toArray.
+    if (fields == null) {
+      throw new IllegalArgumentException("fields should not be null.");
+    }
+    return createStructType(fields.toArray(new StructField[0]));
+  }
+
+  /**
+   * Creates a StructType with the given StructField array ({@code fields}).
+   *
+   * @throws IllegalArgumentException if {@code fields} is null, contains a null element, or
+   *         contains two fields with the same name
+   */
+  public static StructType createStructType(StructField[] fields) {
+    if (fields == null) {
+      throw new IllegalArgumentException("fields should not be null.");
+    }
+    // Collect the names in a set; a size mismatch afterwards means at least one duplicate.
+    Set<String> distinctNames = new HashSet<String>();
+    for (StructField field : fields) {
+      if (field == null) {
+        throw new IllegalArgumentException(
+          "fields should not contain any null.");
+      }
+
+      distinctNames.add(field.name());
+    }
+    if (distinctNames.size() != fields.length) {
+      throw new IllegalArgumentException("fields should have distinct names.");
+    }
+
+    return StructType$.MODULE$.apply(fields);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
new file mode 100755
index 0000000..e50e976
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.collection.mutable
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Metadata is a wrapper over Map[String, Any] that limits the value type to simple ones: Boolean,
+ * Long, Double, String, Metadata, Array[Boolean], Array[Long], Array[Double], Array[String], and
+ * Array[Metadata]. JSON is used for serialization.
+ *
+ * The default constructor is private. Users should use either [[MetadataBuilder]] or
+ * [[Metadata.fromJson()]] to create Metadata instances.
+ *
+ * @param map an immutable map that stores the data
+ */
+@DeveloperApi
+sealed class Metadata private[types] (private[types] val map: Map[String, Any])
+  extends Serializable {
+
+  /** Tests whether this Metadata contains a binding for a key. */
+  def contains(key: String): Boolean = map.contains(key)
+
+  /** Gets a Long. */
+  def getLong(key: String): Long = get(key)
+
+  /** Gets a Double. */
+  def getDouble(key: String): Double = get(key)
+
+  /** Gets a Boolean. */
+  def getBoolean(key: String): Boolean = get(key)
+
+  /** Gets a String. */
+  def getString(key: String): String = get(key)
+
+  /** Gets a Metadata. */
+  def getMetadata(key: String): Metadata = get(key)
+
+  /** Gets a Long array. */
+  def getLongArray(key: String): Array[Long] = get(key)
+
+  /** Gets a Double array. */
+  def getDoubleArray(key: String): Array[Double] = get(key)
+
+  /** Gets a Boolean array. */
+  def getBooleanArray(key: String): Array[Boolean] = get(key)
+
+  /** Gets a String array. */
+  def getStringArray(key: String): Array[String] = get(key)
+
+  /** Gets a Metadata array. */
+  def getMetadataArray(key: String): Array[Metadata] = get(key)
+
+  /** Converts to its JSON representation. */
+  def json: String = compact(render(jsonValue))
+
+  override def toString: String = json
+
+  override def equals(obj: Any): Boolean = {
+    obj match {
+      case that: Metadata =>
+        if (map.keySet == that.map.keySet) {
+          map.keys.forall { k =>
+            (map(k), that.map(k)) match {
+              case (v0: Array[_], v1: Array[_]) =>
+                v0.view == v1.view
+              case (v0, v1) =>
+                v0 == v1
+            }
+          }
+        } else {
+          false
+        }
+      case other =>
+        false
+    }
+  }
+
+  override def hashCode: Int = Metadata.hash(this)
+
+  private def get[T](key: String): T = {
+    map(key).asInstanceOf[T]
+  }
+
+  private[sql] def jsonValue: JValue = Metadata.toJsonValue(this)
+}
+
+object Metadata {
+
+  /** Returns an empty Metadata. */
+  def empty: Metadata = new Metadata(Map.empty)
+
+  /** Creates a Metadata instance from JSON. */
+  def fromJson(json: String): Metadata = {
+    fromJObject(parse(json).asInstanceOf[JObject])
+  }
+
+  /** Creates a Metadata instance from JSON AST. */
+  private[sql] def fromJObject(jObj: JObject): Metadata = {
+    val builder = new MetadataBuilder
+    jObj.obj.foreach {
+      case (key, JInt(value)) =>
+        builder.putLong(key, value.toLong)
+      case (key, JDouble(value)) =>
+        builder.putDouble(key, value)
+      case (key, JBool(value)) =>
+        builder.putBoolean(key, value)
+      case (key, JString(value)) =>
+        builder.putString(key, value)
+      case (key, o: JObject) =>
+        builder.putMetadata(key, fromJObject(o))
+      case (key, JArray(value)) =>
+        if (value.isEmpty) {
+          // If it is an empty array, we cannot infer its element type. We put an empty Array[Long].
+          builder.putLongArray(key, Array.empty)
+        } else {
+          value.head match {
+            case _: JInt =>
+              builder.putLongArray(key, value.asInstanceOf[List[JInt]].map(_.num.toLong).toArray)
+            case _: JDouble =>
+              builder.putDoubleArray(key, value.asInstanceOf[List[JDouble]].map(_.num).toArray)
+            case _: JBool =>
+              builder.putBooleanArray(key, value.asInstanceOf[List[JBool]].map(_.value).toArray)
+            case _: JString =>
+              builder.putStringArray(key, value.asInstanceOf[List[JString]].map(_.s).toArray)
+            case _: JObject =>
+              builder.putMetadataArray(
+                key, value.asInstanceOf[List[JObject]].map(fromJObject).toArray)
+            case other =>
+              throw new RuntimeException(s"Do not support array of type ${other.getClass}.")
+          }
+        }
+      case other =>
+        throw new RuntimeException(s"Do not support type ${other.getClass}.")
+    }
+    builder.build()
+  }
+
+  /** Converts to JSON AST. */
+  private def toJsonValue(obj: Any): JValue = {
+    obj match {
+      case map: Map[_, _] =>
+        val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) }
+        JObject(fields)
+      case arr: Array[_] =>
+        val values = arr.toList.map(toJsonValue)
+        JArray(values)
+      case x: Long =>
+        JInt(x)
+      case x: Double =>
+        JDouble(x)
+      case x: Boolean =>
+        JBool(x)
+      case x: String =>
+        JString(x)
+      case x: Metadata =>
+        toJsonValue(x.map)
+      case other =>
+        throw new RuntimeException(s"Do not support type ${other.getClass}.")
+    }
+  }
+
+  /** Computes the hash code for the types we support. */
+  private def hash(obj: Any): Int = {
+    obj match {
+      case map: Map[_, _] =>
+        map.mapValues(hash).##
+      case arr: Array[_] =>
+        // Seq.empty[T] has the same hashCode regardless of T.
+        arr.toSeq.map(hash).##
+      case x: Long =>
+        x.##
+      case x: Double =>
+        x.##
+      case x: Boolean =>
+        x.##
+      case x: String =>
+        x.##
+      case x: Metadata =>
+        hash(x.map)
+      case other =>
+        throw new RuntimeException(s"Do not support type ${other.getClass}.")
+    }
+  }
+}
+
+/**
+ * :: DeveloperApi ::
+ *
+ * Builder for [[Metadata]]. If there is a key collision, the latter will overwrite the former.
+ */
+@DeveloperApi
+class MetadataBuilder {
+
+  private val map: mutable.Map[String, Any] = mutable.Map.empty
+
+  /** Returns the immutable version of this map.  Used for java interop. */
+  protected def getMap = map.toMap
+
+  /** Include the content of an existing [[Metadata]] instance. */
+  def withMetadata(metadata: Metadata): this.type = {
+    map ++= metadata.map
+    this
+  }
+
+  /** Puts a Long. */
+  def putLong(key: String, value: Long): this.type = put(key, value)
+
+  /** Puts a Double. */
+  def putDouble(key: String, value: Double): this.type = put(key, value)
+
+  /** Puts a Boolean. */
+  def putBoolean(key: String, value: Boolean): this.type = put(key, value)
+
+  /** Puts a String. */
+  def putString(key: String, value: String): this.type = put(key, value)
+
+  /** Puts a [[Metadata]]. */
+  def putMetadata(key: String, value: Metadata): this.type = put(key, value)
+
+  /** Puts a Long array. */
+  def putLongArray(key: String, value: Array[Long]): this.type = put(key, value)
+
+  /** Puts a Double array. */
+  def putDoubleArray(key: String, value: Array[Double]): this.type = put(key, value)
+
+  /** Puts a Boolean array. */
+  def putBooleanArray(key: String, value: Array[Boolean]): this.type = put(key, value)
+
+  /** Puts a String array. */
+  def putStringArray(key: String, value: Array[String]): this.type = put(key, value)
+
+  /** Puts a [[Metadata]] array. */
+  def putMetadataArray(key: String, value: Array[Metadata]): this.type = put(key, value)
+
+  /** Builds the [[Metadata]] instance. */
+  def build(): Metadata = {
+    new Metadata(map.toMap)
+  }
+
+  private def put(key: String, value: Any): this.type = {
+    map.put(key, value)
+    this
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/types/SQLUserDefinedType.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/SQLUserDefinedType.java b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/SQLUserDefinedType.java
new file mode 100644
index 0000000..a64d2bb
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/SQLUserDefinedType.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types;
+
+import java.lang.annotation.*;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * ::DeveloperApi::
+ * A user-defined type which can be automatically recognized by a SQLContext and registered.
+ *
+ * WARNING: This annotation will only work if both Java and Scala reflection return the same class
+ *          names (after erasure) for the UDT.  This will NOT be the case when, e.g., the UDT class
+ *          is enclosed in an object (a singleton).
+ *
+ * WARNING: UDTs are currently only supported from Scala.
+ */
+// TODO: Should I use @Documented?
+@DeveloperApi
+@Retention(RetentionPolicy.RUNTIME)
+@Target(ElementType.TYPE)
+public @interface SQLUserDefinedType {
+
+  /**
+   * Returns an instance of the UserDefinedType which can serialize and deserialize the user
+   * class to and from Catalyst built-in types.
+   */
+  Class<? extends UserDefinedType<?> > udt();
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
new file mode 100644
index 0000000..fa0a355
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/dataTypes.scala
@@ -0,0 +1,900 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import java.sql.{Date, Timestamp}
+
+import scala.math.Numeric.{FloatAsIfIntegral, DoubleAsIfIntegral}
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.{TypeTag, runtimeMirror, typeTag}
+import scala.util.parsing.combinator.RegexParsers
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
+import org.apache.spark.sql.types.decimal._
+import org.apache.spark.util.Utils
+
+
+object DataType {
+  def fromJson(json: String): DataType = parseDataType(parse(json))
+
+  private object JSortedObject {
+    def unapplySeq(value: JValue): Option[List[(String, JValue)]] = value match {
+      case JObject(seq) => Some(seq.toList.sortBy(_._1))
+      case _ => None
+    }
+  }
+
+  // NOTE: Map fields must be sorted in alphabetical order to keep consistent with the Python side.
+  private def parseDataType(json: JValue): DataType = json match {
+    case JString(name) =>
+      PrimitiveType.nameToType(name)
+
+    case JSortedObject(
+        ("containsNull", JBool(n)),
+        ("elementType", t: JValue),
+        ("type", JString("array"))) =>
+      ArrayType(parseDataType(t), n)
+
+    case JSortedObject(
+        ("keyType", k: JValue),
+        ("type", JString("map")),
+        ("valueContainsNull", JBool(n)),
+        ("valueType", v: JValue)) =>
+      MapType(parseDataType(k), parseDataType(v), n)
+
+    case JSortedObject(
+        ("fields", JArray(fields)),
+        ("type", JString("struct"))) =>
+      StructType(fields.map(parseStructField))
+
+    case JSortedObject(
+        ("class", JString(udtClass)),
+        ("pyClass", _),
+        ("sqlType", _),
+        ("type", JString("udt"))) =>
+      Class.forName(udtClass).newInstance().asInstanceOf[UserDefinedType[_]]
+  }
+
+  private def parseStructField(json: JValue): StructField = json match {
+    case JSortedObject(
+        ("metadata", metadata: JObject),
+        ("name", JString(name)),
+        ("nullable", JBool(nullable)),
+        ("type", dataType: JValue)) =>
+      StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata))
+    // Support reading schema when 'metadata' is missing.
+    case JSortedObject(
+        ("name", JString(name)),
+        ("nullable", JBool(nullable)),
+        ("type", dataType: JValue)) =>
+      StructField(name, parseDataType(dataType), nullable)
+  }
+
+  @deprecated("Use DataType.fromJson instead", "1.2.0")
+  def fromCaseClassString(string: String): DataType = CaseClassStringParser(string)
+
+  private object CaseClassStringParser extends RegexParsers {
+    protected lazy val primitiveType: Parser[DataType] =
+      ( "StringType" ^^^ StringType
+      | "FloatType" ^^^ FloatType
+      | "IntegerType" ^^^ IntegerType
+      | "ByteType" ^^^ ByteType
+      | "ShortType" ^^^ ShortType
+      | "DoubleType" ^^^ DoubleType
+      | "LongType" ^^^ LongType
+      | "BinaryType" ^^^ BinaryType
+      | "BooleanType" ^^^ BooleanType
+      | "DateType" ^^^ DateType
+      | "DecimalType()" ^^^ DecimalType.Unlimited
+      | fixedDecimalType
+      | "TimestampType" ^^^ TimestampType
+      )
+
+    protected lazy val fixedDecimalType: Parser[DataType] =
+      ("DecimalType(" ~> "[0-9]+".r) ~ ("," ~> "[0-9]+".r <~ ")") ^^ {
+        case precision ~ scale => DecimalType(precision.toInt, scale.toInt)
+      }
+
+    protected lazy val arrayType: Parser[DataType] =
+      "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ {
+        case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull)
+      }
+
+    protected lazy val mapType: Parser[DataType] =
+      "MapType" ~> "(" ~> dataType ~ "," ~ dataType ~ "," ~ boolVal <~ ")" ^^ {
+        case t1 ~ _ ~ t2 ~ _ ~ valueContainsNull => MapType(t1, t2, valueContainsNull)
+      }
+
+    protected lazy val structField: Parser[StructField] =
+      ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
+        case name ~ tpe ~ nullable  =>
+          StructField(name, tpe, nullable = nullable)
+      }
+
+    protected lazy val boolVal: Parser[Boolean] =
+      ( "true" ^^^ true
+      | "false" ^^^ false
+      )
+
+    // Parses "StructType(<Collection>(field,...))", where <Collection> is the ASCII-letter
+    // name printed by the field sequence's toString (e.g. "List" or "ArrayBuffer").
+    protected lazy val structType: Parser[DataType] =
+      "StructType\\([A-Za-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ (StructType(_))
+
+    protected lazy val dataType: Parser[DataType] =
+      ( arrayType
+      | mapType
+      | structType
+      | primitiveType
+      )
+
+    /**
+     * Parses a string representation of a DataType.
+     *
+     * TODO: Generate parser as pickler...
+     */
+    def apply(asString: String): DataType = parseAll(dataType, asString) match {
+      case Success(result, _) => result
+      case failure: NoSuccess =>
+        throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
+    }
+
+  }
+
+  protected[types] def buildFormattedString(
+      dataType: DataType,
+      prefix: String,
+      builder: StringBuilder): Unit = {
+    dataType match {
+      case array: ArrayType =>
+        array.buildFormattedString(prefix, builder)
+      case struct: StructType =>
+        struct.buildFormattedString(prefix, builder)
+      case map: MapType =>
+        map.buildFormattedString(prefix, builder)
+      case _ =>
+    }
+  }
+
+  /**
+   * Compares two types, ignoring nullability of ArrayType, MapType, StructType.
+   */
+  private[sql] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
+    (left, right) match {
+      // Containers recurse into their component types and never look at the null flags.
+      case (ArrayType(lElem, _), ArrayType(rElem, _)) =>
+        equalsIgnoreNullability(lElem, rElem)
+      case (MapType(lKey, lValue, _), MapType(rKey, rValue, _)) =>
+        equalsIgnoreNullability(lKey, rKey) && equalsIgnoreNullability(lValue, rValue)
+      case (StructType(lFields), StructType(rFields)) =>
+        // Fields are paired positionally; only the names and data types have to agree.
+        lFields.size == rFields.size &&
+          lFields.zip(rFields).forall { case (l, r) =>
+            l.name == r.name && equalsIgnoreNullability(l.dataType, r.dataType)
+          }
+      // Every other combination falls back to plain equality.
+      case (l, r) => l == r
+    }
+  }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The base type of all Spark SQL data types.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+abstract class DataType {
+  /** Matches any expression that evaluates to this DataType */
+  def unapply(a: Expression): Boolean = a match {
+    case e: Expression if e.dataType == this => true
+    case _ => false
+  }
+
+  def isPrimitive: Boolean = false
+
+  /** Simple class name without "$" and its last 4 chars, lower-cased independent of locale. */
+  def typeName: String =
+    this.getClass.getSimpleName.stripSuffix("$").dropRight(4).toLowerCase(java.util.Locale.ROOT)
+
+  private[sql] def jsonValue: JValue = typeName
+  def json: String = compact(render(jsonValue))
+  def prettyJson: String = pretty(render(jsonValue))
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `NULL` values. Please use the singleton [[DataTypes.NullType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object NullType extends DataType
+
+
+object NativeType {
+  val all = Seq(
+    IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
+
+  def unapply(dt: DataType): Boolean = all.contains(dt)
+
+  val defaultSizeOf: Map[NativeType, Int] = Map(
+    IntegerType -> 4,
+    BooleanType -> 1,
+    LongType -> 8,
+    DoubleType -> 8,
+    FloatType -> 4,
+    ShortType -> 2,
+    ByteType -> 1,
+    StringType -> 4096)
+}
+
+
+trait PrimitiveType extends DataType {
+  override def isPrimitive = true
+}
+
+
+object PrimitiveType {
+  private val nonDecimals = Seq(NullType, DateType, TimestampType, BinaryType) ++ NativeType.all
+  private val nonDecimalNameToType = nonDecimals.map(t => t.typeName -> t).toMap
+
+  /** Given the string representation of a type, return its DataType */
+  private[sql] def nameToType(name: String): DataType = {
+    val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\d+)\s*\)""".r
+    name match {
+      case "decimal" => DecimalType.Unlimited
+      case FIXED_DECIMAL(precision, scale) => DecimalType(precision.toInt, scale.toInt)
+      case other => nonDecimalNameToType(other)
+    }
+  }
+}
+
+abstract class NativeType extends DataType {
+  private[sql] type JvmType
+  @transient private[sql] val tag: TypeTag[JvmType]
+  private[sql] val ordering: Ordering[JvmType]
+
+  @transient private[sql] val classTag = ScalaReflectionLock.synchronized {
+    val mirror = runtimeMirror(Utils.getSparkClassLoader)
+    ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
+  }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `String` values. Please use the singleton [[DataTypes.StringType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object StringType extends NativeType with PrimitiveType {
+  private[sql] type JvmType = String
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Array[Byte]` values.
+ * Please use the singleton [[DataTypes.BinaryType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object BinaryType extends NativeType with PrimitiveType {
+  private[sql] type JvmType = Array[Byte]
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val ordering = new Ordering[JvmType] {
+    def compare(x: Array[Byte], y: Array[Byte]): Int = {
+      // Signed byte-wise compare of the common prefix; if it ties, the shorter array sorts first.
+      val limit = math.min(x.length, y.length)
+      var i = 0
+      while (i < limit && x(i) == y(i)) { i += 1 }
+      if (i < limit) x(i).compareTo(y(i)) else x.length - y.length
+    }
+  }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Boolean` values. Please use the singleton [[DataTypes.BooleanType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object BooleanType extends NativeType with PrimitiveType {
+  private[sql] type JvmType = Boolean
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `java.sql.Timestamp` values.
+ * Please use the singleton [[DataTypes.TimestampType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object TimestampType extends NativeType {
+  private[sql] type JvmType = Timestamp
+
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+
+  private[sql] val ordering = new Ordering[JvmType] {
+    def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
+  }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `java.sql.Date` values.
+ * Please use the singleton [[DataTypes.DateType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object DateType extends NativeType {
+  private[sql] type JvmType = Date
+
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+
+  private[sql] val ordering = new Ordering[JvmType] {
+    def compare(x: Date, y: Date) = x.compareTo(y)
+  }
+}
+
+
+abstract class NumericType extends NativeType with PrimitiveType {
+  // Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
+  // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
+  // type parameter and add a Numeric context bound (i.e., [JvmType : Numeric]). This gets
+  // desugared by the compiler into an argument to the object's constructor. This means there is
+  // no longer a no-argument constructor and thus the JVM cannot serialize the object anymore.
+  private[sql] val numeric: Numeric[JvmType]
+}
+
+
+object NumericType {
+  def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[NumericType]
+}
+
+
+/** Matcher for any expressions that evaluate to [[IntegralType]]s */
+object IntegralType {
+  def unapply(a: Expression): Boolean = a match {
+    case e: Expression if e.dataType.isInstanceOf[IntegralType] => true
+    case _ => false
+  }
+}
+
+
+sealed abstract class IntegralType extends NumericType {
+  private[sql] val integral: Integral[JvmType]
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Long` values. Please use the singleton [[DataTypes.LongType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object LongType extends IntegralType {
+  private[sql] type JvmType = Long
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Long]]
+  private[sql] val integral = implicitly[Integral[Long]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Int` values. Please use the singleton [[DataTypes.IntegerType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object IntegerType extends IntegralType {
+  private[sql] type JvmType = Int
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Int]]
+  private[sql] val integral = implicitly[Integral[Int]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Short` values. Please use the singleton [[DataTypes.ShortType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object ShortType extends IntegralType {
+  private[sql] type JvmType = Short
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Short]]
+  private[sql] val integral = implicitly[Integral[Short]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Byte` values. Please use the singleton [[DataTypes.ByteType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object ByteType extends IntegralType {
+  private[sql] type JvmType = Byte
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Byte]]
+  private[sql] val integral = implicitly[Integral[Byte]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+}
+
+
+/** Matcher for any expressions that evaluate to [[FractionalType]]s */
+object FractionalType {
+  def unapply(a: Expression): Boolean = a match {
+    case e: Expression if e.dataType.isInstanceOf[FractionalType] => true
+    case _ => false
+  }
+}
+
+
+sealed abstract class FractionalType extends NumericType {
+  private[sql] val fractional: Fractional[JvmType]
+  private[sql] val asIntegral: Integral[JvmType]
+}
+
+
+/** Precision parameters for a Decimal */
+case class PrecisionInfo(precision: Int, scale: Int)
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `scala.math.BigDecimal` values.
+ * A Decimal that might have fixed precision and scale, or unlimited values for these.
+ *
+ * Please use [[DataTypes.createDecimalType()]] to create a specific instance.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
+  private[sql] type JvmType = Decimal
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = Decimal.DecimalIsFractional
+  private[sql] val fractional = Decimal.DecimalIsFractional
+  private[sql] val ordering = Decimal.DecimalIsFractional
+  private[sql] val asIntegral = Decimal.DecimalAsIfIntegral
+
+  def precision: Int = precisionInfo.map(_.precision).getOrElse(-1)
+
+  def scale: Int = precisionInfo.map(_.scale).getOrElse(-1)
+
+  override def typeName: String = precisionInfo match {
+    case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+    case None => "decimal"
+  }
+
+  override def toString: String = precisionInfo match {
+    case Some(PrecisionInfo(precision, scale)) => s"DecimalType($precision,$scale)"
+    case None => "DecimalType()"
+  }
+}
+
+
+/** Extra factory methods and pattern matchers for Decimals */
+object DecimalType {
+  val Unlimited: DecimalType = DecimalType(None)
+
+  object Fixed {
+    def unapply(t: DecimalType): Option[(Int, Int)] =
+      t.precisionInfo.map(p => (p.precision, p.scale))
+  }
+
+  object Expression {
+    def unapply(e: Expression): Option[(Int, Int)] = e.dataType match {
+      case t: DecimalType => t.precisionInfo.map(p => (p.precision, p.scale))
+      case _ => None
+    }
+  }
+
+  def apply(): DecimalType = Unlimited
+
+  def apply(precision: Int, scale: Int): DecimalType =
+    DecimalType(Some(PrecisionInfo(precision, scale)))
+
+  def unapply(t: DataType): Boolean = t.isInstanceOf[DecimalType]
+
+  def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[DecimalType]
+
+  def isFixed(dataType: DataType): Boolean = dataType match {
+    case DecimalType.Fixed(_, _) => true
+    case _ => false
+  }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Double` values. Please use the singleton [[DataTypes.DoubleType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object DoubleType extends FractionalType {
+  private[sql] type JvmType = Double
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Double]]
+  private[sql] val fractional = implicitly[Fractional[Double]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  private[sql] val asIntegral = DoubleAsIfIntegral
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type representing `Float` values. Please use the singleton [[DataTypes.FloatType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case object FloatType extends FractionalType {
+  private[sql] type JvmType = Float
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Float]]
+  private[sql] val fractional = implicitly[Fractional[Float]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  private[sql] val asIntegral = FloatAsIfIntegral
+}
+
+
+object ArrayType {
+  /** Construct a [[ArrayType]] object with the given element type. The `containsNull` is true. */
+  def apply(elementType: DataType): ArrayType = ArrayType(elementType, true)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type for collections of multiple values.
+ * Internally these are represented as columns that contain a ``scala.collection.Seq``.
+ *
+ * Please use [[DataTypes.createArrayType()]] to create a specific instance.
+ *
+ * An [[ArrayType]] object comprises two fields, `elementType: [[DataType]]` and
+ * `containsNull: Boolean`. The field of `elementType` is used to specify the type of
+ * array elements. The field of `containsNull` is used to specify if the array has `null` values.
+ *
+ * @param elementType The data type of values.
+ * @param containsNull Indicates if values have `null` values
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
+  /**
+   * Appends one line describing the element type (its type name and the `containsNull` flag)
+   * to `builder`, then delegates to `DataType.buildFormattedString` for the element type
+   * using a deeper prefix.
+   */
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    val elementLine = s"$prefix-- element: ${elementType.typeName} (containsNull = $containsNull)\n"
+    val childPrefix = s"$prefix    |"
+    builder.append(elementLine)
+    DataType.buildFormattedString(elementType, childPrefix, builder)
+  }
+
+  // JSON form: the type name, the element type's own JSON form and the containsNull flag.
+  override private[sql] def jsonValue =
+    ("type" -> typeName) ~
+      ("elementType" -> elementType.jsonValue) ~
+      ("containsNull" -> containsNull)
+}
+
+
+/**
+ * A field inside a StructType.
+ *
+ * @param name The name of this field.
+ * @param dataType The data type of this field.
+ * @param nullable Indicates if values of this field can be `null` values.
+ * @param metadata The metadata of this field. The metadata should be preserved during
+ *                 transformation if the content of the column is not modified, e.g, in selection.
+ */
+case class StructField(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean = true,
+    metadata: Metadata = Metadata.empty) {
+
+  /**
+   * Appends one line describing this field (name, type name and nullability) to `builder`,
+   * then delegates to `DataType.buildFormattedString` for the field's data type using a
+   * deeper prefix.
+   */
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    val fieldLine = s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n"
+    val childPrefix = s"$prefix    |"
+    builder.append(fieldLine)
+    DataType.buildFormattedString(dataType, childPrefix, builder)
+  }
+
+  // The generated case-class toString is replaced by this three-component form (metadata is
+  // not printed) to stay compatible with legacy parquet files.
+  override def toString: String = s"StructField($name,$dataType,$nullable)"
+
+  /** JSON form of this field: its name, type, nullability and metadata. */
+  private[sql] def jsonValue: JValue =
+    ("name" -> name) ~
+      ("type" -> dataType.jsonValue) ~
+      ("nullable" -> nullable) ~
+      ("metadata" -> metadata.jsonValue)
+}
+
+
+object StructType {
+  /** Builds a [[StructType]] with one [[StructField]] per attribute, in the given order. */
+  protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
+    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
+
+  /** Builds a [[StructType]] from a Scala sequence of fields. */
+  def apply(fields: Seq[StructField]): StructType = StructType(fields.toArray)
+
+  /**
+   * Builds a [[StructType]] from a Java list of fields.
+   *
+   * The typed `toArray(T[])` overload is used on purpose: the no-argument `toArray()` returns
+   * an `Object[]`, and casting that to `Array[StructField]` fails with a `ClassCastException`
+   * for ordinary lists such as `java.util.ArrayList`.
+   */
+  def apply(fields: java.util.List[StructField]): StructType = {
+    StructType(fields.toArray(new Array[StructField](fields.size)))
+  }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * A [[StructType]] object can be constructed by
+ * {{{
+ * StructType(fields: Seq[StructField])
+ * }}}
+ * For a [[StructType]] object, one or multiple [[StructField]]s can be extracted by names.
+ * If multiple [[StructField]]s are extracted, a [[StructType]] object will be returned.
+ * If a provided name does not have a matching field, an `IllegalArgumentException` is thrown,
+ * both when extracting a single StructField and when extracting several by a set of names.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val struct =
+ *   StructType(
+ *     StructField("a", IntegerType, true) ::
+ *     StructField("b", LongType, false) ::
+ *     StructField("c", BooleanType, false) :: Nil)
+ *
+ * // Extract a single StructField.
+ * val singleField = struct("b")
+ * // singleField: StructField = StructField(b,LongType,false)
+ *
+ * // This struct does not have a field called "d". An exception will be thrown.
+ * val nonExisting = struct("d")
+ * // java.lang.IllegalArgumentException: Field d does not exist.
+ *
+ * // Extract multiple StructFields. Field names are provided in a set.
+ * // A StructType object will be returned.
+ * val twoFields = struct(Set("b", "c"))
+ * // twoFields: StructType =
+ * //   StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
+ *
+ * // Every name in the set must match a field. For the case shown below,
+ * // "d" has no matching field, so an exception is thrown instead of
+ * // a StructType being returned.
+ * val missingField = struct(Set("b", "c", "d"))
+ * // java.lang.IllegalArgumentException:
+ * //   Field d does not exist.
+ * }}}
+ *
+ * A [[org.apache.spark.sql.catalyst.expressions.Row]] object is used as a value of the StructType.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val innerStruct =
+ *   StructType(
+ *     StructField("f1", IntegerType, true) ::
+ *     StructField("f2", LongType, false) ::
+ *     StructField("f3", BooleanType, false) :: Nil)
+ *
+ * val struct = StructType(
+ *   StructField("a", innerStruct, true) :: Nil)
+ *
+ * // Create a Row with the schema defined by struct
+ * val row = Row(Row(1, 2, true))
+ * // row: Row = [[1,2,true]]
+ * }}}
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
+
+  /** Returns all field names in an array. */
+  def fieldNames: Array[String] = fields.map(_.name)
+
+  // Lookup structures derived from `fields`; each is built lazily on first use.
+  private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
+  private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
+
+  /**
+   * Extracts a [[StructField]] of the given name.
+   *
+   * @throws IllegalArgumentException if this [[StructType]] has no field with the given name
+   */
+  def apply(name: String): StructField = {
+    nameToField.getOrElse(name, throw new IllegalArgumentException(s"Field $name does not exist."))
+  }
+
+  /**
+   * Returns a [[StructType]] containing [[StructField]]s of the given names, preserving the
+   * original order of fields.
+   *
+   * @throws IllegalArgumentException if any of the given names has no matching field
+   */
+  def apply(names: Set[String]): StructType = {
+    val nonExistFields = names -- fieldNamesSet
+    if (nonExistFields.nonEmpty) {
+      throw new IllegalArgumentException(
+        s"Field ${nonExistFields.mkString(",")} does not exist.")
+    }
+    // Preserve the original order of fields.
+    StructType(fields.filter(f => names.contains(f.name)))
+  }
+
+  /** Creates one AttributeReference per field, carrying its name, type, nullability, metadata. */
+  protected[sql] def toAttributes: Seq[AttributeReference] =
+    map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+
+  /**
+   * Renders this struct as text: a "root" line followed by whatever each field appends via
+   * its `buildFormattedString`, starting from the prefix " |".
+   */
+  def treeString: String = {
+    val builder = new StringBuilder
+    builder.append("root\n")
+    val prefix = " |"
+    fields.foreach(field => field.buildFormattedString(prefix, builder))
+
+    builder.toString()
+  }
+
+  /** Prints [[treeString]] to standard output. */
+  def printTreeString(): Unit = println(treeString)
+
+  // Used when this struct is nested inside another type: no header line of its own, each field
+  // is simply formatted with the prefix supplied by the caller.
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    fields.foreach(field => field.buildFormattedString(prefix, builder))
+  }
+
+  // JSON form: the type name plus the JSON form of every field, in order.
+  override private[sql] def jsonValue =
+    ("type" -> typeName) ~
+      ("fields" -> map(_.jsonValue))
+
+  // Seq[StructField] members, all backed directly by the `fields` array.
+  override def apply(fieldIndex: Int): StructField = fields(fieldIndex)
+
+  override def length: Int = fields.length
+
+  override def iterator: Iterator[StructField] = fields.iterator
+}
+
+
+object MapType {
+  /**
+   * Builds a [[MapType]] from a key type and a value type, with `valueContainsNull`
+   * set to `true`.
+   */
+  def apply(keyType: DataType, valueType: DataType): MapType =
+    MapType(keyType, valueType, valueContainsNull = true)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ *
+ * The data type for Maps. Keys in a map are not allowed to have `null` values.
+ *
+ * Please use [[DataTypes.createMapType()]] to create a specific instance.
+ *
+ * @param keyType The data type of map keys.
+ * @param valueType The data type of map values.
+ * @param valueContainsNull Indicates if map values have `null` values.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class MapType(
+    keyType: DataType,
+    valueType: DataType,
+    valueContainsNull: Boolean) extends DataType {
+  /**
+   * Appends the `key` and `value` summary lines for this map to `builder`, then lets
+   * `DataType.buildFormattedString` describe the key type and the value type in turn,
+   * each with a deeper prefix.
+   */
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    builder.append(s"$prefix-- key: ${keyType.typeName}\n")
+    builder.append(s"$prefix-- value: ${valueType.typeName} " +
+      s"(valueContainsNull = $valueContainsNull)\n")
+    DataType.buildFormattedString(keyType, s"$prefix    |", builder)
+    DataType.buildFormattedString(valueType, s"$prefix    |", builder)
+  }
+
+  /** JSON form: the type name, the key and value types, and the valueContainsNull flag. */
+  override private[sql] def jsonValue: JValue =
+    ("type" -> typeName) ~
+      ("keyType" -> keyType.jsonValue) ~
+      ("valueType" -> valueType.jsonValue) ~
+      ("valueContainsNull" -> valueContainsNull)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for User Defined Types (UDTs).
+ *
+ * This interface allows a user to make their own classes more interoperable with SparkSQL;
+ * e.g., by creating a [[UserDefinedType]] for a class X, it becomes possible to create
+ * a SchemaRDD which has class X in the schema.
+ *
+ * For SparkSQL to recognize UDTs, the UDT must be annotated with
+ * [[SQLUserDefinedType]].
+ *
+ * The conversion via `serialize` occurs when instantiating a `SchemaRDD` from another RDD.
+ * The conversion via `deserialize` occurs when reading from a `SchemaRDD`.
+ */
+@DeveloperApi
+abstract class UserDefinedType[UserType] extends DataType with Serializable {
+
+  /** The SQL data type used to store values of this UDT. */
+  def sqlType: DataType
+
+  /** Name of the paired Python UDT class; `null` unless a subclass overrides it. */
+  def pyUDT: String = null
+
+  /**
+   * Converts a user-type object into a SQL datum.
+   *
+   * TODO: Can we make this take obj: UserType?  The issue is in ScalaReflection.convertToCatalyst,
+   *       where we need to convert Any to UserType.
+   */
+  def serialize(obj: Any): Any
+
+  /** Converts a SQL datum back into the user type. */
+  def deserialize(datum: Any): UserType
+
+  /**
+   * JSON form of this UDT: the fixed type tag "udt", the concrete class name of this
+   * instance, the paired Python class and the JSON form of the underlying SQL type.
+   */
+  override private[sql] def jsonValue: JValue =
+    ("type" -> "udt") ~
+      ("class" -> getClass.getName) ~
+      ("pyClass" -> pyUDT) ~
+      ("sqlType" -> sqlType.jsonValue)
+
+  /** The class object of the user type. */
+  def userClass: java.lang.Class[UserType]
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/types/decimal/Decimal.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/decimal/Decimal.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/decimal/Decimal.scala
new file mode 100644
index 0000000..c7864d1
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/decimal/Decimal.scala
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types.decimal
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * A mutable implementation of BigDecimal that can hold a Long if values are small enough.
+ *
+ * The semantics of the fields are as follows:
+ * - _precision and _scale represent the SQL precision and scale we are looking for
+ * - If decimalVal is set, it represents the whole decimal value
+ * - Otherwise, the decimal value is longVal / (10 ** _scale)
+ */
+final class Decimal extends Ordered[Decimal] with Serializable {
+  import Decimal.{MAX_LONG_DIGITS, POW_10, ROUNDING_MODE, BIG_DEC_ZERO}
+
+  // Expanded representation: when non-null it holds the whole value and longVal is unused.
+  private var decimalVal: BigDecimal = null
+  // Compact representation: the unscaled value, meaningful only while decimalVal is null.
+  private var longVal: Long = 0L
+  private var _precision: Int = 1
+  private var _scale: Int = 0
+
+  def precision: Int = _precision
+  def scale: Int = _scale
+
+  /**
+   * Set this Decimal to the given Long. Will have precision 20 and scale 0.
+   */
+  def set(longVal: Long): Decimal = {
+    if (longVal <= -POW_10(MAX_LONG_DIGITS) || longVal >= POW_10(MAX_LONG_DIGITS)) {
+      // We can't represent this compactly as a long without risking overflow
+      this.decimalVal = BigDecimal(longVal)
+      this.longVal = 0L
+    } else {
+      this.decimalVal = null
+      this.longVal = longVal
+    }
+    this._precision = 20
+    this._scale = 0
+    this
+  }
+
+  /**
+   * Set this Decimal to the given Int. Will have precision 10 and scale 0.
+   */
+  def set(intVal: Int): Decimal = {
+    this.decimalVal = null
+    this.longVal = intVal
+    this._precision = 10
+    this._scale = 0
+    this
+  }
+
+  /**
+   * Set this Decimal to the given unscaled Long, with a given precision and scale.
+   *
+   * @throws IllegalArgumentException if the value does not fit in the given precision
+   */
+  def set(unscaled: Long, precision: Int, scale: Int): Decimal = {
+    if (setOrNull(unscaled, precision, scale) == null) {
+      throw new IllegalArgumentException("Unscaled value too large for precision")
+    }
+    this
+  }
+
+  /**
+   * Set this Decimal to the given unscaled Long, with a given precision and scale,
+   * and return it, or return null if it cannot be set due to overflow.
+   *
+   * @param unscaled the unscaled value; the represented number is unscaled / (10 ** scale)
+   * @return this Decimal, or null (leaving this Decimal unchanged) if `precision` is too low
+   */
+  def setOrNull(unscaled: Long, precision: Int, scale: Int): Decimal = {
+    if (unscaled <= -POW_10(MAX_LONG_DIGITS) || unscaled >= POW_10(MAX_LONG_DIGITS)) {
+      // We can't represent this compactly as a long without risking overflow
+      if (precision < 19) {
+        return null  // Requested precision is too low to represent this value
+      }
+      // Build the expanded form from the argument (not from the longVal field, which still
+      // holds whatever this Decimal contained before) and apply the requested scale.
+      this.decimalVal = BigDecimal(unscaled, scale)
+      this.longVal = 0L
+    } else {
+      val p = POW_10(math.min(precision, MAX_LONG_DIGITS))
+      if (unscaled <= -p || unscaled >= p) {
+        return null  // Requested precision is too low to represent this value
+      }
+      this.decimalVal = null
+      this.longVal = unscaled
+    }
+    this._precision = precision
+    this._scale = scale
+    this
+  }
+
+  /**
+   * Set this Decimal to the given BigDecimal value, with a given precision and scale.
+   * The value is first rounded to `scale`; the call fails (via `require`) if the rounded
+   * value needs more digits than `precision`.
+   */
+  def set(decimal: BigDecimal, precision: Int, scale: Int): Decimal = {
+    this.decimalVal = decimal.setScale(scale, ROUNDING_MODE)
+    require(decimalVal.precision <= precision, "Overflowed precision")
+    this.longVal = 0L
+    this._precision = precision
+    this._scale = scale
+    this
+  }
+
+  /**
+   * Set this Decimal to the given BigDecimal value, inheriting its precision and scale.
+   */
+  def set(decimal: BigDecimal): Decimal = {
+    this.decimalVal = decimal
+    this.longVal = 0L
+    this._precision = decimal.precision
+    this._scale = decimal.scale
+    this
+  }
+
+  /**
+   * Set this Decimal to the given Decimal value.
+   */
+  def set(decimal: Decimal): Decimal = {
+    this.decimalVal = decimal.decimalVal
+    this.longVal = decimal.longVal
+    this._precision = decimal._precision
+    this._scale = decimal._scale
+    this
+  }
+
+  /** The value as a BigDecimal, whichever representation is currently in use. */
+  def toBigDecimal: BigDecimal = {
+    if (decimalVal.ne(null)) {
+      decimalVal
+    } else {
+      BigDecimal(longVal, _scale)
+    }
+  }
+
+  /** The unscaled value as a Long (narrowed with `longValue()` in the expanded case). */
+  def toUnscaledLong: Long = {
+    if (decimalVal.ne(null)) {
+      decimalVal.underlying().unscaledValue().longValue()
+    } else {
+      longVal
+    }
+  }
+
+  override def toString: String = toBigDecimal.toString()
+
+  /** A string exposing the internal representation (expanded or compact) for debugging. */
+  @DeveloperApi
+  def toDebugString: String = {
+    if (decimalVal.ne(null)) {
+      s"Decimal(expanded,$decimalVal,$precision,$scale)"
+    } else {
+      s"Decimal(compact,$longVal,$precision,$scale)"
+    }
+  }
+
+  def toDouble: Double = toBigDecimal.doubleValue()
+
+  def toFloat: Float = toBigDecimal.floatValue()
+
+  /** The integral part of the value; any fractional digits are dropped. */
+  def toLong: Long = {
+    if (decimalVal.eq(null)) {
+      longVal / POW_10(_scale)
+    } else {
+      decimalVal.longValue()
+    }
+  }
+
+  def toInt: Int = toLong.toInt
+
+  def toShort: Short = toLong.toShort
+
+  def toByte: Byte = toLong.toByte
+
+  /**
+   * Update precision and scale while keeping our value the same, and return true if successful.
+   *
+   * Note that when false is returned the stored value may already have been rescaled, because
+   * the compact value is modified before the precision check is made.
+   *
+   * @return true if successful, false if overflow would occur
+   */
+  def changePrecision(precision: Int, scale: Int): Boolean = {
+    // First, update our longVal if we can, or transfer over to using a BigDecimal
+    if (decimalVal.eq(null)) {
+      if (scale < _scale) {
+        // Easier case: we just need to divide our scale down
+        val diff = _scale - scale
+        val droppedDigits = longVal % POW_10(diff)
+        longVal /= POW_10(diff)
+        if (math.abs(droppedDigits) * 2 >= POW_10(diff)) {
+          // Round half away from zero. The direction must come from the dropped digits: the
+          // truncated longVal may already be 0 for a negative input (e.g. -0.6 at scale 0).
+          longVal += (if (droppedDigits < 0) -1L else 1L)
+        }
+      } else if (scale > _scale) {
+        // We might be able to multiply longVal by a power of 10 and not overflow, but if not,
+        // switch to using a BigDecimal
+        val diff = scale - _scale
+        val p = POW_10(math.max(MAX_LONG_DIGITS - diff, 0))
+        if (diff <= MAX_LONG_DIGITS && longVal > -p && longVal < p) {
+          // Multiplying longVal by POW_10(diff) will still keep it below MAX_LONG_DIGITS
+          longVal *= POW_10(diff)
+        } else {
+          // Give up on using Longs; switch to BigDecimal, which we'll modify below
+          decimalVal = BigDecimal(longVal, _scale)
+        }
+      }
+      // In both cases, we will check whether our precision is okay below
+    }
+
+    if (decimalVal.ne(null)) {
+      // We get here if either we started with a BigDecimal, or we switched to one because we would
+      // have overflowed our Long; in either case we must rescale decimalVal to the new scale.
+      val newVal = decimalVal.setScale(scale, ROUNDING_MODE)
+      if (newVal.precision > precision) {
+        return false
+      }
+      decimalVal = newVal
+    } else {
+      // We're still using Longs, but we should check whether we match the new precision.
+      // The bound comes from the requested `precision`, not from the current `_precision`.
+      val p = POW_10(math.min(precision, MAX_LONG_DIGITS))
+      if (longVal <= -p || longVal >= p) {
+        // Note that we shouldn't have been able to fix this by switching to BigDecimal
+        return false
+      }
+    }
+
+    _precision = precision
+    _scale = scale
+    true
+  }
+
+  override def clone(): Decimal = new Decimal().set(this)
+
+  /**
+   * Compares by numeric value. Two compact values with the same scale are compared directly
+   * as Longs; every other combination goes through BigDecimal.
+   */
+  override def compare(other: Decimal): Int = {
+    if (decimalVal.eq(null) && other.decimalVal.eq(null) && _scale == other._scale) {
+      if (longVal < other.longVal) -1 else if (longVal == other.longVal) 0 else 1
+    } else {
+      toBigDecimal.compare(other.toBigDecimal)
+    }
+  }
+
+  // Equality is defined through compare, i.e. by numeric value.
+  override def equals(other: Any) = other match {
+    case d: Decimal =>
+      compare(d) == 0
+    case _ =>
+      false
+  }
+
+  override def hashCode(): Int = toBigDecimal.hashCode()
+
+  def isZero: Boolean = if (decimalVal.ne(null)) decimalVal == BIG_DEC_ZERO else longVal == 0
+
+  def + (that: Decimal): Decimal = Decimal(toBigDecimal + that.toBigDecimal)
+
+  def - (that: Decimal): Decimal = Decimal(toBigDecimal - that.toBigDecimal)
+
+  def * (that: Decimal): Decimal = Decimal(toBigDecimal * that.toBigDecimal)
+
+  /** Division; returns null (rather than throwing) when the divisor is zero. */
+  def / (that: Decimal): Decimal =
+    if (that.isZero) null else Decimal(toBigDecimal / that.toBigDecimal)
+
+  /** Remainder; returns null (rather than throwing) when the divisor is zero. */
+  def % (that: Decimal): Decimal =
+    if (that.isZero) null else Decimal(toBigDecimal % that.toBigDecimal)
+
+  def remainder(that: Decimal): Decimal = this % that
+
+  /** Negation; a compact value stays compact and keeps its precision and scale. */
+  def unary_- : Decimal = {
+    if (decimalVal.ne(null)) {
+      Decimal(-decimalVal)
+    } else {
+      Decimal(-longVal, precision, scale)
+    }
+  }
+}
+
+object Decimal {
+  // Rounding applied whenever a value has to be brought to a smaller scale.
+  private val ROUNDING_MODE = BigDecimal.RoundingMode.HALF_UP
+
+  /** Maximum number of decimal digits a Long can represent */
+  val MAX_LONG_DIGITS = 18
+
+  // POW_10(i) holds 10 ** i for every i from 0 to MAX_LONG_DIGITS inclusive.
+  private val POW_10 = Array.iterate(1L, MAX_LONG_DIGITS + 1)(_ * 10L)
+
+  private val BIG_DEC_ZERO = BigDecimal(0)
+
+  // Factory methods: each creates a fresh Decimal and initialises it with the matching `set`.
+
+  def apply(value: Double): Decimal = new Decimal().set(value)
+
+  def apply(value: Long): Decimal = new Decimal().set(value)
+
+  def apply(value: Int): Decimal = new Decimal().set(value)
+
+  def apply(value: BigDecimal): Decimal = new Decimal().set(value)
+
+  def apply(value: BigDecimal, precision: Int, scale: Int): Decimal =
+    new Decimal().set(value, precision, scale)
+
+  def apply(unscaled: Long, precision: Int, scale: Int): Decimal =
+    new Decimal().set(unscaled, precision, scale)
+
+  def apply(value: String): Decimal = Decimal(BigDecimal(value))
+
+  // Decimal can act either as a Fractional or as an Integral. Both scala.math traits define
+  // mkNumericOps, so two evidence objects are provided that share one common parent trait.
+  // See scala.math's Numeric.scala for the equivalent setup for Scala's built-in types.
+
+  /** Operations shared by both Decimal evidence objects */
+  trait DecimalIsConflicted extends Numeric[Decimal] {
+    override def plus(x: Decimal, y: Decimal): Decimal = x + y
+    override def minus(x: Decimal, y: Decimal): Decimal = x - y
+    override def times(x: Decimal, y: Decimal): Decimal = x * y
+    override def negate(x: Decimal): Decimal = -x
+    override def fromInt(x: Int): Decimal = Decimal(x)
+    override def toInt(x: Decimal): Int = x.toInt
+    override def toLong(x: Decimal): Long = x.toLong
+    override def toFloat(x: Decimal): Float = x.toFloat
+    override def toDouble(x: Decimal): Double = x.toDouble
+    override def compare(x: Decimal, y: Decimal): Int = x.compare(y)
+  }
+
+  /** [[scala.math.Fractional]] evidence for Decimals. */
+  object DecimalIsFractional extends DecimalIsConflicted with Fractional[Decimal] {
+    override def div(x: Decimal, y: Decimal): Decimal = x / y
+  }
+
+  /** [[scala.math.Integral]] evidence for Decimals; `quot` is plain Decimal division. */
+  object DecimalAsIfIntegral extends DecimalIsConflicted with Integral[Decimal] {
+    override def quot(x: Decimal, y: Decimal): Decimal = x / y
+    override def rem(x: Decimal, y: Decimal): Decimal = x % y
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
new file mode 100644
index 0000000..346a51e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/package.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+/**
+ * Contains a type system for attributes produced by relations, including complex types like
+ * structs, arrays and maps.
+ */
+package object types

http://git-wip-us.apache.org/repos/asf/spark/blob/f9969098/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index 7be24be..117725d 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -23,7 +23,7 @@ import java.sql.{Date, Timestamp}
 import org.scalatest.FunSuite
 
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.types._
 
 case class PrimitiveData(
     intField: Int,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org