Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/23 23:48:24 UTC
[2/2] spark git commit: [SQL] Break dataTypes.scala into multiple files.
[SQL] Break dataTypes.scala into multiple files.
It was over 1000 lines of code, making it harder to find all the types. Only moved code around, and didn't change any code.
Author: Reynold Xin <rx...@databricks.com>
Closes #5670 from rxin/break-types and squashes the following commits:
8c59023 [Reynold Xin] Check in missing files.
dcd5193 [Reynold Xin] [SQL] Break dataTypes.scala into multiple files.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6220d933
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6220d933
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6220d933
Branch: refs/heads/master
Commit: 6220d933e5ce4ba890f5d6a50a69b95d319dafb4
Parents: 1ed46a6
Author: Reynold Xin <rx...@databricks.com>
Authored: Thu Apr 23 14:48:19 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Apr 23 14:48:19 2015 -0700
----------------------------------------------------------------------
.../org/apache/spark/sql/types/ArrayType.scala | 74 ++
.../org/apache/spark/sql/types/BinaryType.scala | 63 +
.../apache/spark/sql/types/BooleanType.scala | 51 +
.../org/apache/spark/sql/types/ByteType.scala | 54 +
.../org/apache/spark/sql/types/DataType.scala | 353 +++++
.../org/apache/spark/sql/types/DateType.scala | 54 +
.../apache/spark/sql/types/DecimalType.scala | 110 ++
.../org/apache/spark/sql/types/DoubleType.scala | 53 +
.../org/apache/spark/sql/types/FloatType.scala | 53 +
.../apache/spark/sql/types/IntegerType.scala | 54 +
.../org/apache/spark/sql/types/LongType.scala | 54 +
.../org/apache/spark/sql/types/MapType.scala | 79 ++
.../org/apache/spark/sql/types/NullType.scala | 39 +
.../org/apache/spark/sql/types/ShortType.scala | 53 +
.../org/apache/spark/sql/types/StringType.scala | 50 +
.../apache/spark/sql/types/StructField.scala | 54 +
.../org/apache/spark/sql/types/StructType.scala | 263 ++++
.../apache/spark/sql/types/TimestampType.scala | 57 +
.../spark/sql/types/UserDefinedType.scala | 81 ++
.../org/apache/spark/sql/types/dataTypes.scala | 1224 ------------------
20 files changed, 1649 insertions(+), 1224 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
new file mode 100644
index 0000000..b116163
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ArrayType.scala
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonDSL._
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+object ArrayType {
+ /** Construct an [[ArrayType]] object with the given element type; `containsNull` defaults to true. */
+ def apply(elementType: DataType): ArrayType = ArrayType(elementType, containsNull = true)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for collections of multiple values.
+ * Internally these are represented as columns that contain a ``scala.collection.Seq``.
+ *
+ * Please use [[DataTypes.createArrayType()]] to create a specific instance.
+ *
+ * An [[ArrayType]] object comprises two fields, `elementType: [[DataType]]` and
+ * `containsNull: Boolean`. `elementType` specifies the type of the array's elements, and
+ * `containsNull` specifies whether the array can contain `null` values.
+ *
+ * @param elementType The data type of the array's elements.
+ * @param containsNull Indicates whether the array can contain `null` values.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
+
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null, false)
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ builder.append(
+ s"$prefix-- element: ${elementType.typeName} (containsNull = $containsNull)\n")
+ DataType.buildFormattedString(elementType, s"$prefix |", builder)
+ }
+
+ override private[sql] def jsonValue =
+ ("type" -> typeName) ~
+ ("elementType" -> elementType.jsonValue) ~
+ ("containsNull" -> containsNull)
+
+ /**
+ * The default size of a value of the ArrayType is 100 * the default size of the element type.
+ * (We assume that there are 100 elements).
+ */
+ override def defaultSize: Int = 100 * elementType.defaultSize
+
+ override def simpleString: String = s"array<${elementType.simpleString}>"
+
+ private[spark] override def asNullable: ArrayType =
+ ArrayType(elementType.asNullable, containsNull = true)
+}
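A quick usage sketch of the ArrayType API above (illustrative only, not part of this commit's diff):

    import org.apache.spark.sql.types._

    val arr = ArrayType(IntegerType)   // containsNull defaults to true
    arr.simpleString                   // "array<int>"
    arr.defaultSize                    // 400 = 100 * IntegerType.defaultSize
    arr.json                           // roughly {"type":"array","elementType":"integer","containsNull":true}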
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
new file mode 100644
index 0000000..a581a9e
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BinaryType.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Array[Byte]` values.
+ * Please use the singleton [[DataTypes.BinaryType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class BinaryType private() extends AtomicType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "BinaryType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+
+ private[sql] type InternalType = Array[Byte]
+
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+
+ private[sql] val ordering = new Ordering[InternalType] {
+ def compare(x: Array[Byte], y: Array[Byte]): Int = {
+ for (i <- 0 until x.length; if i < y.length) {
+ val res = x(i).compareTo(y(i))
+ if (res != 0) return res
+ }
+ x.length - y.length
+ }
+ }
+
+ /**
+ * The default size of a value of the BinaryType is 4096 bytes.
+ */
+ override def defaultSize: Int = 4096
+
+ private[spark] override def asNullable: BinaryType = this
+}
+
+
+case object BinaryType extends BinaryType
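The `ordering` above is private[sql]; as a standalone sketch of the same lexicographic semantics (compare byte by byte, then break ties by length):

    def compareBytes(x: Array[Byte], y: Array[Byte]): Int = {
      var i = 0
      while (i < x.length && i < y.length) {
        val res = x(i).compareTo(y(i))  // first differing byte decides
        if (res != 0) return res
        i += 1
      }
      x.length - y.length               // equal prefix: shorter array sorts first
    }

    compareBytes(Array[Byte](1, 2), Array[Byte](1, 2, 3))  // < 0
    compareBytes(Array[Byte](2), Array[Byte](1, 9))        // > 0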
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala
new file mode 100644
index 0000000..a7f228c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/BooleanType.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Boolean` values. Please use the singleton [[DataTypes.BooleanType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class BooleanType private() extends AtomicType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "BooleanType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Boolean
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the BooleanType is 1 byte.
+ */
+ override def defaultSize: Int = 1
+
+ private[spark] override def asNullable: BooleanType = this
+}
+
+
+case object BooleanType extends BooleanType
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala
new file mode 100644
index 0000000..4d86857
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ByteType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Byte` values. Please use the singleton [[DataTypes.ByteType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class ByteType private() extends IntegralType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "ByteType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Byte
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Byte]]
+ private[sql] val integral = implicitly[Integral[Byte]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the ByteType is 1 byte.
+ */
+ override def defaultSize: Int = 1
+
+ override def simpleString: String = "tinyint"
+
+ private[spark] override def asNullable: ByteType = this
+}
+
+case object ByteType extends ByteType
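ByteType is the first of several near-identical singleton files below (ShortType, IntegerType, LongType, ...): a class with a private constructor plus a case object that extends it. A sketch of what the pattern buys user code (illustrative only, not part of this commit):

    import org.apache.spark.sql.types._

    // The case object is both a value and a pattern.
    def describe(dt: DataType): String = dt match {
      case ByteType => s"${dt.simpleString} (${dt.defaultSize} byte)"  // "tinyint (1 byte)"
      case other    => other.simpleString
    }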
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
new file mode 100644
index 0000000..e6bfcd9
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DataType.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.reflect.ClassTag
+import scala.reflect.runtime.universe.{TypeTag, runtimeMirror}
+import scala.util.parsing.combinator.RegexParsers
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.sql.catalyst.expressions.Expression
+import org.apache.spark.util.Utils
+
+
+/**
+ * :: DeveloperApi ::
+ * The base type of all Spark SQL data types.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+abstract class DataType {
+ /** Matches any expression that evaluates to this DataType */
+ def unapply(a: Expression): Boolean = a match {
+ case e: Expression if e.dataType == this => true
+ case _ => false
+ }
+
+ /** The default size of a value of this data type. */
+ def defaultSize: Int
+
+ def typeName: String = this.getClass.getSimpleName.stripSuffix("$").dropRight(4).toLowerCase
+
+ private[sql] def jsonValue: JValue = typeName
+
+ def json: String = compact(render(jsonValue))
+
+ def prettyJson: String = pretty(render(jsonValue))
+
+ def simpleString: String = typeName
+
+ /** Check if `this` and `other` are the same data type when ignoring nullability
+ * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
+ */
+ private[spark] def sameType(other: DataType): Boolean =
+ DataType.equalsIgnoreNullability(this, other)
+
+ /** Returns the same data type but with all nullability fields set to true
+ * (`StructField.nullable`, `ArrayType.containsNull`, and `MapType.valueContainsNull`).
+ */
+ private[spark] def asNullable: DataType
+}
+
+
+/**
+ * An internal type used to represent everything that is not null, a UDT, an array, a struct, or a map.
+ */
+protected[sql] abstract class AtomicType extends DataType {
+ private[sql] type InternalType
+ @transient private[sql] val tag: TypeTag[InternalType]
+ private[sql] val ordering: Ordering[InternalType]
+
+ @transient private[sql] val classTag = ScalaReflectionLock.synchronized {
+ val mirror = runtimeMirror(Utils.getSparkClassLoader)
+ ClassTag[InternalType](mirror.runtimeClass(tag.tpe))
+ }
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * Numeric data types.
+ *
+ * @group dataType
+ */
+abstract class NumericType extends AtomicType {
+ // Unfortunately we can't get this implicitly as that breaks Spark Serialization. In order for
+ // implicitly[Numeric[JvmType]] to be valid, we have to change JvmType from a type variable to a
+ // type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
+ // desugared by the compiler into an argument to the object's constructor. This means there is
+ // no longer a no-arg constructor and thus the JVM cannot serialize the object anymore.
+ private[sql] val numeric: Numeric[InternalType]
+}
+
+
+private[sql] object NumericType {
+ def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[NumericType]
+}
+
+
+/** Matcher for any expressions that evaluate to [[IntegralType]]s */
+private[sql] object IntegralType {
+ def unapply(a: Expression): Boolean = a match {
+ case e: Expression if e.dataType.isInstanceOf[IntegralType] => true
+ case _ => false
+ }
+}
+
+
+private[sql] abstract class IntegralType extends NumericType {
+ private[sql] val integral: Integral[InternalType]
+}
+
+
+/** Matcher for any expressions that evaluate to [[FractionalType]]s */
+private[sql] object FractionalType {
+ def unapply(a: Expression): Boolean = a match {
+ case e: Expression if e.dataType.isInstanceOf[FractionalType] => true
+ case _ => false
+ }
+}
+
+
+private[sql] abstract class FractionalType extends NumericType {
+ private[sql] val fractional: Fractional[InternalType]
+ private[sql] val asIntegral: Integral[InternalType]
+}
+
+
+object DataType {
+
+ def fromJson(json: String): DataType = parseDataType(parse(json))
+
+ @deprecated("Use DataType.fromJson instead", "1.2.0")
+ def fromCaseClassString(string: String): DataType = CaseClassStringParser(string)
+
+ private val nonDecimalNameToType = {
+ Seq(NullType, DateType, TimestampType, BinaryType,
+ IntegerType, BooleanType, LongType, DoubleType, FloatType, ShortType, ByteType, StringType)
+ .map(t => t.typeName -> t).toMap
+ }
+
+ /** Given the string representation of a type, return its DataType */
+ private def nameToType(name: String): DataType = {
+ val FIXED_DECIMAL = """decimal\(\s*(\d+)\s*,\s*(\d+)\s*\)""".r
+ name match {
+ case "decimal" => DecimalType.Unlimited
+ case FIXED_DECIMAL(precision, scale) => DecimalType(precision.toInt, scale.toInt)
+ case other => nonDecimalNameToType(other)
+ }
+ }
+
+ private object JSortedObject {
+ def unapplySeq(value: JValue): Option[List[(String, JValue)]] = value match {
+ case JObject(seq) => Some(seq.toList.sortBy(_._1))
+ case _ => None
+ }
+ }
+
+ // NOTE: Map fields must be sorted in alphabetical order to keep consistent with the Python side.
+ private def parseDataType(json: JValue): DataType = json match {
+ case JString(name) =>
+ nameToType(name)
+
+ case JSortedObject(
+ ("containsNull", JBool(n)),
+ ("elementType", t: JValue),
+ ("type", JString("array"))) =>
+ ArrayType(parseDataType(t), n)
+
+ case JSortedObject(
+ ("keyType", k: JValue),
+ ("type", JString("map")),
+ ("valueContainsNull", JBool(n)),
+ ("valueType", v: JValue)) =>
+ MapType(parseDataType(k), parseDataType(v), n)
+
+ case JSortedObject(
+ ("fields", JArray(fields)),
+ ("type", JString("struct"))) =>
+ StructType(fields.map(parseStructField))
+
+ case JSortedObject(
+ ("class", JString(udtClass)),
+ ("pyClass", _),
+ ("sqlType", _),
+ ("type", JString("udt"))) =>
+ Class.forName(udtClass).newInstance().asInstanceOf[UserDefinedType[_]]
+ }
+
+ private def parseStructField(json: JValue): StructField = json match {
+ case JSortedObject(
+ ("metadata", metadata: JObject),
+ ("name", JString(name)),
+ ("nullable", JBool(nullable)),
+ ("type", dataType: JValue)) =>
+ StructField(name, parseDataType(dataType), nullable, Metadata.fromJObject(metadata))
+ // Support reading schema when 'metadata' is missing.
+ case JSortedObject(
+ ("name", JString(name)),
+ ("nullable", JBool(nullable)),
+ ("type", dataType: JValue)) =>
+ StructField(name, parseDataType(dataType), nullable)
+ }
+
+ private object CaseClassStringParser extends RegexParsers {
+ protected lazy val primitiveType: Parser[DataType] =
+ ( "StringType" ^^^ StringType
+ | "FloatType" ^^^ FloatType
+ | "IntegerType" ^^^ IntegerType
+ | "ByteType" ^^^ ByteType
+ | "ShortType" ^^^ ShortType
+ | "DoubleType" ^^^ DoubleType
+ | "LongType" ^^^ LongType
+ | "BinaryType" ^^^ BinaryType
+ | "BooleanType" ^^^ BooleanType
+ | "DateType" ^^^ DateType
+ | "DecimalType()" ^^^ DecimalType.Unlimited
+ | fixedDecimalType
+ | "TimestampType" ^^^ TimestampType
+ )
+
+ protected lazy val fixedDecimalType: Parser[DataType] =
+ ("DecimalType(" ~> "[0-9]+".r) ~ ("," ~> "[0-9]+".r <~ ")") ^^ {
+ case precision ~ scale => DecimalType(precision.toInt, scale.toInt)
+ }
+
+ protected lazy val arrayType: Parser[DataType] =
+ "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ {
+ case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull)
+ }
+
+ protected lazy val mapType: Parser[DataType] =
+ "MapType" ~> "(" ~> dataType ~ "," ~ dataType ~ "," ~ boolVal <~ ")" ^^ {
+ case t1 ~ _ ~ t2 ~ _ ~ valueContainsNull => MapType(t1, t2, valueContainsNull)
+ }
+
+ protected lazy val structField: Parser[StructField] =
+ ("StructField(" ~> "[a-zA-Z0-9_]*".r) ~ ("," ~> dataType) ~ ("," ~> boolVal <~ ")") ^^ {
+ case name ~ tpe ~ nullable =>
+ StructField(name, tpe, nullable = nullable)
+ }
+
+ protected lazy val boolVal: Parser[Boolean] =
+ ( "true" ^^^ true
+ | "false" ^^^ false
+ )
+
+ protected lazy val structType: Parser[DataType] =
+ "StructType\\([A-zA-z]*\\(".r ~> repsep(structField, ",") <~ "))" ^^ {
+ case fields => StructType(fields)
+ }
+
+ protected lazy val dataType: Parser[DataType] =
+ ( arrayType
+ | mapType
+ | structType
+ | primitiveType
+ )
+
+ /**
+ * Parses a string representation of a DataType.
+ *
+ * TODO: Generate parser as pickler...
+ */
+ def apply(asString: String): DataType = parseAll(dataType, asString) match {
+ case Success(result, _) => result
+ case failure: NoSuccess =>
+ throw new IllegalArgumentException(s"Unsupported dataType: $asString, $failure")
+ }
+ }
+
+ protected[types] def buildFormattedString(
+ dataType: DataType,
+ prefix: String,
+ builder: StringBuilder): Unit = {
+ dataType match {
+ case array: ArrayType =>
+ array.buildFormattedString(prefix, builder)
+ case struct: StructType =>
+ struct.buildFormattedString(prefix, builder)
+ case map: MapType =>
+ map.buildFormattedString(prefix, builder)
+ case _ =>
+ }
+ }
+
+ /**
+ * Compares two types, ignoring nullability of ArrayType, MapType, StructType.
+ */
+ private[types] def equalsIgnoreNullability(left: DataType, right: DataType): Boolean = {
+ (left, right) match {
+ case (ArrayType(leftElementType, _), ArrayType(rightElementType, _)) =>
+ equalsIgnoreNullability(leftElementType, rightElementType)
+ case (MapType(leftKeyType, leftValueType, _), MapType(rightKeyType, rightValueType, _)) =>
+ equalsIgnoreNullability(leftKeyType, rightKeyType) &&
+ equalsIgnoreNullability(leftValueType, rightValueType)
+ case (StructType(leftFields), StructType(rightFields)) =>
+ leftFields.length == rightFields.length &&
+ leftFields.zip(rightFields).forall { case (l, r) =>
+ l.name == r.name && equalsIgnoreNullability(l.dataType, r.dataType)
+ }
+ case (l, r) => l == r
+ }
+ }
+
+ /**
+ * Compares two types, ignoring compatible nullability of ArrayType, MapType, StructType.
+ *
+ * Compatible nullability is defined as follows:
+ * - If `from` and `to` are ArrayTypes, `from` has a compatible nullability with `to`
+ * if and only if `to.containsNull` is true, or both of `from.containsNull` and
+ * `to.containsNull` are false.
+ * - If `from` and `to` are MapTypes, `from` has a compatible nullability with `to`
+ * if and only if `to.valueContainsNull` is true, or both of `from.valueContainsNull` and
+ * `to.valueContainsNull` are false.
+ * - If `from` and `to` are StructTypes, `from` has a compatible nullability with `to`
+ * if and only if, for every pair of corresponding fields, `toField.nullable` is true, or
+ * both of `fromField.nullable` and `toField.nullable` are false.
+ */
+ private[sql] def equalsIgnoreCompatibleNullability(from: DataType, to: DataType): Boolean = {
+ (from, to) match {
+ case (ArrayType(fromElement, fn), ArrayType(toElement, tn)) =>
+ (tn || !fn) && equalsIgnoreCompatibleNullability(fromElement, toElement)
+
+ case (MapType(fromKey, fromValue, fn), MapType(toKey, toValue, tn)) =>
+ (tn || !fn) &&
+ equalsIgnoreCompatibleNullability(fromKey, toKey) &&
+ equalsIgnoreCompatibleNullability(fromValue, toValue)
+
+ case (StructType(fromFields), StructType(toFields)) =>
+ fromFields.length == toFields.length &&
+ fromFields.zip(toFields).forall { case (fromField, toField) =>
+ fromField.name == toField.name &&
+ (toField.nullable || !fromField.nullable) &&
+ equalsIgnoreCompatibleNullability(fromField.dataType, toField.dataType)
+ }
+
+ case (fromDataType, toDataType) => fromDataType == toDataType
+ }
+ }
+}
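A sketch of the public JSON round trip that parseDataType/parseStructField above implement (illustrative only, not part of this commit's diff):

    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("id", LongType, nullable = false),
      StructField("tags", ArrayType(StringType))))

    val json = schema.json              // compact JSON representation
    DataType.fromJson(json) == schema   // true: the schema survives the round trip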
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
new file mode 100644
index 0000000..03f0644
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DateType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `java.sql.Date` values.
+ * Please use the singleton [[DataTypes.DateType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class DateType private() extends AtomicType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "DateType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Int
+
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the DateType is 4 bytes.
+ */
+ override def defaultSize: Int = 4
+
+ private[spark] override def asNullable: DateType = this
+}
+
+
+case object DateType extends DateType
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
new file mode 100644
index 0000000..0f8cecd
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DecimalType.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+import org.apache.spark.sql.catalyst.expressions.Expression
+
+
+/** Precision parameters for a Decimal */
+case class PrecisionInfo(precision: Int, scale: Int)
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `java.math.BigDecimal` values.
+ * A Decimal can have a fixed precision and scale, or leave both unlimited.
+ *
+ * Please use [[DataTypes.createDecimalType()]] to create a specific instance.
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class DecimalType(precisionInfo: Option[PrecisionInfo]) extends FractionalType {
+
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null)
+
+ private[sql] type InternalType = Decimal
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = Decimal.DecimalIsFractional
+ private[sql] val fractional = Decimal.DecimalIsFractional
+ private[sql] val ordering = Decimal.DecimalIsFractional
+ private[sql] val asIntegral = Decimal.DecimalAsIfIntegral
+
+ def precision: Int = precisionInfo.map(_.precision).getOrElse(-1)
+
+ def scale: Int = precisionInfo.map(_.scale).getOrElse(-1)
+
+ override def typeName: String = precisionInfo match {
+ case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+ case None => "decimal"
+ }
+
+ override def toString: String = precisionInfo match {
+ case Some(PrecisionInfo(precision, scale)) => s"DecimalType($precision,$scale)"
+ case None => "DecimalType()"
+ }
+
+ /**
+ * The default size of a value of the DecimalType is 4096 bytes.
+ */
+ override def defaultSize: Int = 4096
+
+ override def simpleString: String = precisionInfo match {
+ case Some(PrecisionInfo(precision, scale)) => s"decimal($precision,$scale)"
+ case None => "decimal(10,0)"
+ }
+
+ private[spark] override def asNullable: DecimalType = this
+}
+
+
+/** Extra factory methods and pattern matchers for Decimals */
+object DecimalType {
+ val Unlimited: DecimalType = DecimalType(None)
+
+ object Fixed {
+ def unapply(t: DecimalType): Option[(Int, Int)] =
+ t.precisionInfo.map(p => (p.precision, p.scale))
+ }
+
+ object Expression {
+ def unapply(e: Expression): Option[(Int, Int)] = e.dataType match {
+ case t: DecimalType => t.precisionInfo.map(p => (p.precision, p.scale))
+ case _ => None
+ }
+ }
+
+ def apply(): DecimalType = Unlimited
+
+ def apply(precision: Int, scale: Int): DecimalType =
+ DecimalType(Some(PrecisionInfo(precision, scale)))
+
+ def unapply(t: DataType): Boolean = t.isInstanceOf[DecimalType]
+
+ def unapply(e: Expression): Boolean = e.dataType.isInstanceOf[DecimalType]
+
+ def isFixed(dataType: DataType): Boolean = dataType match {
+ case DecimalType.Fixed(_, _) => true
+ case _ => false
+ }
+}
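A sketch of the DecimalType factories and the Fixed extractor above (illustrative only, not part of this commit's diff):

    import org.apache.spark.sql.types._

    val fixed = DecimalType(10, 2)   // DecimalType(Some(PrecisionInfo(10, 2)))
    fixed.simpleString               // "decimal(10,2)"

    fixed match {
      case DecimalType.Fixed(p, s) => s"precision=$p scale=$s"  // "precision=10 scale=2"
      case _                       => "unlimited"               // DecimalType.Unlimited lands here
    }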
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
new file mode 100644
index 0000000..6676662
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/DoubleType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Fractional, Numeric}
+import scala.math.Numeric.DoubleAsIfIntegral
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Double` values. Please use the singleton [[DataTypes.DoubleType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class DoubleType private() extends FractionalType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "DoubleType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Double
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Double]]
+ private[sql] val fractional = implicitly[Fractional[Double]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+ private[sql] val asIntegral = DoubleAsIfIntegral
+
+ /**
+ * The default size of a value of the DoubleType is 8 bytes.
+ */
+ override def defaultSize: Int = 8
+
+ private[spark] override def asNullable: DoubleType = this
+}
+
+case object DoubleType extends DoubleType
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
new file mode 100644
index 0000000..1d5a2f4
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/FloatType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Numeric.FloatAsIfIntegral
+import scala.math.{Ordering, Fractional, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Float` values. Please use the singleton [[DataTypes.FloatType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class FloatType private() extends FractionalType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "FloatType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Float
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Float]]
+ private[sql] val fractional = implicitly[Fractional[Float]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+ private[sql] val asIntegral = FloatAsIfIntegral
+
+ /**
+ * The default size of a value of the FloatType is 4 bytes.
+ */
+ override def defaultSize: Int = 4
+
+ private[spark] override def asNullable: FloatType = this
+}
+
+case object FloatType extends FloatType
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala
new file mode 100644
index 0000000..74e464c
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/IntegerType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Int` values. Please use the singleton [[DataTypes.IntegerType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class IntegerType private() extends IntegralType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "IntegerType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Int
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Int]]
+ private[sql] val integral = implicitly[Integral[Int]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the IntegerType is 4 bytes.
+ */
+ override def defaultSize: Int = 4
+
+ override def simpleString: String = "int"
+
+ private[spark] override def asNullable: IntegerType = this
+}
+
+case object IntegerType extends IntegerType
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala
new file mode 100644
index 0000000..3906757
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/LongType.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Long` values. Please use the singleton [[DataTypes.LongType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class LongType private() extends IntegralType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "LongType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Long
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Long]]
+ private[sql] val integral = implicitly[Integral[Long]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the LongType is 8 bytes.
+ */
+ override def defaultSize: Int = 8
+
+ override def simpleString: String = "bigint"
+
+ private[spark] override def asNullable: LongType = this
+}
+
+
+case object LongType extends LongType
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
new file mode 100644
index 0000000..cfdf493
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/MapType.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type for Maps. Keys in a map are not allowed to have `null` values.
+ *
+ * Please use [[DataTypes.createMapType()]] to create a specific instance.
+ *
+ * @param keyType The data type of map keys.
+ * @param valueType The data type of map values.
+ * @param valueContainsNull Indicates if map values have `null` values.
+ *
+ * @group dataType
+ */
+case class MapType(
+ keyType: DataType,
+ valueType: DataType,
+ valueContainsNull: Boolean) extends DataType {
+
+ /** No-arg constructor for kryo. */
+ def this() = this(null, null, false)
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ builder.append(s"$prefix-- key: ${keyType.typeName}\n")
+ builder.append(s"$prefix-- value: ${valueType.typeName} " +
+ s"(valueContainsNull = $valueContainsNull)\n")
+ DataType.buildFormattedString(keyType, s"$prefix |", builder)
+ DataType.buildFormattedString(valueType, s"$prefix |", builder)
+ }
+
+ override private[sql] def jsonValue: JValue =
+ ("type" -> typeName) ~
+ ("keyType" -> keyType.jsonValue) ~
+ ("valueType" -> valueType.jsonValue) ~
+ ("valueContainsNull" -> valueContainsNull)
+
+ /**
+ * The default size of a value of the MapType is
+ * 100 * (the default size of the key type + the default size of the value type).
+ * (We assume that there are 100 elements).
+ */
+ override def defaultSize: Int = 100 * (keyType.defaultSize + valueType.defaultSize)
+
+ override def simpleString: String = s"map<${keyType.simpleString},${valueType.simpleString}>"
+
+ private[spark] override def asNullable: MapType =
+ MapType(keyType.asNullable, valueType.asNullable, valueContainsNull = true)
+}
+
+
+object MapType {
+ /**
+ * Construct a [[MapType]] object with the given key type and value type;
+ * `valueContainsNull` defaults to true.
+ */
+ def apply(keyType: DataType, valueType: DataType): MapType =
+ MapType(keyType, valueType, valueContainsNull = true)
+}
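A sketch of the MapType API above (illustrative only, not part of this commit's diff):

    import org.apache.spark.sql.types._

    val m = MapType(StringType, IntegerType)  // valueContainsNull defaults to true
    m.simpleString                            // "map<string,int>"
    m.defaultSize                             // 410000 = 100 * (4096 + 4), per the doc comment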
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
new file mode 100644
index 0000000..b64b074
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/NullType.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.apache.spark.annotation.DeveloperApi
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `NULL` values. Please use the singleton [[DataTypes.NullType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class NullType private() extends DataType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "NullType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ override def defaultSize: Int = 1
+
+ private[spark] override def asNullable: NullType = this
+}
+
+case object NullType extends NullType
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala
new file mode 100644
index 0000000..73e9ec7
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/ShortType.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.{Ordering, Integral, Numeric}
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `Short` values. Please use the singleton [[DataTypes.ShortType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class ShortType private() extends IntegralType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "ShortType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Short
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val numeric = implicitly[Numeric[Short]]
+ private[sql] val integral = implicitly[Integral[Short]]
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the ShortType is 2 bytes.
+ */
+ override def defaultSize: Int = 2
+
+ override def simpleString: String = "smallint"
+
+ private[spark] override def asNullable: ShortType = this
+}
+
+case object ShortType extends ShortType
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
new file mode 100644
index 0000000..134ab0a
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StringType.scala
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `String` values. Please use the singleton [[DataTypes.StringType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class StringType private() extends AtomicType {
+ // The companion object and this class are separated so the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "StringType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = UTF8String
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+ private[sql] val ordering = implicitly[Ordering[InternalType]]
+
+ /**
+ * The default size of a value of the StringType is 4096 bytes.
+ */
+ override def defaultSize: Int = 4096
+
+ private[spark] override def asNullable: StringType = this
+}
+
+case object StringType extends StringType
+
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
new file mode 100644
index 0000000..83570a5
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructField.scala
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+
+/**
+ * A field inside a StructType.
+ * @param name The name of this field.
+ * @param dataType The data type of this field.
+ * @param nullable Indicates whether values of this field can be `null`.
+ * @param metadata The metadata of this field. The metadata should be preserved during
+ * transformations that do not modify the content of the column, e.g., selection.
+ */
+case class StructField(
+ name: String,
+ dataType: DataType,
+ nullable: Boolean = true,
+ metadata: Metadata = Metadata.empty) {
+
+ /** No-arg constructor for kryo. */
+ protected def this() = this(null, null)
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ builder.append(s"$prefix-- $name: ${dataType.typeName} (nullable = $nullable)\n")
+ DataType.buildFormattedString(dataType, s"$prefix |", builder)
+ }
+
+ // Override the default toString to be compatible with legacy Parquet files.
+ override def toString: String = s"StructField($name,$dataType,$nullable)"
+
+ private[sql] def jsonValue: JValue = {
+ ("name" -> name) ~
+ ("type" -> dataType.jsonValue) ~
+ ("nullable" -> nullable) ~
+ ("metadata" -> metadata.jsonValue)
+ }
+}
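
A short sketch of how a field with metadata might be built (illustrative names,
assuming the MetadataBuilder API in this package):

    import org.apache.spark.sql.types._

    // Metadata attached here should be preserved by transformations that do
    // not modify the column's content (e.g. selection).
    val meta = new MetadataBuilder().putString("comment", "primary key").build()
    val idField = StructField("id", IntegerType, nullable = false, metadata = meta)

    // toString deliberately omits the metadata for compatibility with legacy
    // Parquet files:
    println(idField)  // StructField(id,IntegerType,false)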
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
new file mode 100644
index 0000000..d80ffca
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import scala.collection.mutable.ArrayBuffer
+import scala.math.max
+
+import org.json4s.JsonDSL._
+
+import org.apache.spark.SparkException
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Attribute}
+
+
+/**
+ * :: DeveloperApi ::
+ * A [[StructType]] object can be constructed by
+ * {{{
+ * StructType(fields: Seq[StructField])
+ * }}}
+ * For a [[StructType]] object, one or multiple [[StructField]]s can be extracted by name.
+ * If multiple [[StructField]]s are extracted, a [[StructType]] object will be returned.
+ * If a provided name does not have a matching field, an `IllegalArgumentException` will be
+ * thrown.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val struct =
+ * StructType(
+ * StructField("a", IntegerType, true) ::
+ * StructField("b", LongType, false) ::
+ * StructField("c", BooleanType, false) :: Nil)
+ *
+ * // Extract a single StructField.
+ * val singleField = struct("b")
+ * // singleField: StructField = StructField(b,LongType,false)
+ *
+ * // This struct does not have a field called "d"; an exception is thrown.
+ * // struct("d")
+ * // java.lang.IllegalArgumentException: Field "d" does not exist.
+ *
+ * // Extract multiple StructFields. Field names are provided in a set.
+ * // A StructType object will be returned.
+ * val twoFields = struct(Set("b", "c"))
+ * // twoFields: StructType =
+ * // StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
+ *
+ * // Names without matching fields also cause an exception when extracting
+ * // multiple fields.
+ * // struct(Set("b", "c", "d"))
+ * // java.lang.IllegalArgumentException: Field d does not exist.
+ * }}}
+ *
+ * Values of a [[StructType]] are represented by [[org.apache.spark.sql.Row]] objects.
+ * Example:
+ * {{{
+ * import org.apache.spark.sql._
+ *
+ * val innerStruct =
+ * StructType(
+ * StructField("f1", IntegerType, true) ::
+ * StructField("f2", LongType, false) ::
+ * StructField("f3", BooleanType, false) :: Nil)
+ *
+ * val struct = StructType(
+ * StructField("a", innerStruct, true) :: Nil)
+ *
+ * // Create a Row with the schema defined by struct
+ * val row = Row(Row(1, 2, true))
+ * // row: Row = [[1,2,true]]
+ * }}}
+ *
+ * @group dataType
+ */
+@DeveloperApi
+case class StructType(fields: Array[StructField]) extends DataType with Seq[StructField] {
+
+ /** No-arg constructor for Kryo. */
+ protected def this() = this(null)
+
+ /** Returns all field names in an array. */
+ def fieldNames: Array[String] = fields.map(_.name)
+
+ private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
+ private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
+ private lazy val nameToIndex: Map[String, Int] = fieldNames.zipWithIndex.toMap
+
+ /**
+ * Extracts the [[StructField]] with the given name. If this [[StructType]] does not have a
+ * field with that name, an `IllegalArgumentException` will be thrown.
+ */
+ def apply(name: String): StructField = {
+ nameToField.getOrElse(name,
+ throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
+ }
+
+ /**
+ * Returns a [[StructType]] containing the [[StructField]]s of the given names, preserving
+ * the original order of fields. If any of the given names does not have a matching field,
+ * an `IllegalArgumentException` will be thrown.
+ */
+ def apply(names: Set[String]): StructType = {
+ val nonExistFields = names -- fieldNamesSet
+ if (nonExistFields.nonEmpty) {
+ throw new IllegalArgumentException(
+ s"Field ${nonExistFields.mkString(",")} does not exist.")
+ }
+ // Preserve the original order of fields.
+ StructType(fields.filter(f => names.contains(f.name)))
+ }
+
+ /**
+ * Returns the index of the field with the given name. If no field with that name exists,
+ * an `IllegalArgumentException` will be thrown.
+ */
+ def fieldIndex(name: String): Int = {
+ nameToIndex.getOrElse(name,
+ throw new IllegalArgumentException(s"""Field "$name" does not exist."""))
+ }
+
+ protected[sql] def toAttributes: Seq[AttributeReference] =
+ map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)())
+
+ def treeString: String = {
+ val builder = new StringBuilder
+ builder.append("root\n")
+ val prefix = " |"
+ fields.foreach(field => field.buildFormattedString(prefix, builder))
+
+ builder.toString()
+ }
+
+ def printTreeString(): Unit = println(treeString)
+
+ private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+ fields.foreach(field => field.buildFormattedString(prefix, builder))
+ }
+
+ override private[sql] def jsonValue =
+ ("type" -> typeName) ~
+ ("fields" -> map(_.jsonValue))
+
+ override def apply(fieldIndex: Int): StructField = fields(fieldIndex)
+
+ override def length: Int = fields.length
+
+ override def iterator: Iterator[StructField] = fields.iterator
+
+ /**
+ * The default size of a value of the StructType is the total default sizes of all field types.
+ */
+ override def defaultSize: Int = fields.map(_.dataType.defaultSize).sum
+
+ override def simpleString: String = {
+ val fieldTypes = fields.map(field => s"${field.name}:${field.dataType.simpleString}")
+ s"struct<${fieldTypes.mkString(",")}>"
+ }
+
+ /**
+ * Merges with another schema (`StructType`). For a struct field A from `this` and a struct field
+ * B from `that`,
+ *
+ * 1. If A and B have the same name and data type, they are merged to a field C with the same name
+ * and data type. C is nullable if and only if either A or B is nullable.
+ * 2. If A doesn't exist in `that`, it's included in the result schema.
+ * 3. If B doesn't exist in `this`, it's also included in the result schema.
+ * 4. Otherwise, `this` and `that` are considered conflicting schemas and a [[SparkException]]
+ * will be thrown.
+ */
+ private[sql] def merge(that: StructType): StructType =
+ StructType.merge(this, that).asInstanceOf[StructType]
+
+ private[spark] override def asNullable: StructType = {
+ val newFields = fields.map {
+ case StructField(name, dataType, nullable, metadata) =>
+ StructField(name, dataType.asNullable, nullable = true, metadata)
+ }
+
+ StructType(newFields)
+ }
+}
+
+
+object StructType {
+
+ def apply(fields: Seq[StructField]): StructType = StructType(fields.toArray)
+
+ def apply(fields: java.util.List[StructField]): StructType = {
+ StructType(fields.toArray.asInstanceOf[Array[StructField]])
+ }
+
+ protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
+ StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
+
+ private[sql] def merge(left: DataType, right: DataType): DataType =
+ (left, right) match {
+ case (ArrayType(leftElementType, leftContainsNull),
+ ArrayType(rightElementType, rightContainsNull)) =>
+ ArrayType(
+ merge(leftElementType, rightElementType),
+ leftContainsNull || rightContainsNull)
+
+ case (MapType(leftKeyType, leftValueType, leftContainsNull),
+ MapType(rightKeyType, rightValueType, rightContainsNull)) =>
+ MapType(
+ merge(leftKeyType, rightKeyType),
+ merge(leftValueType, rightValueType),
+ leftContainsNull || rightContainsNull)
+
+ case (StructType(leftFields), StructType(rightFields)) =>
+ val newFields = ArrayBuffer.empty[StructField]
+
+ leftFields.foreach {
+ case leftField @ StructField(leftName, leftType, leftNullable, _) =>
+ rightFields
+ .find(_.name == leftName)
+ .map { case rightField @ StructField(_, rightType, rightNullable, _) =>
+ leftField.copy(
+ dataType = merge(leftType, rightType),
+ nullable = leftNullable || rightNullable)
+ }
+ .orElse(Some(leftField))
+ .foreach(newFields += _)
+ }
+
+ rightFields
+ .filterNot(f => leftFields.map(_.name).contains(f.name))
+ .foreach(newFields += _)
+
+ StructType(newFields)
+
+ case (DecimalType.Fixed(leftPrecision, leftScale),
+ DecimalType.Fixed(rightPrecision, rightScale)) =>
+ DecimalType(
+ max(leftScale, rightScale) + max(leftPrecision - leftScale, rightPrecision - rightScale),
+ max(leftScale, rightScale))
+
+ case (leftUdt: UserDefinedType[_], rightUdt: UserDefinedType[_])
+ if leftUdt.userClass == rightUdt.userClass => leftUdt
+
+ case (leftType, rightType) if leftType == rightType =>
+ leftType
+
+ case _ =>
+ throw new SparkException(s"Failed to merge incompatible data types $left and $right")
+ }
+}
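
Putting the accessors above together, a hypothetical schema (field names are
illustrative only) behaves like this:

    import org.apache.spark.sql.types._

    val schema = StructType(Seq(
      StructField("a", IntegerType),
      StructField("b", LongType, nullable = false)))

    schema("a")             // StructField(a,IntegerType,true)
    schema.fieldIndex("b")  // 1
    schema.printTreeString()
    // root
    //  |-- a: integer (nullable = true)
    //  |-- b: long (nullable = false)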
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
new file mode 100644
index 0000000..aebabfc
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/TimestampType.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import java.sql.Timestamp
+
+import scala.math.Ordering
+import scala.reflect.runtime.universe.typeTag
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.ScalaReflectionLock
+
+
+/**
+ * :: DeveloperApi ::
+ * The data type representing `java.sql.Timestamp` values.
+ * Please use the singleton [[DataTypes.TimestampType]].
+ *
+ * @group dataType
+ */
+@DeveloperApi
+class TimestampType private() extends AtomicType {
+ // The companion object and this class are separated so that the companion object also subclasses
+ // this type. Otherwise, the companion object would be of type "TimestampType$" in byte code.
+ // Defined with a private constructor so the companion object is the only possible instantiation.
+ private[sql] type InternalType = Timestamp
+
+ @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[InternalType] }
+
+ private[sql] val ordering = new Ordering[InternalType] {
+ def compare(x: Timestamp, y: Timestamp): Int = x.compareTo(y)
+ }
+
+ /**
+ * The default size of a value of the TimestampType is 12 bytes.
+ */
+ override def defaultSize: Int = 12
+
+ private[spark] override def asNullable: TimestampType = this
+}
+
+case object TimestampType extends TimestampType
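
For reference, a minimal sketch (illustrative field name) of how this type
surfaces to user code:

    import java.sql.Timestamp
    import org.apache.spark.sql.types._

    // Values are exposed as java.sql.Timestamp and ordered via compareTo,
    // matching the ordering defined above.
    val earlier = Timestamp.valueOf("2015-04-23 14:48:19")
    val later   = Timestamp.valueOf("2015-04-23 14:48:20")
    assert(earlier.compareTo(later) < 0)

    val eventTime = StructField("event_time", TimestampType)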
http://git-wip-us.apache.org/repos/asf/spark/blob/6220d933/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
new file mode 100644
index 0000000..6b20505
--- /dev/null
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/UserDefinedType.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types
+
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * The data type for User Defined Types (UDTs).
+ *
+ * This interface allows a user to make their own classes more interoperable with Spark SQL;
+ * e.g., by creating a [[UserDefinedType]] for a class X, it becomes possible to create
+ * a `DataFrame` which has class X in the schema.
+ *
+ * For Spark SQL to recognize UDTs, the user class must be annotated with
+ * [[SQLUserDefinedType]].
+ *
+ * The conversion via `serialize` occurs when instantiating a `DataFrame` from another RDD.
+ * The conversion via `deserialize` occurs when reading from a `DataFrame`.
+ */
+@DeveloperApi
+abstract class UserDefinedType[UserType] extends DataType with Serializable {
+
+ /** Underlying storage type for this UDT */
+ def sqlType: DataType
+
+ /** Paired Python UDT class, if one exists. */
+ def pyUDT: String = null
+
+ /**
+ * Convert the user type to a SQL datum
+ *
+ * TODO: Can we make this take obj: UserType? The issue is in
+ * CatalystTypeConverters.convertToCatalyst, where we need to convert Any to UserType.
+ */
+ def serialize(obj: Any): Any
+
+ /** Convert a SQL datum to the user type */
+ def deserialize(datum: Any): UserType
+
+ override private[sql] def jsonValue: JValue = {
+ ("type" -> "udt") ~
+ ("class" -> this.getClass.getName) ~
+ ("pyClass" -> pyUDT) ~
+ ("sqlType" -> sqlType.jsonValue)
+ }
+
+ /**
+ * Class object for the UserType
+ */
+ def userClass: java.lang.Class[UserType]
+
+ /**
+ * The default size of a value of the UserDefinedType is 4096 bytes.
+ */
+ override def defaultSize: Int = 4096
+
+ /**
+ * For a UDT, `asNullable` does not change the nullability of its internal `sqlType`; it
+ * simply returns itself.
+ */
+ private[spark] override def asNullable: UserDefinedType[UserType] = this
+}
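
A sketch of a complete (hypothetical) UDT, assuming the SQLUserDefinedType
annotation from this package; the Point class and its storage layout are made
up for illustration:

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._

    // The annotation on the user class tells Spark SQL which UDT to use.
    @SQLUserDefinedType(udt = classOf[PointUDT])
    class Point(val x: Double, val y: Double)

    class PointUDT extends UserDefinedType[Point] {

      // Underlying storage: a Point is kept as struct<x:double,y:double>.
      override def sqlType: DataType = StructType(Seq(
        StructField("x", DoubleType, nullable = false),
        StructField("y", DoubleType, nullable = false)))

      // User object -> SQL datum (a Row matching sqlType).
      override def serialize(obj: Any): Any = obj match {
        case p: Point => Row(p.x, p.y)
      }

      // SQL datum -> user object.
      override def deserialize(datum: Any): Point = datum match {
        case row: Row => new Point(row.getDouble(0), row.getDouble(1))
      }

      override def userClass: Class[Point] = classOf[Point]
    }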
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org