You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/23 23:48:24 UTC

[2/2] spark git commit: [SQL] Break dataTypes.scala into multiple files.

[SQL] Break dataTypes.scala into multiple files.

It was over 1000 lines of code, making it harder to find all the types. Only moved code around, and didn't change any.

Author: Reynold Xin <rx...@databricks.com>

Closes #5670 from rxin/break-types and squashes the following commits:

8c59023 [Reynold Xin] Check in missing files.
dcd5193 [Reynold Xin] [SQL] Break dataTypes.scala into multiple files.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6220d933
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6220d933
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6220d933

Branch: refs/heads/master
Commit: 6220d933e5ce4ba890f5d6a50a69b95d319dafb4
Parents: 1ed46a6
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Apr 23 14:48:19 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Apr 23 14:48:19 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/types/ArrayType.scala  |   74 ++
 .../org/apache/spark/sql/types/BinaryType.scala |   63 +
 .../apache/spark/sql/types/BooleanType.scala    |   51 +
 .../org/apache/spark/sql/types/ByteType.scala   |   54 +
 .../org/apache/spark/sql/types/DataType.scala   |  353 +++++
 .../org/apache/spark/sql/types/DateType.scala   |   54 +
 .../apache/spark/sql/types/DecimalType.scala    |  110 ++
 .../org/apache/spark/sql/types/DoubleType.scala |   53 +
 .../org/apache/spark/sql/types/FloatType.scala  |   53 +
 .../apache/spark/sql/types/IntegerType.scala    |   54 +
 .../org/apache/spark/sql/types/LongType.scala   |   54 +
 .../org/apache/spark/sql/types/MapType.scala    |   79 ++
 .../org/apache/spark/sql/types/NullType.scala   |   39 +
 .../org/apache/spark/sql/types/ShortType.scala  |   53 +
 .../org/apache/spark/sql/types/StringType.scala |   50 +
 .../apache/spark/sql/types/StructField.scala    |   54 +
 .../org/apache/spark/sql/types/StructType.scala |  263 ++++
 .../apache/spark/sql/types/TimestampType.scala  |   57 +
 .../spark/sql/types/UserDefinedType.scala       |   81 ++
 .../org/apache/spark/sql/types/dataTypes.scala  | 1224 ------------------
 20 files changed, 1649 insertions(+), 1224 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
new file mode 100644
index 0000000..b116163
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonDSL._
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+object ArrayType {
+  /** Construct an [[ArrayType]] with the given element type; `containsNull` is set to true. */
+  def apply(elementType: DataType): ArrayType = ArrayType(elementType, containsNull = true)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for collections of multiple values.
+ * Internally these are represented as columns that contain a `scala.collection.Seq`.
+ *
+ * Please use [[DataTypes.createArrayType()]] to create a specific instance.
+ *
+ * An [[ArrayType]] object comprises two fields, `elementType: [[DataType]]` and
+ * `containsNull: Boolean`. The field of `elementType` is used to specify the type of
+ * array elements. The field of `containsNull` is used to specify if the array has `null` values.
+ *
+ * @param elementType The data type of the array elements.
+ * @param containsNull Indicates if the array may contain `null` values.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
+
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null, false)
+
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    builder.append(
+      s"$prefix-- element: ${elementType.typeName} (containsNull = $containsNull)\n")
+    DataType.buildFormattedString(elementType, s"$prefix    |", builder)
+  }
+
+  override private[sql] def jsonValue =
+    ("type" -> typeName) ~
+      ("elementType" -> elementType.jsonValue) ~
+      ("containsNull" -> containsNull)
+
+  /**
+   * The default size of a value of the ArrayType is 100 * the default size of the element type.
+   * (We assume that there are 100 elements).
+   */
+  override def defaultSize: Int = 100 * elementType.defaultSize
+
+  override def simpleString: String = s"array<${elementType.simpleString}>"
+
+  private[spark] override def asNullable: ArrayType =
+    ArrayType(elementType.asNullable, containsNull = true)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
new file mode 100644
index 0000000..a581a9e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Array[Byte]` values.
+ * Please use the singleton [[DataTypes.BinaryType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class BinaryType private() extends AtomicType {
+  // The companion object and this class are separated so the companion object also subclasses
+  // this type. Otherwise, the companion object would be of type "BinaryType$" in byte code.
+  // Defined with a private constructor so the companion object is the only possible instantiation.
+
+  private[sql] type InternalType = Array[Byte]
+
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+
+  private[sql] val ordering = new Ordering[InternalType] {
+    // Lexicographic order on signed bytes; when one array is a prefix, the shorter sorts first.
+    def compare(x: Array[Byte], y: Array[Byte]): Int = {
+      val limit = if (x.length < y.length) x.length else y.length
+      var i = 0
+      while (i < limit && x(i) == y(i)) i += 1
+      if (i < limit) x(i).compareTo(y(i)) else x.length - y.length
+    }
+  }
+
+  /**
+   * The default size of a value of the BinaryType is 4096 bytes.
+   */
+  override def defaultSize: Int = 4096
+
+  private[spark] override def asNullable: BinaryType = this
+}
+
+
+case object BinaryType extends BinaryType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala
new file mode 100644
index 0000000..a7f228c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Boolean` values. Please use the singleton [[DataTypes.BooleanType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class BooleanType private() extends AtomicType {
+  // The companion object and this class are separated so the companion object also subclasses
+  // this type. Otherwise, the companion object would be of type "BooleanType$" in byte code.
+  // Defined with a private constructor so the companion object is the only possible instantiation.
+  private[sql] type InternalType = Boolean
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+  private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+  /**
+   * The default size of a value of the BooleanType is 1 byte.
+   */
+  override def defaultSize: Int = 1
+
+  private[spark] override def asNullable: BooleanType = this
+}
+
+
+case object BooleanType extends BooleanType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala
new file mode 100644
index 0000000..4d86857
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Byte` values. Please use the singleton [[DataTypes.ByteType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class ByteType private() extends IntegralType {
+  // The companion object and this class are separated so the companion object also subclasses
+  // this type. Otherwise, the companion object would be of type "ByteType$" in byte code.
+  // Defined with a private constructor so the companion object is the only possible instantiation.
+  private[sql] type InternalType = Byte
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+  private[sql] val numeric = implicitly[Numeric[Byte]]
+  private[sql] val integral = implicitly[Integral[Byte]]
+  private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+  /**
+   * The default size of a value of the ByteType is 1 byte.
+   */
+  override def defaultSize: Int = 1
+
+  override def simpleString: String = "tinyint"
+
+  private[spark] override def asNullable: ByteType = this
+}
+
+case object ByteType extends ByteType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
new file mode 100644
index 0000000..e6bfcd9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.{TypeTag, runtimeMirror}
+import scala.util.parsing.combinator.RegexParsers
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.util.Utils
+
+
+/**
+ * :: DeveloperApi ::
+ * The base type of all Spark SQL data types.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+abstract class DataType {
+  /** Matches any expression that evaluates to this DataType */
+  def unapply(a: Expression): Boolean = a match {
+    case e: Expression if e.dataType == this => true
+    case _ => false
+  }
+
+  /** The default size of a value of this data type. */
+  def defaultSize: Int
+  /** Simple class name minus `$` and the `Type` suffix, lower-cased (e.g. "integer"). */
+  def typeName: String = this.getClass.getSimpleName.stripSuffix("$").dropRight(4).toLowerCase
+
+  private[sql] def jsonValue: JValue = typeName
+
+  def json: String = compact(render(jsonValue))
+
+  def prettyJson: String = pretty(render(jsonValue))
+
+  def simpleString: String = typeName
+
+  /** Check if `this` and `other` are the same data type when ignoring nullability
+   *  (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
+   */
+  private[spark] def sameType(other: DataType): Boolean =
+    DataType.equalsIgnoreNullability(this, other)
+
+  /** Returns the same data type but with all nullability fields set to true
+   * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
+   */
+  private[spark] def asNullable: DataType
+}
+
+
+/**
+ * An internal type used to represent everything other than null, UDTs, arrays, structs, and maps.
+ */
+protected[sql] abstract class AtomicType extends DataType {
+  private[sql] type InternalType  // bound by each subclass, e.g. Boolean, Int, Array[Byte]
+  @transient private[sql] val tag: TypeTag[InternalType]
+  private[sql] val ordering: Ordering[InternalType]
+  // Runtime ClassTag for InternalType, derived from `tag` while holding the reflection lock.
+  @transient private[sql] val classTag = ScalaReflectionLock.synchronized {
+    val mirror = runtimeMirror(Utils.getSparkClassLoader)
+    ClassTag[InternalType](mirror.runtimeClass(tag.tpe))
+  }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Numeric data types.
+ *
+ * @group dataType
+ */
+abstract class NumericType extends AtomicType {
+  // Unfortunately we can't get this implicitly as that breaks Spark Serialization. For
+  // implicitly[Numeric[InternalType]] to be valid, InternalType would have to change from a type
+  // member to a type parameter with a Numeric context bound (i.e., [InternalType : Numeric]). The
+  // compiler desugars that into a constructor argument, so there would no longer be a no-argument
+  // constructor and thus the JVM could not serialize the object anymore.
+  private[sql] val numeric: Numeric[InternalType]
+}
+
+
+private[sql] object NumericType {  // matcher for expressions whose data type is a NumericType
+  def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[NumericType]
+}
+
+
+/** Matcher for any expressions that evaluate to [[IntegralType]]s */
+private[sql] object IntegralType {
+  def unapply(a: Expression): Boolean = a match {
+    case e: Expression if e.dataType.isInstanceOf[IntegralType] => true
+    case _ => false  // includes null, which a type pattern never matches
+  }
+}
+
+/** Numeric types that additionally provide an `Integral` instance for their internal type. */
+private[sql] abstract class IntegralType extends NumericType {
+  private[sql] val integral: Integral[InternalType]
+}
+
+
+
+/** Matcher for any expressions that evaluate to [[FractionalType]]s */
+private[sql] object FractionalType {
+  def unapply(a: Expression): Boolean = a match {
+    case e: Expression if e.dataType.isInstanceOf[FractionalType] => true
+    case _ => false  // includes null, which a type pattern never matches
+  }
+}
+
+/** Numeric types that provide a `Fractional` instance plus an `Integral` one (`asIntegral`). */
+private[sql] abstract class FractionalType extends NumericType {
+  private[sql] val fractional: Fractional[InternalType]
+  private[sql] val asIntegral: Integral[InternalType]
+}
+
+
+object DataType {
+
+  def fromJson(json: String): DataType = parseDataType(parse(json))  // parses `json` output
+
+  @deprecated("Use DataType.fromJson instead", "1.2.0")  // legacy case-class string format
+  def fromCaseClassString(string: String): DataType = CaseClassStringParser(string)
+
+  private val nonDecimalNameToType = {  // typeName -> singleton instance, for the types below
+    Seq(NullType, DateType, TimestampType, BinaryType,
+      IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
+      .map(t => t.typeName -> t).toMap
+  }
+
+  /** Returns the DataType for a type name such as "integer", "decimal" or "decimal(10,2)". */
+  private def nameToType(name: String): DataType = {
+    val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\d+)\s*\)""".r
+    name match {
+      case "decimal" => DecimalType.Unlimited
+      case FIXED_DECIMAL(precision, scale) => DecimalType(precision.toInt, scale.toInt)
+      case other => nonDecimalNameToType(other)  // Map.apply: NoSuchElementException if unknown
+    }
+  }
+
+  private object JSortedObject {  // extractor yielding a JSON object's fields sorted by name
+    def unapplySeq(value: JValue): Option[List[(String, JValue)]] = value match {
+      case JObject(seq) => Some(seq.toList.sortBy(_._1))
+      case _ => None
+    }
+  }
+
+  // NOTE: JSortedObject sorts fields by name, so patterns list them alphabetically (like Python).
+  private def parseDataType(json: JValue): DataType = json match {  // no default case: MatchError
+    case JString(name) =>
+      nameToType(name)
+
+    case JSortedObject(
+    ("containsNull", JBool(n)),
+    ("elementType", t: JValue),
+    ("type", JString("array"))) =>
+      ArrayType(parseDataType(t), n)
+
+    case JSortedObject(
+    ("keyType", k: JValue),
+    ("type", JString("map")),
+    ("valueContainsNull", JBool(n)),
+    ("valueType", v: JValue)) =>
+      MapType(parseDataType(k), parseDataType(v), n)
+
+    case JSortedObject(
+    ("fields", JArray(fields)),
+    ("type", JString("struct"))) =>
+      StructType(fields.map(parseStructField))
+
+    case JSortedObject(
+    ("class", JString(udtClass)),
+    ("pyClass", _),
+    ("sqlType", _),
+    ("type", JString("udt"))) =>
+      Class.forName(udtClass).newInstance().asInstanceOf[UserDefinedType[_]]
+  }
+
+  private def parseStructField(json: JValue): StructField = json match {  // one struct field
+    case JSortedObject(
+    ("metadata", metadata: JObject),
+    ("name", JString(name)),
+    ("nullable", JBool(nullable)),
+    ("type", dataType: JValue)) =>
+      StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata))
+    // Support reading schemas whose fields have no 'metadata' entry.
+    case JSortedObject(
+    ("name", JString(name)),
+    ("nullable", JBool(nullable)),
+    ("type", dataType: JValue)) =>
+      StructField(name, parseDataType(dataType), nullable)
+  }
+
+  private object CaseClassStringParser extends RegexParsers {  // legacy string-form parser
+    protected lazy val primitiveType: Parser[DataType] =  // bare type names and decimals
+      ( "StringType" ^^^ StringType
+        | "FloatType" ^^^ FloatType
+        | "IntegerType" ^^^ IntegerType
+        | "ByteType" ^^^ ByteType
+        | "ShortType" ^^^ ShortType
+        | "DoubleType" ^^^ DoubleType
+        | "LongType" ^^^ LongType
+        | "BinaryType" ^^^ BinaryType
+        | "BooleanType" ^^^ BooleanType
+        | "DateType" ^^^ DateType
+        | "DecimalType()" ^^^ DecimalType.Unlimited
+        | fixedDecimalType
+        | "TimestampType" ^^^ TimestampType
+        )
+
+    protected lazy val fixedDecimalType: Parser[DataType] =  // DecimalType(<precision>,<scale>)
+      ("DecimalType(" ~> "[0-9]+".r) ~ ("," ~> "[0-9]+".r <~ ")") ^^ {
+        case precision ~ scale => DecimalType(precision.toInt, scale.toInt)
+      }
+
+    protected lazy val arrayType: Parser[DataType] =  // ArrayType(<elementType>,<containsNull>)
+      "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ {
+        case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull)
+      }
+
+    protected lazy val mapType: Parser[DataType] =  // MapType(<key>,<value>,<valueContainsNull>)
+      "MapType" ~> "(" ~> dataType ~ "," ~ dataType ~ "," ~ boolVal <~ ")" ^^ {
+        case t1 ~ _ ~ t2 ~ _ ~ valueContainsNull => MapType(t1, t2, valueContainsNull)
+      }
+
+    protected lazy val structField: Parser[StructField] =  // StructField(<name>,<type>,<nullable>)
+      ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
+        case name ~ tpe ~ nullable  =>
+          StructField(name, tpe, nullable = nullable)
+      }
+
+    protected lazy val boolVal: Parser[Boolean] =
+      ( "true" ^^^ true
+        | "false" ^^^ false
+        )
+
+    protected lazy val structType: Parser[DataType] =  // StructType(<Letters>(<field>,...))
+      "StructType\\([A-Za-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
+        case fields => StructType(fields)
+      }
+
+    protected lazy val dataType: Parser[DataType] =  // entry rule: any supported type
+      ( arrayType
+        | mapType
+        | structType
+        | primitiveType
+        )
+
+    /**
+     * Parses the case-class string form of a DataType, e.g. `ArrayType(IntegerType,true)`.
+     * Throws IllegalArgumentException if the whole input cannot be parsed.
+     * TODO: Generate parser as pickler...
+     */
+    def apply(asString: String): DataType = parseAll(dataType, asString) match {
+      case Success(result, _) => result
+      case failure: NoSuccess =>
+        throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
+    }
+  }
+
+  protected[types] def buildFormattedString(
+    dataType: DataType,
+    prefix: String,
+    builder: StringBuilder): Unit = {  // delegates to the nested type's own formatter
+    dataType match {
+      case array: ArrayType =>
+        array.buildFormattedString(prefix, builder)
+      case struct: StructType =>
+        struct.buildFormattedString(prefix, builder)
+      case map: MapType =>
+        map.buildFormattedString(prefix, builder)
+      case _ =>  // nothing is appended for any other type
+    }
+  }
+
+  /**
+   * Compares two types recursively, ignoring nullability of ArrayType, MapType, and StructType.
+   */
+  private[types] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
+    (left, right) match {
+      case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) =>
+        equalsIgnoreNullability(leftElementType, rightElementType)
+      case (MapType(leftKeyType, leftValueType, _), MapType(rightKeyType, rightValueType, _)) =>
+        equalsIgnoreNullability(leftKeyType, rightKeyType) &&
+          equalsIgnoreNullability(leftValueType, rightValueType)
+      case (StructType(leftFields), StructType(rightFields)) =>
+        leftFields.length == rightFields.length &&
+          leftFields.zip(rightFields).forall { case (l, r) =>
+            l.name == r.name && equalsIgnoreNullability(l.dataType, r.dataType)
+          }
+      case (l, r) => l == r  // everything else: plain equality
+    }
+  }
+
+  /**
+   * Compares two types, ignoring compatible nullability of ArrayType, MapType, StructType.
+   *
+   * Compatible nullability is defined as follows:
+   *   - If `from` and `to` are ArrayTypes, `from` has a compatible nullability with `to`
+   *   if and only if `to.containsNull` is true, or both of `from.containsNull` and
+   *   `to.containsNull` are false.
+   *   - If `from` and `to` are MapTypes, `from` has a compatible nullability with `to`
+   *   if and only if `to.valueContainsNull` is true, or both of `from.valueContainsNull` and
+   *   `to.valueContainsNull` are false.
+   *   - If `from` and `to` are StructTypes, `from` has a compatible nullability with `to`
+   *   if and only if for every pair of fields, `toField.nullable` is true, or both
+   *   of `fromField.nullable` and `toField.nullable` are false.
+   */
+  private[sql] def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean = {
+    (from, to) match {
+      case (ArrayType(fromElement, fn), ArrayType(toElement, tn)) =>
+        (tn || !fn) && equalsIgnoreCompatibleNullability(fromElement, toElement)
+
+      case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) =>
+        (tn || !fn) &&
+          equalsIgnoreCompatibleNullability(fromKey, toKey) &&
+          equalsIgnoreCompatibleNullability(fromValue, toValue)
+
+      case (StructType(fromFields), StructType(toFields)) =>
+        fromFields.length == toFields.length &&
+          fromFields.zip(toFields).forall { case (fromField, toField) =>
+            fromField.name == toField.name &&
+              (toField.nullable || !fromField.nullable) &&
+              equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
+          }
+
+      case (fromDataType, toDataType) => fromDataType == toDataType
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
new file mode 100644
index 0000000..03f0644
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `java.sql.Date` values.
+ * Please use the singleton [[DataTypes.DateType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class DateType private() extends AtomicType {
+  // The companion object and this class are separated so the companion object also subclasses
+  // this type. Otherwise, the companion object would be of type "DateType$" in byte code.
+  // Defined with a private constructor so the companion object is the only possible instantiation.
+  private[sql] type InternalType = Int
+
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+
+  private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+  /**
+   * The default size of a value of the DateType is 4 bytes.
+   */
+  override def defaultSize: Int = 4
+
+  private[spark] override def asNullable: DateType = this
+}
+
+
+case object DateType extends DateType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
new file mode 100644
index 0000000..0f8cecd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+
+/** Precision parameters for a Decimal: the `precision` and `scale` of a fixed-precision type. */
+case class PrecisionInfo(precision: Int, scale: Int)
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for `java.math.BigDecimal` values. An instance either carries a fixed
+ * precision and scale (`precisionInfo` is defined) or leaves both unlimited (`None`).
+ *
+ * Please use [[DataTypes.createDecimalType()]] to create a specific instance.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
+
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null)
+
+  private[sql] type InternalType = Decimal
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+  // Decimal.DecimalIsFractional backs the numeric, fractional and ordering members alike.
+  private[sql] val numeric = Decimal.DecimalIsFractional
+  private[sql] val fractional = Decimal.DecimalIsFractional
+  private[sql] val ordering = Decimal.DecimalIsFractional
+  private[sql] val asIntegral = Decimal.DecimalAsIfIntegral
+
+  // precision and scale both fall back to -1 when precisionInfo is None.
+  def precision: Int = precisionInfo.fold(-1)(_.precision)
+  def scale: Int = precisionInfo.fold(-1)(_.scale)
+
+  override def typeName: String = precisionInfo match {
+    case None => "decimal"
+    case Some(info) => s"decimal(${info.precision},${info.scale})"
+  }
+
+  override def toString: String = precisionInfo match {
+    case None => "DecimalType()"
+    case Some(info) => s"DecimalType(${info.precision},${info.scale})"
+  }
+
+  /** The default size of a value of the DecimalType is 4096 bytes. */
+  override def defaultSize: Int = 4096
+
+  // Without precision information this reports "decimal(10,0)" rather than bare "decimal".
+  override def simpleString: String = precisionInfo match {
+    case None => "decimal(10,0)"
+    case Some(info) => s"decimal(${info.precision},${info.scale})"
+  }
+
+  private[spark] override def asNullable: DecimalType = this
+}
+
+
+/** Factory methods and pattern-matching extractors for [[DecimalType]]. */
+object DecimalType {
+  val Unlimited: DecimalType = new DecimalType(None)
+
+  // Matches a DecimalType that has precision information, yielding (precision, scale).
+  object Fixed {
+    def unapply(t: DecimalType): Option[(Int, Int)] =
+      for (info <- t.precisionInfo) yield (info.precision, info.scale)
+  }
+
+  // Matches an expression whose data type is matched by Fixed.
+  object Expression {
+    def unapply(e: Expression): Option[(Int, Int)] = e.dataType match {
+      case Fixed(precision, scale) => Some((precision, scale))
+      case _ => None
+    }
+  }
+
+  def apply(): DecimalType = Unlimited
+  def apply(precision: Int, scale: Int): DecimalType =
+    new DecimalType(Some(PrecisionInfo(precision, scale)))
+
+  def unapply(t: DataType): Boolean = t.isInstanceOf[DecimalType]
+  def unapply(e: Expression): Boolean = unapply(e.dataType)
+
+  def isFixed(dataType: DataType): Boolean = dataType match {
+    case decimal: DecimalType => decimal.precisionInfo.isDefined
+    case _ => false
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
new file mode 100644
index 0000000..6676662
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Fractional, Numeric}
+import scala.math.Numeric.DoubleAsIfIntegral
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type for `Double` values. Please use the singleton [[DataTypes.DoubleType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class DoubleType private() extends FractionalType {
+  // This class and its companion object are declared separately so that the companion also
+  // subclasses this type; otherwise the companion would be of type "DoubleType$" in byte code.
+  // The constructor is private, which makes the companion the only possible instance.
+
+  /** The default size of a value of the DoubleType is 8 bytes. */
+  override def defaultSize: Int = 8
+
+  private[spark] override def asNullable: DoubleType = this
+
+  // Internal representation and the type-class instances that operate on it.
+  private[sql] type InternalType = Double
+  private[sql] val ordering: Ordering[InternalType] = Ordering[InternalType]
+  private[sql] val numeric: Numeric[InternalType] = implicitly[Numeric[Double]]
+  private[sql] val fractional: Fractional[InternalType] = implicitly[Fractional[Double]]
+  private[sql] val asIntegral = DoubleAsIfIntegral
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+}
+
+case object DoubleType extends DoubleType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
new file mode 100644
index 0000000..1d5a2f4
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Numeric.FloatAsIfIntegral
+import scala.math.{Ordering, Fractional, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type for `Float` values. Please use the singleton [[DataTypes.FloatType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class FloatType private() extends FractionalType {
+  // This class and its companion object are declared separately so that the companion also
+  // subclasses this type; otherwise the companion would be of type "FloatType$" in byte code.
+  // The constructor is private, which makes the companion the only possible instance.
+
+  /** The default size of a value of the FloatType is 4 bytes. */
+  override def defaultSize: Int = 4
+
+  private[spark] override def asNullable: FloatType = this
+
+  // Internal representation and the type-class instances that operate on it.
+  private[sql] type InternalType = Float
+  private[sql] val ordering: Ordering[InternalType] = Ordering[InternalType]
+  private[sql] val numeric: Numeric[InternalType] = implicitly[Numeric[Float]]
+  private[sql] val fractional: Fractional[InternalType] = implicitly[Fractional[Float]]
+  private[sql] val asIntegral = FloatAsIfIntegral
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+}
+
+case object FloatType extends FloatType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala
new file mode 100644
index 0000000..74e464c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for `Int` values. Please use the singleton [[DataTypes.IntegerType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class IntegerType private() extends IntegralType {
+  // This class and its companion object are declared separately so that the companion also
+  // subclasses this type; otherwise the companion would be of type "IntegerType$" in byte code.
+  // The constructor is private, which makes the companion the only possible instance.
+
+  /** The default size of a value of the IntegerType is 4 bytes. */
+  override def defaultSize: Int = 4
+
+  override def simpleString: String = "int"
+
+  private[spark] override def asNullable: IntegerType = this
+
+  // Internal representation and the type-class instances that operate on it.
+  private[sql] type InternalType = Int
+  private[sql] val ordering: Ordering[InternalType] = Ordering[InternalType]
+  private[sql] val numeric: Numeric[InternalType] = implicitly[Numeric[Int]]
+  private[sql] val integral: Integral[InternalType] = implicitly[Integral[Int]]
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+}
+
+case object IntegerType extends IntegerType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala
new file mode 100644
index 0000000..3906757
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type for `Long` values. Please use the singleton [[DataTypes.LongType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class LongType private() extends IntegralType {
+  // This class and its companion object are declared separately so that the companion also
+  // subclasses this type; otherwise the companion would be of type "LongType$" in byte code.
+  // The constructor is private, which makes the companion the only possible instance.
+
+  /** The default size of a value of the LongType is 8 bytes. */
+  override def defaultSize: Int = 8
+
+  override def simpleString: String = "bigint"
+
+  private[spark] override def asNullable: LongType = this
+
+  // Internal representation and the type-class instances that operate on it.
+  private[sql] type InternalType = Long
+  private[sql] val ordering: Ordering[InternalType] = Ordering[InternalType]
+  private[sql] val numeric: Numeric[InternalType] = implicitly[Numeric[Long]]
+  private[sql] val integral: Integral[InternalType] = implicitly[Integral[Long]]
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+}
+
+
+case object LongType extends LongType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
new file mode 100644
index 0000000..cfdf493
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for Maps. Keys in a map are not allowed to have `null` values.
+ *
+ * Please use [[DataTypes.createMapType()]] to create a specific instance.
+ *
+ * @param keyType The data type of map keys.
+ * @param valueType The data type of map values.
+ * @param valueContainsNull Indicates if map values have `null` values.
+ *
+ * @group dataType
+ */
+case class MapType(
+    keyType: DataType,
+    valueType: DataType,
+    valueContainsNull: Boolean) extends DataType {
+
+  /** No-arg constructor for kryo. */
+  def this() = this(null, null, false)
+
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    builder.append(s"$prefix-- key: ${keyType.typeName}\n")
+    builder.append(s"$prefix-- value: ${valueType.typeName} " +
+      s"(valueContainsNull = $valueContainsNull)\n")
+    DataType.buildFormattedString(keyType, s"$prefix    |", builder)
+    DataType.buildFormattedString(valueType, s"$prefix    |", builder)
+  }
+
+  override private[sql] def jsonValue: JValue =
+    ("type" -> typeName) ~ ("keyType" -> keyType.jsonValue) ~
+      ("valueType" -> valueType.jsonValue) ~ ("valueContainsNull" -> valueContainsNull)
+
+  /**
+   * The default size of a value of the MapType is
+   * 100 * (the default size of the key type + the default size of the value type).
+   * (We assume that there are 100 elements). The product is computed as a Long and capped
+   * at Int.MaxValue, because deeply nested map types would otherwise overflow Int.
+   */
+  override def defaultSize: Int =
+    math.min(100L * (keyType.defaultSize.toLong + valueType.defaultSize), Int.MaxValue).toInt
+
+  override def simpleString: String = s"map<${keyType.simpleString},${valueType.simpleString}>"
+
+  private[spark] override def asNullable: MapType =
+    MapType(keyType.asNullable, valueType.asNullable, valueContainsNull = true)
+}
+
+
+object MapType {
+  /**
+   * Builds a [[MapType]] from a key type and a value type only.
+   * `valueContainsNull` is set to true.
+   */
+  def apply(keyType: DataType, valueType: DataType): MapType =
+    new MapType(keyType, valueType, valueContainsNull = true)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
new file mode 100644
index 0000000..b64b074
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for `NULL` values. Please use the singleton [[DataTypes.NullType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class NullType private() extends DataType {
+  // Split from its companion object so that the companion also subclasses this type instead
+  // of only being "NullType$" in byte code; the private constructor keeps it the sole instance.
+  private[spark] override def asNullable: NullType = this
+
+  /** The default size of a value of the NullType is 1 byte. */
+  override def defaultSize: Int = 1
+}
+
+case object NullType extends NullType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala
new file mode 100644
index 0000000..73e9ec7
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type for `Short` values. Please use the singleton [[DataTypes.ShortType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class ShortType private() extends IntegralType {
+  // This class and its companion object are declared separately so that the companion also
+  // subclasses this type; otherwise the companion would be of type "ShortType$" in byte code.
+  // The constructor is private, which makes the companion the only possible instance.
+
+  /** The default size of a value of the ShortType is 2 bytes. */
+  override def defaultSize: Int = 2
+
+  override def simpleString: String = "smallint"
+
+  private[spark] override def asNullable: ShortType = this
+
+  // Internal representation and the type-class instances that operate on it.
+  private[sql] type InternalType = Short
+  private[sql] val ordering: Ordering[InternalType] = Ordering[InternalType]
+  private[sql] val numeric: Numeric[InternalType] = implicitly[Numeric[Short]]
+  private[sql] val integral: Integral[InternalType] = implicitly[Integral[Short]]
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+}
+
+case object ShortType extends ShortType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
new file mode 100644
index 0000000..134ab0a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type for `String` values. Please use the singleton [[DataTypes.StringType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class StringType private() extends AtomicType {
+  // This class and its companion object are declared separately so that the companion also
+  // subclasses this type; otherwise the companion would be of type "StringType$" in byte code.
+  // The constructor is private, which makes the companion the only possible instance.
+
+  /** The default size of a value of the StringType is 4096 bytes. */
+  override def defaultSize: Int = 4096
+
+  private[spark] override def asNullable: StringType = this
+
+  // Strings use UTF8String as their internal type.
+  private[sql] type InternalType = UTF8String
+  private[sql] val ordering: Ordering[InternalType] = Ordering[InternalType]
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+}
+
+case object StringType extends StringType
+

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
new file mode 100644
index 0000000..83570a5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+
+/**
+ * A single field of a StructType.
+ * @param name The name of this field.
+ * @param dataType The data type of this field.
+ * @param nullable Indicates if values of this field can be `null` values.
+ * @param metadata The metadata of this field. The metadata should be preserved during
+ *                 transformation if the content of the column is not modified, e.g., in selection.
+ */
+case class StructField(
+    name: String,
+    dataType: DataType,
+    nullable: Boolean = true,
+    metadata: Metadata = Metadata.empty) {
+
+  /** No-arg constructor for kryo. */
+  protected def this() = this(null, null)
+
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    val header = s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n"
+    builder.append(header)
+    DataType.buildFormattedString(dataType, s"$prefix    |", builder)
+  }
+
+  // The default toString is overridden to stay compatible with legacy parquet files.
+  override def toString: String = s"StructField($name,$dataType,$nullable)"
+
+  private[sql] def jsonValue: JValue =
+    ("name" -> name) ~
+      ("type" -> dataType.jsonValue) ~
+      ("nullable" -> nullable) ~
+      ("metadata" -> metadata.jsonValue)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
new file mode 100644
index 0000000..d80ffca
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.collection.mutable.ArrayBuffer
+import scala.math.max
+
+import org.json4s.JsonDSL._
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+
+
+/**
+ * :: DeveloperApi ::
+ * A [[StructType]] object can be constructed by
+ * {{{
+ * StructType(fields: Seq[StructField])
+ * }}}
+ * For a [[StructType]] object, one or multiple [[StructField]]s can be extracted by names.
+ * If multiple [[StructField]]s are extracted, a [[StructType]] object will be returned.
+ * If a provided name does not have a matching field, an `IllegalArgumentException` is
+ * thrown, both when extracting a single StructField and when extracting a set of them.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql.types._
+ *
+ * val struct =
+ *   StructType(
+ *     StructField("a", IntegerType, true) ::
+ *     StructField("b", LongType, false) ::
+ *     StructField("c", BooleanType, false) :: Nil)
+ *
+ * // Extract a single StructField.
+ * val singleField = struct("b")
+ * // singleField: StructField = StructField(b,LongType,false)
+ *
+ * // This struct does not have a field called "d", so the lookup throws.
+ * val nonExisting = struct("d")
+ * // java.lang.IllegalArgumentException: Field "d" does not exist.
+ *
+ * // Extract multiple StructFields. Field names are provided in a set.
+ * // A StructType object will be returned.
+ * val twoFields = struct(Set("b", "c"))
+ * // twoFields: StructType =
+ * //   StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
+ *
+ * // Every requested name must match a field; unknown names are not ignored.
+ * // For the case shown below, "d" has no matching field, so the call
+ * // throws instead of being treated as struct(Set("b", "c")).
+ * val nonExistingInSet = struct(Set("b", "c", "d"))
+ * // throws java.lang.IllegalArgumentException:
+ * //   Field d does not exist.
+ * }}}
+ *
+ * A [[org.apache.spark.sql.Row]] object is used as a value of the StructType.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._, org.apache.spark.sql.types._
+ *
+ * val innerStruct =
+ *   StructType(
+ *     StructField("f1", IntegerType, true) ::
+ *     StructField("f2", LongType, false) ::
+ *     StructField("f3", BooleanType, false) :: Nil)
+ *
+ * val struct = StructType(
+ *   StructField("a", innerStruct, true) :: Nil)
+ *
+ * // Create a Row with the schema defined by struct
+ * val row = Row(Row(1, 2, true))
+ * // row: Row = [[1,2,true]]
+ * }}}
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
+
+  /** Zero-argument constructor, present so that kryo can instantiate this class. */
+  protected def this() = this(null)
+
+  /** The names of all fields, in the same order as `fields`. */
+  def fieldNames: Array[String] = fields.map(field => field.name)
+
+  // Name-keyed views of `fields`; lazy, so each is built only on first use.
+  private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
+  private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
+  private lazy val nameToIndex: Map[String, Int] = fieldNames.zipWithIndex.toMap
+
+  /**
+   * Looks up the [[StructField]] with the given name. An `IllegalArgumentException` is
+   * thrown if this [[StructType]] has no field with that name.
+   */
+  def apply(name: String): StructField = nameToField.get(name) match {
+    case Some(field) => field
+    case None => throw new IllegalArgumentException(s"""Field "$name" does not exist.""")
+  }
+
+  /**
+   * Returns a [[StructType]] made of the [[StructField]]s with the given names, in their
+   * original order. An `IllegalArgumentException` is thrown if any name has no matching field.
+   */
+  def apply(names: Set[String]): StructType = {
+    val missing = names.diff(fieldNamesSet)
+    if (missing.isEmpty) {
+      // Filtering `fields` (instead of walking `names`) keeps the original field order.
+      StructType(fields.filter(field => names(field.name)))
+    } else {
+      throw new IllegalArgumentException(s"Field ${missing.mkString(",")} does not exist.")
+    }
+  }
+
+  /**
+   * Returns the index of the field with the given name, or throws if there is no such field.
+   */
+  def fieldIndex(name: String): Int = nameToIndex.get(name) match {
+    case Some(index) => index
+    case None => throw new IllegalArgumentException(s"""Field "$name" does not exist.""")
+  }
+
+  /** One [[AttributeReference]] per field, with its name, type, nullability and metadata. */
+  protected[sql] def toAttributes: Seq[AttributeReference] =
+    map { f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)() }
+
+  /** Renders the schema as text: a "root" line, then the formatted output of each field. */
+  def treeString: String = {
+    val out = new StringBuilder("root\n")
+    buildFormattedString(" |", out)
+    out.toString()
+  }
+
+  /** Prints the result of `treeString` to standard output. */
+  def printTreeString(): Unit = println(treeString)
+
+  /** Appends the formatted output of every field to `builder`, passing `prefix` through. */
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit =
+    fields.foreach(_.buildFormattedString(prefix, builder))
+
+  /** JSON form: `typeName` under "type", the fields' JSON values under "fields". */
+  override private[sql] def jsonValue =
+    ("type" -> typeName) ~ ("fields" -> map(field => field.jsonValue))
+
+  // Seq[StructField] members, all answered directly from the `fields` array.
+  override def apply(fieldIndex: Int): StructField = fields(fieldIndex)
+
+  override def length: Int = fields.length
+
+  override def iterator: Iterator[StructField] = fields.iterator
+
+  /**
+   * The default size of a value of the StructType is the total default sizes of all field types.
+   */
+  override def defaultSize: Int = fields.foldLeft(0)(_ + _.dataType.defaultSize)
+
+  /** Compact rendering: `struct<`, then comma-separated `name:type` pairs, then `>`. */
+  override def simpleString: String =
+    fields.map(f => s"${f.name}:${f.dataType.simpleString}").mkString("struct<", ",", ">")
+
+  /**
+   * Merges this schema with another `StructType`.  Given a struct field A from `this` and a
+   * struct field B from `that`:
+   *
+   * 1. When A and B share a name and have mergeable data types, they become one field C with
+   *    that name and the merged type.  C is nullable exactly when A or B (or both) is nullable.
+   * 2. When A has no counterpart in `that`, A is carried over into the result schema.
+   * 3. When B has no counterpart in `this`, B is carried over into the result schema as well.
+   * 4. In every other case `this` and `that` are conflicting schemas and an exception is
+   *    thrown.
+   */
+  private[sql] def merge(that: StructType): StructType =
+    StructType.merge(this, that).asInstanceOf[StructType]
+
+  /** Copy of this struct: every field marked nullable, every field type made `asNullable`. */
+  private[spark] override def asNullable: StructType = {
+    val relaxed = fields.map { case StructField(fieldName, fieldType, _, fieldMetadata) =>
+      StructField(fieldName, fieldType.asNullable, nullable = true, fieldMetadata)
+    }
+    StructType(relaxed)
+  }
+}
+
+
+object StructType {
+
+  /** Creates a [[StructType]] from a Scala sequence of fields, preserving their order. */
+  def apply(fields: Seq[StructField]): StructType = StructType(fields.toArray)
+
+  /**
+   * Creates a [[StructType]] from a Java list of fields, preserving their order.
+   *
+   * The typed `toArray` overload is required here: the no-arg `toArray()` returns an
+   * `Object[]`, and casting that to `Array[StructField]` fails with a `ClassCastException`.
+   */
+  def apply(fields: java.util.List[StructField]): StructType =
+    StructType(fields.toArray(new Array[StructField](fields.size)))
+
+  /** Creates a [[StructType]] with one field per attribute, in the order given. */
+  protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
+    StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
+
+  /**
+   * Merges two data types. Arrays, maps and structs are merged component by component, fixed
+   * decimals are widened to fit both inputs, two UDTs must share the same `userClass`, and any
+   * other pair must be equal. Throws a `SparkException` when the types cannot be merged.
+   */
+  private[sql] def merge(left: DataType, right: DataType): DataType =
+    (left, right) match {
+      case (ArrayType(leftElem, leftNull), ArrayType(rightElem, rightNull)) =>
+        ArrayType(merge(leftElem, rightElem), leftNull || rightNull)
+
+      case (MapType(leftKey, leftValue, leftNull), MapType(rightKey, rightValue, rightNull)) =>
+        MapType(merge(leftKey, rightKey), merge(leftValue, rightValue), leftNull || rightNull)
+
+      case (StructType(leftFields), StructType(rightFields)) =>
+        // Left fields keep their order; each is merged with the first same-named right field.
+        val mergedLeft = leftFields.map { leftField =>
+          rightFields.find(_.name == leftField.name) match {
+            case Some(rightField) =>
+              leftField.copy(
+                dataType = merge(leftField.dataType, rightField.dataType),
+                nullable = leftField.nullable || rightField.nullable)
+            case None => leftField
+          }
+        }
+        // Collect the left names once instead of rebuilding the array for every right field.
+        val leftNames = leftFields.map(_.name).toSet
+        val rightOnly = rightFields.filterNot(field => leftNames.contains(field.name))
+        StructType(mergedLeft ++ rightOnly)
+
+      case (DecimalType.Fixed(leftPrecision, leftScale),
+      DecimalType.Fixed(rightPrecision, rightScale)) =>
+        // Keep the larger scale and the larger number of integral digits.
+        val scale = max(leftScale, rightScale)
+        val range = max(leftPrecision - leftScale, rightPrecision - rightScale)
+        DecimalType(scale + range, scale)
+
+      case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
+        if leftUdt.userClass == rightUdt.userClass => leftUdt
+
+      case (leftType, rightType) if leftType == rightType =>
+        leftType
+
+      case _ =>
+        throw new SparkException(s"Failed to merge incompatible data types $left and $right")
+    }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
new file mode 100644
index 0000000..aebabfc
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import java.sql.Timestamp
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `java.sql.Timestamp` values.
+ * Please use the singleton [[DataTypes.TimestampType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class TimestampType private() extends AtomicType {
+  // The companion object and this class are separated so the companion object also subclasses
+  // this type. Otherwise, the companion object would be of type "TimestampType$" in byte code.
+  // Defined with a private constructor so the companion object is the only possible instantiation.
+  private[sql] type InternalType = Timestamp
+
+  // Type tag of InternalType, created lazily while holding ScalaReflectionLock.
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+
+  // Compares two values with Timestamp.compareTo.
+  private[sql] val ordering = new Ordering[InternalType] {
+    def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
+  }
+
+  /** The default size of a value of the TimestampType is 12 bytes. */
+  override def defaultSize: Int = 12
+
+  private[spark] override def asNullable: TimestampType = this
+}
+
+case object TimestampType extends TimestampType

http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
new file mode 100644
index 0000000..6b20505
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * The data type for User Defined Types (UDTs).
+ *
+ * This interface allows a user to make their own classes more interoperable with SparkSQL;
+ * e.g., by creating a [[UserDefinedType]] for a class X, it becomes possible to create
+ * a `DataFrame` which has class X in the schema.
+ *
+ * For SparkSQL to recognize UDTs, the UDT must be annotated with
+ * [[SQLUserDefinedType]].
+ *
+ * The conversion via `serialize` occurs when instantiating a `DataFrame` from another RDD.
+ * The conversion via `deserialize` occurs when reading from a `DataFrame`.
+ */
+@DeveloperApi
+abstract class UserDefinedType[UserType] extends DataType with Serializable {
+
+  /** Underlying storage type for this UDT. */
+  def sqlType: DataType
+
+  /** Paired Python UDT class, if one exists; the default is `null`. */
+  def pyUDT: String = null
+
+  /**
+   * Converts a value of the user type to a SQL datum.
+   *
+   * TODO: Can we make this take obj: UserType?  The issue is in
+   *       CatalystTypeConverters.convertToCatalyst, where we need to convert Any to UserType.
+   */
+  def serialize(obj: Any): Any
+
+  /** Converts a SQL datum back to a value of the user type. */
+  def deserialize(datum: Any): UserType
+
+  /**
+   * JSON form of this UDT: the literal "udt" under "type", the runtime class name of this
+   * instance under "class", `pyUDT` under "pyClass" and the JSON of `sqlType` under "sqlType".
+   */
+  override private[sql] def jsonValue: JValue = {
+    ("type" -> "udt") ~
+      ("class" -> this.getClass.getName) ~
+      ("pyClass" -> pyUDT) ~
+      ("sqlType" -> sqlType.jsonValue)
+  }
+
+  /** Class object for the UserType. */
+  def userClass: java.lang.Class[UserType]
+
+  /** The default size of a value of the UserDefinedType is 4096 bytes. */
+  override def defaultSize: Int = 4096
+
+  /**
+   * For UDT, asNullable will not change the nullability of its internal sqlType and just returns
+   * itself.
+   */
+  private[spark] override def asNullable: UserDefinedType[UserType] = this
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org