Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/30 09:20:05 UTC

[3/3] git commit: [SPARK-2179][SQL] Public API for DataTypes and Schema

[SPARK-2179][SQL] Public API for DataTypes and Schema

The current PR contains the following changes:
* Expose `DataType`s in the sql package (internal details are private to sql).
* Users can create Rows.
* Introduce `applySchema` to create a `SchemaRDD` by applying a `schema: StructType` to an `RDD[Row]`.
* Add a function `simpleString` to every `DataType`. Also, the schema represented by a `StructType` can be visualized by `printSchema`.
* `ScalaReflection.typeOfObject` provides a way to infer the Catalyst data type from an object. It can also be composed with custom logic to form a new function that infers the data type for different use cases.
* `JsonRDD` has been refactored to use changes introduced by this PR.
* Add a field `containsNull` to `ArrayType`. So, we can explicitly mark if an `ArrayType` can contain null values. The default value of `containsNull` is `false`.
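
As a rough illustration of the `containsNull` default described in the last item, the following standalone Python sketch mimics how an array type built without the flag compares equal to one built with `false`. `ArrayTypeSketch` and its snake_case parameter names are hypothetical and exist only for this example; the sketch does not import or reproduce the Spark classes.

```python
class ArrayTypeSketch(object):
    """Standalone illustration only; this is not the Spark ArrayType class."""

    def __init__(self, element_type, contains_null=False):
        # contains_null defaults to False, mirroring the default stated above.
        self.element_type = element_type
        self.contains_null = contains_null

    def __eq__(self, other):
        return (isinstance(other, ArrayTypeSketch) and
                self.element_type == other.element_type and
                self.contains_null == other.contains_null)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __repr__(self):
        return "ArrayType(%s,%s)" % (
            self.element_type, str(self.contains_null).lower())


# Omitting the flag is the same as passing False explicitly.
print(ArrayTypeSketch("StringType") == ArrayTypeSketch("StringType", False))
# Two array types that differ only in the flag are different types.
print(ArrayTypeSketch("StringType", True) == ArrayTypeSketch("StringType"))
```

Making the flag part of equality means that two array types differing only in nullability of their elements are treated as distinct types.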

New APIs are introduced in the sql package object and SQLContext. You can find the scaladoc at
[sql package object](http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.package) and [SQLContext](http://yhuai.github.io/site/api/scala/index.html#org.apache.spark.sql.SQLContext).

An example of using `applySchema` is shown below.
```scala
import org.apache.spark.sql._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val schema =
  StructType(
    StructField("name", StringType, false) ::
    StructField("age", IntegerType, true) :: Nil)

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Row(p(0), p(1).trim.toInt))
val peopleSchemaRDD = sqlContext.applySchema(people, schema)
peopleSchemaRDD.printSchema
// root
// |-- name: string (nullable = false)
// |-- age: integer (nullable = true)

peopleSchemaRDD.registerAsTable("people")
sqlContext.sql("select name from people").collect.foreach(println)
```
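
The tree layout in the `printSchema` output above can be imitated with a short standalone sketch. This is only an approximation of the format shown in the comments: the `Field` tuple and the `tree_string` helper are hypothetical names introduced here, and the indentation used for nested fields is an assumption rather than something taken from Spark.

```python
from collections import namedtuple

# Hypothetical stand-in for a schema field; not the Spark StructField class.
Field = namedtuple("Field", ["name", "type_name", "nullable", "children"])


def tree_string(fields, prefix="|"):
    """Render fields as one line each, in the layout of the output above."""
    lines = []
    for f in fields:
        lines.append("%s-- %s: %s (nullable = %s)" % (
            prefix, f.name, f.type_name, str(f.nullable).lower()))
        # Fields of a nested struct are rendered one level deeper (assumed layout).
        lines.extend(tree_string(f.children, prefix + "    |"))
    return lines


schema = [
    Field("name", "string", False, []),
    Field("age", "integer", True, []),
]
print("\n".join(["root"] + tree_string(schema)))
```

For the two-field schema above, the sketch prints `root` followed by one `|--` line per field, matching the commented output in the Scala example.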

I will add new content to the SQL programming guide later.

JIRA: https://issues.apache.org/jira/browse/SPARK-2179

Author: Yin Huai <hu...@cse.ohio-state.edu>

Closes #1346 from yhuai/dataTypeAndSchema and squashes the following commits:

1d45977 [Yin Huai] Clean up.
a6e08b4 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
c712fbf [Yin Huai] Converts types of values based on defined schema.
4ceeb66 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
e5f8df5 [Yin Huai] Scaladoc.
122d1e7 [Yin Huai] Address comments.
03bfd95 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
2476ed0 [Yin Huai] Minor updates.
ab71f21 [Yin Huai] Format.
fc2bed1 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
bd40a33 [Yin Huai] Address comments.
991f860 [Yin Huai] Move "asJavaDataType" and "asScalaDataType" to DataTypeConversions.scala.
1cb35fe [Yin Huai] Add "valueContainsNull" to MapType.
3edb3ae [Yin Huai] Python doc.
692c0b9 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
1d93395 [Yin Huai] Python APIs.
246da96 [Yin Huai] Add java data type APIs to javadoc index.
1db9531 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
d48fc7b [Yin Huai] Minor updates.
33c4fec [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
b9f3071 [Yin Huai] Java API for applySchema.
1c9f33c [Yin Huai] Java APIs for DataTypes and Row.
624765c [Yin Huai] Tests for applySchema.
aa92e84 [Yin Huai] Update data type tests.
8da1a17 [Yin Huai] Add Row.fromSeq.
9c99bc0 [Yin Huai] Several minor updates.
1d9c13a [Yin Huai] Update applySchema API.
85e9b51 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
e495e4e [Yin Huai] More comments.
42d47a3 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
c3f4a02 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
2e58dbd [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
b8b7db4 [Yin Huai] 1. Move sql package object and package-info to sql-core. 2. Minor updates on APIs. 3. Update scala doc.
68525a2 [Yin Huai] Update JSON unit test.
3209108 [Yin Huai] Add unit tests.
dcaf22f [Yin Huai] Add a field containsNull to ArrayType to indicate if an array can contain null values or not. If an ArrayType is constructed by "ArrayType(elementType)" (the existing constructor), the value of containsNull is false.
9168b83 [Yin Huai] Update comments.
fc649d7 [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
eca7d04 [Yin Huai] Add two apply methods which will be used to extract StructField(s) from a StructType.
949d6bb [Yin Huai] When creating a SchemaRDD for a JSON dataset, users can apply an existing schema.
7a6a7e5 [Yin Huai] Fix bug introduced by the change made on SQLContext.inferSchema.
43a45e1 [Yin Huai] Remove sql.util.package introduced in a previous commit.
0266761 [Yin Huai] Format
03eec4c [Yin Huai] Merge remote-tracking branch 'upstream/master' into dataTypeAndSchema
90460ac [Yin Huai] Infer the Catalyst data type from an object and cast a data value to the expected type.
3fa0df5 [Yin Huai] Provide easier ways to construct a StructType.
16be3e5 [Yin Huai] This commit contains three changes: * Expose `DataType`s in the sql package (internal details are private to sql). * Introduce `createSchemaRDD` to create a `SchemaRDD` from an `RDD` with a provided schema (represented by a `StructType`) and a provided function to construct `Row`, * Add a function `simpleString` to every `DataType`. Also, the schema represented by a `StructType` can be visualized by `printSchema`.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7003c163
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7003c163
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7003c163

Branch: refs/heads/master
Commit: 7003c163dbb46bb7313aab130a33486a356435a8
Parents: 4ce92cc
Author: Yin Huai <hu...@cse.ohio-state.edu>
Authored: Wed Jul 30 00:15:31 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Wed Jul 30 00:15:31 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala |   3 +-
 project/SparkBuild.scala                        |   2 +-
 python/pyspark/sql.py                           | 567 ++++++++++++++++++-
 .../spark/sql/catalyst/ScalaReflection.scala    |  20 +
 .../catalyst/expressions/BoundAttribute.scala   |   5 +-
 .../spark/sql/catalyst/expressions/Row.scala    |  10 +
 .../sql/catalyst/expressions/WrapDynamic.scala  |  15 +-
 .../sql/catalyst/expressions/complexTypes.scala |   4 +-
 .../sql/catalyst/expressions/generators.scala   |   8 +-
 .../org/apache/spark/sql/catalyst/package.scala |   2 +
 .../sql/catalyst/planning/QueryPlanner.scala    |   2 +-
 .../spark/sql/catalyst/planning/patterns.scala  |   3 +-
 .../spark/sql/catalyst/plans/QueryPlan.scala    |  45 +-
 .../catalyst/plans/logical/basicOperators.scala |   2 +-
 .../apache/spark/sql/catalyst/rules/Rule.scala  |   2 +-
 .../spark/sql/catalyst/rules/RuleExecutor.scala |   5 +-
 .../spark/sql/catalyst/trees/package.scala      |   5 +-
 .../spark/sql/catalyst/types/dataTypes.scala    | 268 ++++++---
 .../org/apache/spark/sql/package-info.java      |  21 -
 .../scala/org/apache/spark/sql/package.scala    |  36 --
 .../sql/catalyst/ScalaReflectionSuite.scala     |  66 ++-
 .../spark/sql/api/java/types/ArrayType.java     |  68 +++
 .../spark/sql/api/java/types/BinaryType.java    |  27 +
 .../spark/sql/api/java/types/BooleanType.java   |  27 +
 .../spark/sql/api/java/types/ByteType.java      |  27 +
 .../spark/sql/api/java/types/DataType.java      | 190 +++++++
 .../spark/sql/api/java/types/DecimalType.java   |  27 +
 .../spark/sql/api/java/types/DoubleType.java    |  27 +
 .../spark/sql/api/java/types/FloatType.java     |  27 +
 .../spark/sql/api/java/types/IntegerType.java   |  27 +
 .../spark/sql/api/java/types/LongType.java      |  27 +
 .../spark/sql/api/java/types/MapType.java       |  78 +++
 .../spark/sql/api/java/types/ShortType.java     |  27 +
 .../spark/sql/api/java/types/StringType.java    |  27 +
 .../spark/sql/api/java/types/StructField.java   |  76 +++
 .../spark/sql/api/java/types/StructType.java    |  59 ++
 .../spark/sql/api/java/types/TimestampType.java |  27 +
 .../spark/sql/api/java/types/package-info.java  |  22 +
 .../scala/org/apache/spark/sql/SQLContext.scala | 230 ++++++--
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |  10 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala    |  12 +-
 .../spark/sql/api/java/JavaSQLContext.scala     |  65 ++-
 .../spark/sql/api/java/JavaSchemaRDD.scala      |   7 +
 .../org/apache/spark/sql/api/java/Row.scala     |  59 +-
 .../org/apache/spark/sql/json/JsonRDD.scala     | 118 ++--
 .../org/apache/spark/sql/package-info.java      |  21 +
 .../scala/org/apache/spark/sql/package.scala    | 409 +++++++++++++
 .../spark/sql/parquet/ParquetConverter.scala    |   8 +-
 .../spark/sql/parquet/ParquetTableSupport.scala |   4 +-
 .../apache/spark/sql/parquet/ParquetTypes.scala |  18 +-
 .../sql/types/util/DataTypeConversions.scala    | 110 ++++
 .../sql/api/java/JavaApplySchemaSuite.java      | 166 ++++++
 .../apache/spark/sql/api/java/JavaRowSuite.java | 170 ++++++
 .../java/JavaSideDataTypeConversionSuite.java   | 150 +++++
 .../org/apache/spark/sql/DataTypeSuite.scala    |  58 ++
 .../scala/org/apache/spark/sql/RowSuite.scala   |  46 ++
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  64 ++-
 .../scala/org/apache/spark/sql/TestData.scala   |   7 +
 .../java/ScalaSideDataTypeConversionSuite.scala |  81 +++
 .../org/apache/spark/sql/json/JsonSuite.scala   | 198 ++++---
 .../org/apache/spark/sql/hive/HiveContext.scala |   9 +-
 .../apache/spark/sql/hive/HiveInspectors.scala  |   5 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   8 +-
 63 files changed, 3485 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d8453f..f551a59 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -544,7 +544,8 @@ private[spark] object PythonRDD extends Logging {
   }
 
   /**
-   * Convert an RDD of serialized Python dictionaries to Scala Maps
+   * Convert an RDD of serialized Python dictionaries to Scala Maps (no recursive conversions).
+   * It is only used by pyspark.sql.
    * TODO: Support more Python types.
    */
   def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/project/SparkBuild.scala
----------------------------------------------------------------------
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 490fac3..e2dab0f 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -312,7 +312,7 @@ object Unidoc {
         "mllib.regression", "mllib.stat", "mllib.tree", "mllib.tree.configuration",
         "mllib.tree.impurity", "mllib.tree.model", "mllib.util"
       ),
-      "-group", "Spark SQL", packageList("sql.api.java", "sql.hive.api.java"),
+      "-group", "Spark SQL", packageList("sql.api.java", "sql.api.java.types", "sql.hive.api.java"),
       "-noqualifier", "java.lang"
     )
   )

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index a6b3277..13f0ed4 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -20,7 +20,451 @@ from pyspark.serializers import BatchedSerializer, PickleSerializer
 
 from py4j.protocol import Py4JError
 
-__all__ = ["SQLContext", "HiveContext", "LocalHiveContext", "TestHiveContext", "SchemaRDD", "Row"]
+__all__ = [
+    "StringType", "BinaryType", "BooleanType", "TimestampType", "DecimalType",
+    "DoubleType", "FloatType", "ByteType", "IntegerType", "LongType",
+    "ShortType", "ArrayType", "MapType", "StructField", "StructType",
+    "SQLContext", "HiveContext", "LocalHiveContext", "TestHiveContext", "SchemaRDD", "Row"]
+
+
+class PrimitiveTypeSingleton(type):
+    _instances = {}
+
+    def __call__(cls):
+        if cls not in cls._instances:
+            cls._instances[cls] = super(PrimitiveTypeSingleton, cls).__call__()
+        return cls._instances[cls]
+
+
+class StringType(object):
+    """Spark SQL StringType
+
+    The data type representing string values.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "StringType"
+
+
+class BinaryType(object):
+    """Spark SQL BinaryType
+
+    The data type representing bytearray values.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "BinaryType"
+
+
+class BooleanType(object):
+    """Spark SQL BooleanType
+
+    The data type representing bool values.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "BooleanType"
+
+
+class TimestampType(object):
+    """Spark SQL TimestampType
+
+    The data type representing datetime.datetime values.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "TimestampType"
+
+
+class DecimalType(object):
+    """Spark SQL DecimalType
+
+    The data type representing decimal.Decimal values.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "DecimalType"
+
+
+class DoubleType(object):
+    """Spark SQL DoubleType
+
+    The data type representing float values.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "DoubleType"
+
+
+class FloatType(object):
+    """Spark SQL FloatType
+
+    The data type representing single precision floating-point values.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "FloatType"
+
+
+class ByteType(object):
+    """Spark SQL ByteType
+
+    The data type representing int values with 1 signed byte.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "ByteType"
+
+
+class IntegerType(object):
+    """Spark SQL IntegerType
+
+    The data type representing int values.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "IntegerType"
+
+
+class LongType(object):
+    """Spark SQL LongType
+
+    The data type representing long values. If any value is beyond the range of
+    [-9223372036854775808, 9223372036854775807], please use DecimalType.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "LongType"
+
+
+class ShortType(object):
+    """Spark SQL ShortType
+
+    The data type representing int values with 2 signed bytes.
+
+    """
+    __metaclass__ = PrimitiveTypeSingleton
+
+    def __repr__(self):
+        return "ShortType"
+
+
+class ArrayType(object):
+    """Spark SQL ArrayType
+
+    The data type representing list values.
+    An ArrayType object comprises two fields, elementType (a DataType) and containsNull (a bool).
+    The field of elementType is used to specify the type of array elements.
+    The field of containsNull is used to specify if the array has None values.
+
+    """
+    def __init__(self, elementType, containsNull=False):
+        """Creates an ArrayType
+
+        :param elementType: the data type of elements.
+        :param containsNull: indicates whether the list contains None values.
+
+        >>> ArrayType(StringType) == ArrayType(StringType, False)
+        True
+        >>> ArrayType(StringType, True) == ArrayType(StringType)
+        False
+        """
+        self.elementType = elementType
+        self.containsNull = containsNull
+
+    def __repr__(self):
+        return "ArrayType(" + self.elementType.__repr__() + "," + \
+               str(self.containsNull).lower() + ")"
+
+    def __eq__(self, other):
+        return (isinstance(other, self.__class__) and
+                self.elementType == other.elementType and
+                self.containsNull == other.containsNull)
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+
+class MapType(object):
+    """Spark SQL MapType
+
+    The data type representing dict values.
+    A MapType object comprises three fields,
+    keyType (a DataType), valueType (a DataType) and valueContainsNull (a bool).
+    The field of keyType is used to specify the type of keys in the map.
+    The field of valueType is used to specify the type of values in the map.
+    The field of valueContainsNull is used to specify if values of this map have None values.
+    For values of a MapType column, keys are not allowed to have None values.
+
+    """
+    def __init__(self, keyType, valueType, valueContainsNull=True):
+        """Creates a MapType
+        :param keyType: the data type of keys.
+        :param valueType: the data type of values.
+        :param valueContainsNull: indicates whether values contain null values.
+
+        >>> MapType(StringType, IntegerType) == MapType(StringType, IntegerType, True)
+        True
+        >>> MapType(StringType, IntegerType, False) == MapType(StringType, FloatType)
+        False
+        """
+        self.keyType = keyType
+        self.valueType = valueType
+        self.valueContainsNull = valueContainsNull
+
+    def __repr__(self):
+        return "MapType(" + self.keyType.__repr__() + "," + \
+               self.valueType.__repr__() + "," + \
+               str(self.valueContainsNull).lower() + ")"
+
+    def __eq__(self, other):
+        return (isinstance(other, self.__class__) and
+                self.keyType == other.keyType and
+                self.valueType == other.valueType and
+                self.valueContainsNull == other.valueContainsNull)
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+
+class StructField(object):
+    """Spark SQL StructField
+
+    Represents a field in a StructType.
+    A StructField object comprises three fields, name (a string), dataType (a DataType),
+    and nullable (a bool). The field of name is the name of a StructField. The field of
+    dataType specifies the data type of a StructField.
+    The field of nullable specifies if values of a StructField can contain None values.
+
+    """
+    def __init__(self, name, dataType, nullable):
+        """Creates a StructField
+        :param name: the name of this field.
+        :param dataType: the data type of this field.
+        :param nullable: indicates whether values of this field can be null.
+
+        >>> StructField("f1", StringType, True) == StructField("f1", StringType, True)
+        True
+        >>> StructField("f1", StringType, True) == StructField("f2", StringType, True)
+        False
+        """
+        self.name = name
+        self.dataType = dataType
+        self.nullable = nullable
+
+    def __repr__(self):
+        return "StructField(" + self.name + "," + \
+               self.dataType.__repr__() + "," + \
+               str(self.nullable).lower() + ")"
+
+    def __eq__(self, other):
+        return (isinstance(other, self.__class__) and
+                self.name == other.name and
+                self.dataType == other.dataType and
+                self.nullable == other.nullable)
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+
+class StructType(object):
+    """Spark SQL StructType
+
+    The data type representing namedtuple values.
+    A StructType object comprises a list of L{StructField}s.
+
+    """
+    def __init__(self, fields):
+        """Creates a StructType
+
+        >>> struct1 = StructType([StructField("f1", StringType, True)])
+        >>> struct2 = StructType([StructField("f1", StringType, True)])
+        >>> struct1 == struct2
+        True
+        >>> struct1 = StructType([StructField("f1", StringType, True)])
+        >>> struct2 = StructType([StructField("f1", StringType, True),
+        ...   StructField("f2", IntegerType, False)])
+        >>> struct1 == struct2
+        False
+        """
+        self.fields = fields
+
+    def __repr__(self):
+        return "StructType(List(" + \
+               ",".join([field.__repr__() for field in self.fields]) + "))"
+
+    def __eq__(self, other):
+        return (isinstance(other, self.__class__) and
+                self.fields == other.fields)
+
+    def __ne__(self, other):
+        return not self.__eq__(other)
+
+
+def _parse_datatype_list(datatype_list_string):
+    """Parses a list of comma separated data types."""
+    index = 0
+    datatype_list = []
+    start = 0
+    depth = 0
+    while index < len(datatype_list_string):
+        if depth == 0 and datatype_list_string[index] == ",":
+            datatype_string = datatype_list_string[start:index].strip()
+            datatype_list.append(_parse_datatype_string(datatype_string))
+            start = index + 1
+        elif datatype_list_string[index] == "(":
+            depth += 1
+        elif datatype_list_string[index] == ")":
+            depth -= 1
+
+        index += 1
+
+    # Handle the last data type
+    datatype_string = datatype_list_string[start:index].strip()
+    datatype_list.append(_parse_datatype_string(datatype_string))
+    return datatype_list
+
+
+def _parse_datatype_string(datatype_string):
+    """Parses the given data type string.
+
+    >>> def check_datatype(datatype):
+    ...     scala_datatype = sqlCtx._ssql_ctx.parseDataType(datatype.__repr__())
+    ...     python_datatype = _parse_datatype_string(scala_datatype.toString())
+    ...     return datatype == python_datatype
+    >>> check_datatype(StringType())
+    True
+    >>> check_datatype(BinaryType())
+    True
+    >>> check_datatype(BooleanType())
+    True
+    >>> check_datatype(TimestampType())
+    True
+    >>> check_datatype(DecimalType())
+    True
+    >>> check_datatype(DoubleType())
+    True
+    >>> check_datatype(FloatType())
+    True
+    >>> check_datatype(ByteType())
+    True
+    >>> check_datatype(IntegerType())
+    True
+    >>> check_datatype(LongType())
+    True
+    >>> check_datatype(ShortType())
+    True
+    >>> # Simple ArrayType.
+    >>> simple_arraytype = ArrayType(StringType(), True)
+    >>> check_datatype(simple_arraytype)
+    True
+    >>> # Simple MapType.
+    >>> simple_maptype = MapType(StringType(), LongType())
+    >>> check_datatype(simple_maptype)
+    True
+    >>> # Simple StructType.
+    >>> simple_structtype = StructType([
+    ...     StructField("a", DecimalType(), False),
+    ...     StructField("b", BooleanType(), True),
+    ...     StructField("c", LongType(), True),
+    ...     StructField("d", BinaryType(), False)])
+    >>> check_datatype(simple_structtype)
+    True
+    >>> # Complex StructType.
+    >>> complex_structtype = StructType([
+    ...     StructField("simpleArray", simple_arraytype, True),
+    ...     StructField("simpleMap", simple_maptype, True),
+    ...     StructField("simpleStruct", simple_structtype, True),
+    ...     StructField("boolean", BooleanType(), False)])
+    >>> check_datatype(complex_structtype)
+    True
+    >>> # Complex ArrayType.
+    >>> complex_arraytype = ArrayType(complex_structtype, True)
+    >>> check_datatype(complex_arraytype)
+    True
+    >>> # Complex MapType.
+    >>> complex_maptype = MapType(complex_structtype, complex_arraytype, False)
+    >>> check_datatype(complex_maptype)
+    True
+    """
+    left_bracket_index = datatype_string.find("(")
+    if left_bracket_index == -1:
+        # It is a primitive type.
+        left_bracket_index = len(datatype_string)
+    type_or_field = datatype_string[:left_bracket_index]
+    rest_part = datatype_string[left_bracket_index+1:len(datatype_string)-1].strip()
+    if type_or_field == "StringType":
+        return StringType()
+    elif type_or_field == "BinaryType":
+        return BinaryType()
+    elif type_or_field == "BooleanType":
+        return BooleanType()
+    elif type_or_field == "TimestampType":
+        return TimestampType()
+    elif type_or_field == "DecimalType":
+        return DecimalType()
+    elif type_or_field == "DoubleType":
+        return DoubleType()
+    elif type_or_field == "FloatType":
+        return FloatType()
+    elif type_or_field == "ByteType":
+        return ByteType()
+    elif type_or_field == "IntegerType":
+        return IntegerType()
+    elif type_or_field == "LongType":
+        return LongType()
+    elif type_or_field == "ShortType":
+        return ShortType()
+    elif type_or_field == "ArrayType":
+        last_comma_index = rest_part.rfind(",")
+        containsNull = True
+        if rest_part[last_comma_index+1:].strip().lower() == "false":
+            containsNull = False
+        elementType = _parse_datatype_string(rest_part[:last_comma_index].strip())
+        return ArrayType(elementType, containsNull)
+    elif type_or_field == "MapType":
+        last_comma_index = rest_part.rfind(",")
+        valueContainsNull = True
+        if rest_part[last_comma_index+1:].strip().lower() == "false":
+            valueContainsNull = False
+        keyType, valueType = _parse_datatype_list(rest_part[:last_comma_index].strip())
+        return MapType(keyType, valueType, valueContainsNull)
+    elif type_or_field == "StructField":
+        first_comma_index = rest_part.find(",")
+        name = rest_part[:first_comma_index].strip()
+        last_comma_index = rest_part.rfind(",")
+        nullable = True
+        if rest_part[last_comma_index+1:].strip().lower() == "false":
+            nullable = False
+        dataType = _parse_datatype_string(
+            rest_part[first_comma_index+1:last_comma_index].strip())
+        return StructField(name, dataType, nullable)
+    elif type_or_field == "StructType":
+        # rest_part should be in the format like
+        # List(StructField(field1,IntegerType,false)).
+        field_list_string = rest_part[rest_part.find("(")+1:-1]
+        fields = _parse_datatype_list(field_list_string)
+        return StructType(fields)
 
 
 class SQLContext:
@@ -109,6 +553,40 @@ class SQLContext:
         srdd = self._ssql_ctx.inferSchema(jrdd.rdd())
         return SchemaRDD(srdd, self)
 
+    def applySchema(self, rdd, schema):
+        """Applies the given schema to the given RDD of L{dict}s.
+
+        >>> schema = StructType([StructField("field1", IntegerType(), False),
+        ...     StructField("field2", StringType(), False)])
+        >>> srdd = sqlCtx.applySchema(rdd, schema)
+        >>> sqlCtx.registerRDDAsTable(srdd, "table1")
+        >>> srdd2 = sqlCtx.sql("SELECT * from table1")
+        >>> srdd2.collect() == [{"field1" : 1, "field2" : "row1"}, {"field1" : 2, "field2": "row2"},
+        ...                    {"field1" : 3, "field2": "row3"}]
+        True
+        >>> from datetime import datetime
+        >>> rdd = sc.parallelize([{"byte": 127, "short": -32768, "float": 1.0,
+        ... "time": datetime(2010, 1, 1, 1, 1, 1), "map": {"a": 1}, "struct": {"b": 2},
+        ... "list": [1, 2, 3]}])
+        >>> schema = StructType([
+        ...     StructField("byte", ByteType(), False),
+        ...     StructField("short", ShortType(), False),
+        ...     StructField("float", FloatType(), False),
+        ...     StructField("time", TimestampType(), False),
+        ...     StructField("map", MapType(StringType(), IntegerType(), False), False),
+        ...     StructField("struct", StructType([StructField("b", ShortType(), False)]), False),
+        ...     StructField("list", ArrayType(ByteType(), False), False),
+        ...     StructField("null", DoubleType(), True)])
+        >>> srdd = sqlCtx.applySchema(rdd, schema).map(
+        ...     lambda x: (
+        ...         x.byte, x.short, x.float, x.time, x.map["a"], x.struct["b"], x.list, x.null))
+        >>> srdd.collect()[0]
+        (127, -32768, 1.0, datetime.datetime(2010, 1, 1, 1, 1, 1), 1, 2, [1, 2, 3], None)
+        """
+        jrdd = self._pythonToJavaMap(rdd._jrdd)
+        srdd = self._ssql_ctx.applySchemaToPythonRDD(jrdd.rdd(), schema.__repr__())
+        return SchemaRDD(srdd, self)
+
     def registerRDDAsTable(self, rdd, tableName):
         """Registers the given RDD as a temporary table in the catalog.
 
@@ -139,10 +617,11 @@ class SQLContext:
         jschema_rdd = self._ssql_ctx.parquetFile(path)
         return SchemaRDD(jschema_rdd, self)
 
-    def jsonFile(self, path):
-        """Loads a text file storing one JSON object per line,
-           returning the result as a L{SchemaRDD}.
-           It goes through the entire dataset once to determine the schema.
+    def jsonFile(self, path, schema=None):
+        """Loads a text file storing one JSON object per line as a L{SchemaRDD}.
+
+        If the schema is provided, applies the given schema to this JSON dataset.
+        Otherwise, it goes through the entire dataset once to determine the schema.
 
         >>> import tempfile, shutil
         >>> jsonFile = tempfile.mkdtemp()
@@ -151,8 +630,8 @@ class SQLContext:
         >>> for json in jsonStrings:
         ...   print>>ofn, json
         >>> ofn.close()
-        >>> srdd = sqlCtx.jsonFile(jsonFile)
-        >>> sqlCtx.registerRDDAsTable(srdd, "table1")
+        >>> srdd1 = sqlCtx.jsonFile(jsonFile)
+        >>> sqlCtx.registerRDDAsTable(srdd1, "table1")
         >>> srdd2 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
         >>> srdd2.collect() == [
@@ -160,16 +639,45 @@ class SQLContext:
         ... {"f1":2, "f2":None, "f3":{"field4":22,  "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
         ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
         True
+        >>> srdd3 = sqlCtx.jsonFile(jsonFile, srdd1.schema())
+        >>> sqlCtx.registerRDDAsTable(srdd3, "table2")
+        >>> srdd4 = sqlCtx.sql(
+        ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table2")
+        >>> srdd4.collect() == [
+        ... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
+        ... {"f1":2, "f2":None, "f3":{"field4":22,  "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
+        ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
+        True
+        >>> schema = StructType([
+        ...     StructField("field2", StringType(), True),
+        ...     StructField("field3",
+        ...         StructType([
+        ...             StructField("field5", ArrayType(IntegerType(), False), True)]), False)])
+        >>> srdd5 = sqlCtx.jsonFile(jsonFile, schema)
+        >>> sqlCtx.registerRDDAsTable(srdd5, "table3")
+        >>> srdd6 = sqlCtx.sql(
+        ...   "SELECT field2 AS f1, field3.field5 as f2, field3.field5[0] as f3 from table3")
+        >>> srdd6.collect() == [
+        ... {"f1": "row1", "f2": None, "f3": None},
+        ... {"f1": None, "f2": [10, 11], "f3": 10},
+        ... {"f1": "row3", "f2": [], "f3": None}]
+        True
         """
-        jschema_rdd = self._ssql_ctx.jsonFile(path)
+        if schema is None:
+            jschema_rdd = self._ssql_ctx.jsonFile(path)
+        else:
+            scala_datatype = self._ssql_ctx.parseDataType(schema.__repr__())
+            jschema_rdd = self._ssql_ctx.jsonFile(path, scala_datatype)
         return SchemaRDD(jschema_rdd, self)
 
-    def jsonRDD(self, rdd):
-        """Loads an RDD storing one JSON object per string, returning the result as a L{SchemaRDD}.
-           It goes through the entire dataset once to determine the schema.
+    def jsonRDD(self, rdd, schema=None):
+        """Loads an RDD storing one JSON object per string as a L{SchemaRDD}.
 
-        >>> srdd = sqlCtx.jsonRDD(json)
-        >>> sqlCtx.registerRDDAsTable(srdd, "table1")
+        If the schema is provided, applies the given schema to this JSON dataset.
+        Otherwise, it goes through the entire dataset once to determine the schema.
+
+        >>> srdd1 = sqlCtx.jsonRDD(json)
+        >>> sqlCtx.registerRDDAsTable(srdd1, "table1")
         >>> srdd2 = sqlCtx.sql(
         ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table1")
         >>> srdd2.collect() == [
@@ -177,6 +685,29 @@ class SQLContext:
         ... {"f1":2, "f2":None, "f3":{"field4":22,  "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
         ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
         True
+        >>> srdd3 = sqlCtx.jsonRDD(json, srdd1.schema())
+        >>> sqlCtx.registerRDDAsTable(srdd3, "table2")
+        >>> srdd4 = sqlCtx.sql(
+        ...   "SELECT field1 AS f1, field2 as f2, field3 as f3, field6 as f4 from table2")
+        >>> srdd4.collect() == [
+        ... {"f1":1, "f2":"row1", "f3":{"field4":11, "field5": None}, "f4":None},
+        ... {"f1":2, "f2":None, "f3":{"field4":22,  "field5": [10, 11]}, "f4":[{"field7": "row2"}]},
+        ... {"f1":None, "f2":"row3", "f3":{"field4":33, "field5": []}, "f4":None}]
+        True
+        >>> schema = StructType([
+        ...     StructField("field2", StringType(), True),
+        ...     StructField("field3",
+        ...         StructType([
+        ...             StructField("field5", ArrayType(IntegerType(), False), True)]), False)])
+        >>> srdd5 = sqlCtx.jsonRDD(json, schema)
+        >>> sqlCtx.registerRDDAsTable(srdd5, "table3")
+        >>> srdd6 = sqlCtx.sql(
+        ...   "SELECT field2 AS f1, field3.field5 as f2, field3.field5[0] as f3 from table3")
+        >>> srdd6.collect() == [
+        ... {"f1": "row1", "f2": None, "f3": None},
+        ... {"f1": None, "f2": [10, 11], "f3": 10},
+        ... {"f1": "row3", "f2": [], "f3": None}]
+        True
         """
         def func(split, iterator):
             for x in iterator:
@@ -186,7 +717,11 @@ class SQLContext:
         keyed = PipelinedRDD(rdd, func)
         keyed._bypass_serializer = True
         jrdd = keyed._jrdd.map(self._jvm.BytesToString())
-        jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd())
+        if schema is None:
+            jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd())
+        else:
+            scala_datatype = self._ssql_ctx.parseDataType(schema.__repr__())
+            jschema_rdd = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
         return SchemaRDD(jschema_rdd, self)
 
     def sql(self, sqlQuery):
@@ -389,6 +924,10 @@ class SchemaRDD(RDD):
         """Creates a new table with the contents of this SchemaRDD."""
         self._jschema_rdd.saveAsTable(tableName)
 
+    def schema(self):
+        """Returns the schema of this SchemaRDD (represented by a L{StructType})."""
+        return _parse_datatype_string(self._jschema_rdd.schema().toString())
+
     def schemaString(self):
         """Returns the output schema in the tree format."""
         return self._jschema_rdd.schemaString()
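
Both `jsonFile` and `jsonRDD` above hand `schema.__repr__()` to `parseDataType`, so the string form of a Python schema has to be something the JVM-side parser accepts. The following is a minimal, pure-Python sketch of that round trip, not pyspark's implementation: the stand-in classes, the `parse_datatype` helper, and the exact string format (modeled on the `ArrayType(elementType, containsNull)` grammar in this commit) are assumptions for illustration.

```python
import re


class IntegerType(object):
    """Stand-in primitive type (hypothetical, not pyspark's class)."""
    def __repr__(self):
        return "IntegerType"


class StringType(object):
    """Stand-in primitive type (hypothetical, not pyspark's class)."""
    def __repr__(self):
        return "StringType"


class ArrayType(object):
    """Stand-in array type; containsNull defaults to False as in the commit."""
    def __init__(self, elementType, containsNull=False):
        self.elementType = elementType
        self.containsNull = containsNull

    def __repr__(self):
        # Assumed format: ArrayType(<element>,<true|false>)
        return "ArrayType(%r,%s)" % (self.elementType,
                                     str(self.containsNull).lower())


_PRIMITIVES = {"IntegerType": IntegerType, "StringType": StringType}


def parse_datatype(text):
    """Parse the assumed string format back into stand-in type objects."""
    text = text.strip()
    if text in _PRIMITIVES:
        return _PRIMITIVES[text]()
    # The greedy group captures the (possibly nested) element type.
    match = re.match(r"^ArrayType\((.*),(true|false)\)$", text)
    if match is None:
        raise ValueError("Unsupported dataType: %s" % text)
    return ArrayType(parse_datatype(match.group(1)), match.group(2) == "true")


nested = ArrayType(ArrayType(IntegerType(), False), True)
round_tripped = parse_datatype(repr(nested))
print(repr(round_tripped))
```

The point of the sketch is only that `repr` and the parser must agree on one grammar; if either side changes (for example, by adding the `containsNull` flag), the other has to change with it.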

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
index 5a55be1..0d26b52 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/ScalaReflection.scala
@@ -85,6 +85,26 @@ object ScalaReflection {
     case t if t <:< definitions.BooleanTpe => Schema(BooleanType, nullable = false)
   }
 
+  def typeOfObject: PartialFunction[Any, DataType] = {
+    // The data type can be determined without ambiguity.
+    case obj: BooleanType.JvmType => BooleanType
+    case obj: BinaryType.JvmType => BinaryType
+    case obj: StringType.JvmType => StringType
+    case obj: ByteType.JvmType => ByteType
+    case obj: ShortType.JvmType => ShortType
+    case obj: IntegerType.JvmType => IntegerType
+    case obj: LongType.JvmType => LongType
+    case obj: FloatType.JvmType => FloatType
+    case obj: DoubleType.JvmType => DoubleType
+    case obj: DecimalType.JvmType => DecimalType
+    case obj: TimestampType.JvmType => TimestampType
+    case null => NullType
+    // For other cases, there is no obvious mapping from the type of the given object to a
+    // Catalyst data type. Users should provide their own rules (in a user-defined
+    // PartialFunction) to infer the Catalyst data type for other types of objects, and then
+    // compose that PartialFunction with this one.
+  }
+
   implicit class CaseClassRelation[A <: Product : TypeTag](data: Seq[A]) {
 
     /**
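
The comment in `typeOfObject` recommends composing a user-defined `PartialFunction` with it (in Scala, for example with `orElse`). As a rough Python analogue of that idea, the sketch below treats "returns `None`" as "not defined for this input". Every name in it (`type_of_object`, `or_else`, `array_rule`) and the string type labels are hypothetical; it illustrates the composition pattern only, not the Catalyst API.

```python
import datetime
import decimal


def type_of_object(obj):
    """Return a type label for unambiguous cases, or None when undefined."""
    if obj is None:
        return "NullType"
    if isinstance(obj, bool):  # must precede int: bool is a subclass of int
        return "BooleanType"
    if isinstance(obj, int):
        return "IntegerType"
    if isinstance(obj, float):
        return "DoubleType"
    if isinstance(obj, str):
        return "StringType"
    if isinstance(obj, decimal.Decimal):
        return "DecimalType"
    if isinstance(obj, datetime.datetime):
        return "TimestampType"
    return None


def or_else(first, second):
    """Try `first`; fall back to `second` where `first` is undefined."""
    def combined(obj):
        result = first(obj)
        return result if result is not None else second(obj)
    return combined


def array_rule(obj):
    """Hypothetical user rule: label a non-empty list by its first element."""
    if isinstance(obj, list) and obj:
        element = type_of_object(obj[0])
        if element is not None:
            return "ArrayType(%s)" % element
    return None


infer = or_else(type_of_object, array_rule)
print(infer(3), infer([1, 2]), infer(None))
```

Keeping the base function limited to unambiguous cases means each caller decides how ambiguous values (lists, maps, custom classes) are typed.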

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
index a3ebec8..f38f995 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
@@ -17,14 +17,11 @@
 
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.sql.catalyst.Logging
 import org.apache.spark.sql.catalyst.errors.attachTree
-import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.trees
 
-import org.apache.spark.sql.Logging
-
 /**
  * A bound reference points to a specific slot in the input tuple, allowing the actual value
  * to be retrieved more efficiently.  However, since operations like column pruning can change

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
index 7470cb8..c9a63e2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Row.scala
@@ -32,6 +32,16 @@ object Row {
    * }}}
    */
   def unapplySeq(row: Row): Some[Seq[Any]] = Some(row)
+
+  /**
+   * This method can be used to construct a [[Row]] with the given values.
+   */
+  def apply(values: Any*): Row = new GenericRow(values.toArray)
+
+  /**
+   * This method can be used to construct a [[Row]] from a [[Seq]] of values.
+   */
+  def fromSeq(values: Seq[Any]): Row = new GenericRow(values.toArray)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
index e787c59..eb88989 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/WrapDynamic.scala
@@ -21,8 +21,16 @@ import scala.language.dynamics
 
 import org.apache.spark.sql.catalyst.types.DataType
 
-case object DynamicType extends DataType
+/**
+ * The data type representing [[DynamicRow]] values.
+ */
+case object DynamicType extends DataType {
+  def simpleString: String = "dynamic"
+}
 
+/**
+ * Wrap a [[Row]] as a [[DynamicRow]].
+ */
 case class WrapDynamic(children: Seq[Attribute]) extends Expression {
   type EvaluatedType = DynamicRow
 
@@ -37,6 +45,11 @@ case class WrapDynamic(children: Seq[Attribute]) extends Expression {
   }
 }
 
+/**
+ * DynamicRows use Scala's Dynamic trait to emulate an ORM in a dynamically typed language.
+ * Since the type of the column is not known at compile time, all attributes are converted to
+ * strings before being passed to the function.
+ */
 class DynamicRow(val schema: Seq[Attribute], values: Array[Any])
   extends GenericRow(values) with Dynamic {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
index 0acb290..72add5e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypes.scala
@@ -31,8 +31,8 @@ case class GetItem(child: Expression, ordinal: Expression) extends Expression {
   override def foldable = child.foldable && ordinal.foldable
   override def references = children.flatMap(_.references).toSet
   def dataType = child.dataType match {
-    case ArrayType(dt) => dt
-    case MapType(_, vt) => vt
+    case ArrayType(dt, _) => dt
+    case MapType(_, vt, _) => vt
   }
   override lazy val resolved =
     childrenResolved &&

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
index dd78614..422839d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/generators.scala
@@ -84,8 +84,8 @@ case class Explode(attributeNames: Seq[String], child: Expression)
     (child.dataType.isInstanceOf[ArrayType] || child.dataType.isInstanceOf[MapType])
 
   private lazy val elementTypes = child.dataType match {
-    case ArrayType(et) => et :: Nil
-    case MapType(kt,vt) => kt :: vt :: Nil
+    case ArrayType(et, _) => et :: Nil
+    case MapType(kt,vt, _) => kt :: vt :: Nil
   }
 
   // TODO: Move this pattern into Generator.
@@ -102,10 +102,10 @@ case class Explode(attributeNames: Seq[String], child: Expression)
 
   override def eval(input: Row): TraversableOnce[Row] = {
     child.dataType match {
-      case ArrayType(_) =>
+      case ArrayType(_, _) =>
         val inputArray = child.eval(input).asInstanceOf[Seq[Any]]
         if (inputArray == null) Nil else inputArray.map(v => new GenericRow(Array(v)))
-      case MapType(_, _) =>
+      case MapType(_, _, _) =>
         val inputMap = child.eval(input).asInstanceOf[Map[Any,Any]]
         if (inputMap == null) Nil else inputMap.map { case (k,v) => new GenericRow(Array(k,v)) }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
index 3b3e206..ca96429 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala
@@ -24,4 +24,6 @@ package object catalyst {
    * 2.10.* builds.  See SI-6240 for more details.
    */
   protected[catalyst] object ScalaReflectionLock
+
+  protected[catalyst] type Logging = com.typesafe.scalalogging.slf4j.Logging
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 6783366..781ba48 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.planning
 
-import org.apache.spark.sql.Logging
+import org.apache.spark.sql.catalyst.Logging
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.trees.TreeNode
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
index 418f868..bc763a4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala
@@ -19,9 +19,8 @@ package org.apache.spark.sql.catalyst.planning
 
 import scala.annotation.tailrec
 
-import org.apache.spark.sql.Logging
-
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.Logging
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 7b82e19..0988b0c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -125,51 +125,10 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
     }.toSeq
   }
 
-  protected def generateSchemaString(schema: Seq[Attribute]): String = {
-    val builder = new StringBuilder
-    builder.append("root\n")
-    val prefix = " |"
-    schema.foreach { attribute =>
-      val name = attribute.name
-      val dataType = attribute.dataType
-      dataType match {
-        case fields: StructType =>
-          builder.append(s"$prefix-- $name: $StructType\n")
-          generateSchemaString(fields, s"$prefix    |", builder)
-        case ArrayType(fields: StructType) =>
-          builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
-          generateSchemaString(fields, s"$prefix    |", builder)
-        case ArrayType(elementType: DataType) =>
-          builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
-        case _ => builder.append(s"$prefix-- $name: $dataType\n")
-      }
-    }
-
-    builder.toString()
-  }
-
-  protected def generateSchemaString(
-      schema: StructType,
-      prefix: String,
-      builder: StringBuilder): StringBuilder = {
-    schema.fields.foreach {
-      case StructField(name, fields: StructType, _) =>
-        builder.append(s"$prefix-- $name: $StructType\n")
-        generateSchemaString(fields, s"$prefix    |", builder)
-      case StructField(name, ArrayType(fields: StructType), _) =>
-        builder.append(s"$prefix-- $name: $ArrayType[$StructType]\n")
-        generateSchemaString(fields, s"$prefix    |", builder)
-      case StructField(name, ArrayType(elementType: DataType), _) =>
-        builder.append(s"$prefix-- $name: $ArrayType[$elementType]\n")
-      case StructField(name, fieldType: DataType, _) =>
-        builder.append(s"$prefix-- $name: $fieldType\n")
-    }
-
-    builder
-  }
+  def schema: StructType = StructType.fromAttributes(output)
 
   /** Returns the output schema in the tree format. */
-  def schemaString: String = generateSchemaString(output)
+  def schemaString: String = schema.treeString
 
   /** Prints out the schema in the tree format */
   def printSchema(): Unit = println(schemaString)
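
`schemaString` now delegates to `schema.treeString`, which renders the layout shown by `printSchema`. The following Python sketch reproduces that layout under stated assumptions: the `root` header and ` |` prefix are taken from the removed `generateSchemaString`, and the `(nullable = ...)` suffix from `StructField.buildFormattedString`. The tuple-based schema encoding, the function names, and the `struct` label for nested types are hypothetical simplifications.

```python
def tree_string(fields):
    """Render a schema as a tree.

    `fields` is a list of (name, dtype, nullable) tuples, where dtype is
    either a simple type name (str) or a nested list of such tuples.
    """
    lines = ["root"]
    _build(fields, " |", lines)
    return "\n".join(lines) + "\n"


def _build(fields, prefix, lines):
    for name, dtype, nullable in fields:
        nested = not isinstance(dtype, str)
        simple = "struct" if nested else dtype
        lines.append("%s-- %s: %s (nullable = %s)"
                     % (prefix, name, simple, str(nullable).lower()))
        if nested:
            # Children are indented by four spaces plus a new branch marker.
            _build(dtype, prefix + "    |", lines)


people = [("name", "string", False), ("age", "integer", True)]
print(tree_string(people))
```

Moving the formatting onto the schema itself, rather than onto the query plan, lets any holder of a `StructType` print it without going through a plan.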

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 1537de2..3cb4072 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -177,7 +177,7 @@ case class LowerCaseSchema(child: LogicalPlan) extends UnaryNode {
     case StructType(fields) =>
       StructType(fields.map(f =>
         StructField(f.name.toLowerCase(), lowerCaseSchema(f.dataType), f.nullable)))
-    case ArrayType(elemType) => ArrayType(lowerCaseSchema(elemType))
+    case ArrayType(elemType, containsNull) => ArrayType(lowerCaseSchema(elemType), containsNull)
     case otherType => otherType
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
index 1076537..f8960b3 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/Rule.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.catalyst.rules
 
-import org.apache.spark.sql.Logging
+import org.apache.spark.sql.catalyst.Logging
 import org.apache.spark.sql.catalyst.trees.TreeNode
 
 abstract class Rule[TreeType <: TreeNode[_]] extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index e300bdb..6aa407c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.sql
-package catalyst
-package rules
+package org.apache.spark.sql.catalyst.rules
 
+import org.apache.spark.sql.catalyst.Logging
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
index d159ecd..9a28d03 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/package.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.catalyst
 
-import org.apache.spark.sql.Logger
-
 /**
  * A library for easily manipulating trees of operators.  Operators that extend TreeNode are
  * granted the following interface:
@@ -35,5 +33,6 @@ import org.apache.spark.sql.Logger
  */
 package object trees {
   // Since we want tree nodes to be lightweight, we create one logger for all treenode instances.
-  protected val logger = Logger("catalyst.trees")
+  protected val logger =
+    com.typesafe.scalalogging.slf4j.Logger(org.slf4j.LoggerFactory.getLogger("catalyst.trees"))
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 71808f7..b52ee6d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -45,11 +45,13 @@ object DataType extends RegexParsers {
     "TimestampType" ^^^ TimestampType
 
   protected lazy val arrayType: Parser[DataType] =
-    "ArrayType" ~> "(" ~> dataType <~ ")" ^^ ArrayType
+    "ArrayType" ~> "(" ~> dataType ~ "," ~ boolVal <~ ")" ^^ {
+      case tpe ~ _ ~ containsNull => ArrayType(tpe, containsNull)
+    }
 
   protected lazy val mapType: Parser[DataType] =
-    "MapType" ~> "(" ~> dataType ~ "," ~ dataType <~ ")" ^^ {
-      case t1 ~ _ ~ t2 => MapType(t1, t2)
+    "MapType" ~> "(" ~> dataType ~ "," ~ dataType ~ "," ~ boolVal <~ ")" ^^ {
+      case t1 ~ _ ~ t2 ~ _ ~ valueContainsNull => MapType(t1, t2, valueContainsNull)
     }
 
   protected lazy val structField: Parser[StructField] =
@@ -82,6 +84,21 @@ object DataType extends RegexParsers {
     case Success(result, _) => result
     case failure: NoSuccess => sys.error(s"Unsupported dataType: $asString, $failure")
   }
+
+  protected[types] def buildFormattedString(
+      dataType: DataType,
+      prefix: String,
+      builder: StringBuilder): Unit = {
+    dataType match {
+      case array: ArrayType =>
+        array.buildFormattedString(prefix, builder)
+      case struct: StructType =>
+        struct.buildFormattedString(prefix, builder)
+      case map: MapType =>
+        map.buildFormattedString(prefix, builder)
+      case _ =>
+    }
+  }
 }
 
 abstract class DataType {
@@ -92,9 +109,13 @@ abstract class DataType {
   }
 
   def isPrimitive: Boolean = false
+
+  def simpleString: String
 }
 
-case object NullType extends DataType
+case object NullType extends DataType {
+  def simpleString: String = "null"
+}
 
 object NativeType {
   def all = Seq(
@@ -108,40 +129,45 @@ trait PrimitiveType extends DataType {
 }
 
 abstract class NativeType extends DataType {
-  type JvmType
-  @transient val tag: TypeTag[JvmType]
-  val ordering: Ordering[JvmType]
+  private[sql] type JvmType
+  @transient private[sql] val tag: TypeTag[JvmType]
+  private[sql] val ordering: Ordering[JvmType]
 
-  @transient val classTag = ScalaReflectionLock.synchronized {
+  @transient private[sql] val classTag = ScalaReflectionLock.synchronized {
     val mirror = runtimeMirror(Utils.getSparkClassLoader)
     ClassTag[JvmType](mirror.runtimeClass(tag.tpe))
   }
 }
 
 case object StringType extends NativeType with PrimitiveType {
-  type JvmType = String
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
-  val ordering = implicitly[Ordering[JvmType]]
+  private[sql] type JvmType = String
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  def simpleString: String = "string"
 }
 
 case object BinaryType extends DataType with PrimitiveType {
-  type JvmType = Array[Byte]
+  private[sql] type JvmType = Array[Byte]
+  def simpleString: String = "binary"
 }
 
 case object BooleanType extends NativeType with PrimitiveType {
-  type JvmType = Boolean
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
-  val ordering = implicitly[Ordering[JvmType]]
+  private[sql] type JvmType = Boolean
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  def simpleString: String = "boolean"
 }
 
 case object TimestampType extends NativeType {
-  type JvmType = Timestamp
+  private[sql] type JvmType = Timestamp
 
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
 
-  val ordering = new Ordering[JvmType] {
+  private[sql] val ordering = new Ordering[JvmType] {
     def compare(x: Timestamp, y: Timestamp) = x.compareTo(y)
   }
+
+  def simpleString: String = "timestamp"
 }
 
 abstract class NumericType extends NativeType with PrimitiveType {
@@ -150,7 +176,7 @@ abstract class NumericType extends NativeType with PrimitiveType {
   // type parameter and add a numeric annotation (i.e., [JvmType : Numeric]). This gets
   // desugared by the compiler into an argument to the object's constructor. This means there is no
   // longer a no-argument constructor and thus the JVM cannot serialize the object anymore.
-  val numeric: Numeric[JvmType]
+  private[sql] val numeric: Numeric[JvmType]
 }
 
 object NumericType {
@@ -166,39 +192,43 @@ object IntegralType {
 }
 
 abstract class IntegralType extends NumericType {
-  val integral: Integral[JvmType]
+  private[sql] val integral: Integral[JvmType]
 }
 
 case object LongType extends IntegralType {
-  type JvmType = Long
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
-  val numeric = implicitly[Numeric[Long]]
-  val integral = implicitly[Integral[Long]]
-  val ordering = implicitly[Ordering[JvmType]]
+  private[sql] type JvmType = Long
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Long]]
+  private[sql] val integral = implicitly[Integral[Long]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  def simpleString: String = "long"
 }
 
 case object IntegerType extends IntegralType {
-  type JvmType = Int
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
-  val numeric = implicitly[Numeric[Int]]
-  val integral = implicitly[Integral[Int]]
-  val ordering = implicitly[Ordering[JvmType]]
+  private[sql] type JvmType = Int
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Int]]
+  private[sql] val integral = implicitly[Integral[Int]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  def simpleString: String = "integer"
 }
 
 case object ShortType extends IntegralType {
-  type JvmType = Short
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
-  val numeric = implicitly[Numeric[Short]]
-  val integral = implicitly[Integral[Short]]
-  val ordering = implicitly[Ordering[JvmType]]
+  private[sql] type JvmType = Short
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Short]]
+  private[sql] val integral = implicitly[Integral[Short]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  def simpleString: String = "short"
 }
 
 case object ByteType extends IntegralType {
-  type JvmType = Byte
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
-  val numeric = implicitly[Numeric[Byte]]
-  val integral = implicitly[Integral[Byte]]
-  val ordering = implicitly[Ordering[JvmType]]
+  private[sql] type JvmType = Byte
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Byte]]
+  private[sql] val integral = implicitly[Integral[Byte]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  def simpleString: String = "byte"
 }
 
 /** Matcher for any expressions that evaluate to [[FractionalType]]s */
@@ -209,47 +239,159 @@ object FractionalType {
   }
 }
 abstract class FractionalType extends NumericType {
-  val fractional: Fractional[JvmType]
+  private[sql] val fractional: Fractional[JvmType]
 }
 
 case object DecimalType extends FractionalType {
-  type JvmType = BigDecimal
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
-  val numeric = implicitly[Numeric[BigDecimal]]
-  val fractional = implicitly[Fractional[BigDecimal]]
-  val ordering = implicitly[Ordering[JvmType]]
+  private[sql] type JvmType = BigDecimal
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[BigDecimal]]
+  private[sql] val fractional = implicitly[Fractional[BigDecimal]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  def simpleString: String = "decimal"
 }
 
 case object DoubleType extends FractionalType {
-  type JvmType = Double
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
-  val numeric = implicitly[Numeric[Double]]
-  val fractional = implicitly[Fractional[Double]]
-  val ordering = implicitly[Ordering[JvmType]]
+  private[sql] type JvmType = Double
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Double]]
+  private[sql] val fractional = implicitly[Fractional[Double]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  def simpleString: String = "double"
 }
 
 case object FloatType extends FractionalType {
-  type JvmType = Float
-  @transient lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
-  val numeric = implicitly[Numeric[Float]]
-  val fractional = implicitly[Fractional[Float]]
-  val ordering = implicitly[Ordering[JvmType]]
+  private[sql] type JvmType = Float
+  @transient private[sql] lazy val tag = ScalaReflectionLock.synchronized { typeTag[JvmType] }
+  private[sql] val numeric = implicitly[Numeric[Float]]
+  private[sql] val fractional = implicitly[Fractional[Float]]
+  private[sql] val ordering = implicitly[Ordering[JvmType]]
+  def simpleString: String = "float"
 }
 
-case class ArrayType(elementType: DataType) extends DataType
+object ArrayType {
+  /** Construct an [[ArrayType]] object with the given element type. `containsNull` is false. */
+  def apply(elementType: DataType): ArrayType = ArrayType(elementType, false)
+}
 
-case class StructField(name: String, dataType: DataType, nullable: Boolean)
+/**
+ * The data type for collections of multiple values.
+ * Internally these are represented as columns that contain a ``scala.collection.Seq``.
+ *
+ * @param elementType The data type of values.
+ * @param containsNull Indicates if the array can contain `null` values.
+ */
+case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType {
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    builder.append(
+      s"${prefix}-- element: ${elementType.simpleString} (containsNull = ${containsNull})\n")
+    DataType.buildFormattedString(elementType, s"$prefix    |", builder)
+  }
+
+  def simpleString: String = "array"
+}
+
+/**
+ * A field inside a StructType.
+ * @param name The name of this field.
+ * @param dataType The data type of this field.
+ * @param nullable Indicates if values of this field can be `null` values.
+ */
+case class StructField(name: String, dataType: DataType, nullable: Boolean) {
+
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    builder.append(s"${prefix}-- ${name}: ${dataType.simpleString} (nullable = ${nullable})\n")
+    DataType.buildFormattedString(dataType, s"$prefix    |", builder)
+  }
+}
 
 object StructType {
-  def fromAttributes(attributes: Seq[Attribute]): StructType = {
+  protected[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
     StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable)))
-  }
 
-  // def apply(fields: Seq[StructField]) = new StructType(fields.toIndexedSeq)
+  private def validateFields(fields: Seq[StructField]): Boolean =
+    fields.map(field => field.name).distinct.size == fields.size
 }
 
 case class StructType(fields: Seq[StructField]) extends DataType {
-  def toAttributes = fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+  require(StructType.validateFields(fields), "Found fields with the same name.")
+
+  /**
+   * Returns all field names in a [[Seq]].
+   */
+  lazy val fieldNames: Seq[String] = fields.map(_.name)
+  private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
+  private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
+  /**
+   * Extracts a [[StructField]] of the given name. If the [[StructType]] object does not
+   * have a field matching the given name, an `IllegalArgumentException` is thrown.
+   */
+  def apply(name: String): StructField = {
+    nameToField.get(name).getOrElse(
+      throw new IllegalArgumentException(s"Field ${name} does not exist."))
+  }
+
+  /**
+   * Returns a [[StructType]] containing [[StructField]]s of the given names.
+   * If any given name has no matching field, an `IllegalArgumentException` is thrown.
+   */
+  def apply(names: Set[String]): StructType = {
+    val nonExistFields = names -- fieldNamesSet
+    if (!nonExistFields.isEmpty) {
+      throw new IllegalArgumentException(
+        s"Field ${nonExistFields.mkString(",")} does not exist.")
+    }
+    // Preserve the original order of fields.
+    StructType(fields.filter(f => names.contains(f.name)))
+  }
+
+  protected[sql] def toAttributes =
+    fields.map(f => AttributeReference(f.name, f.dataType, f.nullable)())
+
+  def treeString: String = {
+    val builder = new StringBuilder
+    builder.append("root\n")
+    val prefix = " |"
+    fields.foreach(field => field.buildFormattedString(prefix, builder))
+
+    builder.toString()
+  }
+
+  def printTreeString(): Unit = println(treeString)
+
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    fields.foreach(field => field.buildFormattedString(prefix, builder))
+  }
+
+  def simpleString: String = "struct"
+}
+
+object MapType {
+  /**
+   * Constructs a [[MapType]] object with the given key type and value type.
+   * `valueContainsNull` is set to true.
+   */
+  def apply(keyType: DataType, valueType: DataType): MapType =
+    MapType(keyType, valueType, true)
 }
 
-case class MapType(keyType: DataType, valueType: DataType) extends DataType
+/**
+ * The data type for Maps. Keys in a map are not allowed to have `null` values.
+ * @param keyType The data type of map keys.
+ * @param valueType The data type of map values.
+ * @param valueContainsNull Indicates if map values can be `null`.
+ */
+case class MapType(
+    keyType: DataType,
+    valueType: DataType,
+    valueContainsNull: Boolean) extends DataType {
+  private[sql] def buildFormattedString(prefix: String, builder: StringBuilder): Unit = {
+    builder.append(s"${prefix}-- key: ${keyType.simpleString}\n")
+    builder.append(s"${prefix}-- value: ${valueType.simpleString} " +
+      s"(valueContainsNull = ${valueContainsNull})\n")
+    DataType.buildFormattedString(keyType, s"$prefix    |", builder)
+    DataType.buildFormattedString(valueType, s"$prefix    |", builder)
+  }
+
+  def simpleString: String = "map"
+}
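
To illustrate the tree format that `treeString` and the `buildFormattedString` methods above produce, here is a minimal standalone sketch in Java. It does not use Spark: `SchemaTreeSketch`, `Type`, `Primitive`, `ArrayOf` and `Field` are hypothetical stand-ins, and the recursion is modelled as a method on each type, which is an assumption about how the `DataType.buildFormattedString` helper (not shown in this part of the diff) dispatches on nested types.

```java
import java.util.Arrays;
import java.util.List;

// Standalone model of the schema tree printer. These classes are hypothetical
// stand-ins for illustration only; they are not Spark's DataType classes.
class SchemaTreeSketch {

  /** A type knows its short name and how to render any nested structure. */
  interface Type {
    String simpleString();
    void buildFormattedString(String prefix, StringBuilder builder);
  }

  /** Leaf type such as "string" or "integer": there is nothing nested to render. */
  static final class Primitive implements Type {
    private final String name;
    Primitive(String name) { this.name = name; }
    public String simpleString() { return name; }
    public void buildFormattedString(String prefix, StringBuilder builder) { }
  }

  /** Array type: renders one "element" line, then recurses into the element type. */
  static final class ArrayOf implements Type {
    private final Type elementType;
    private final boolean containsNull;
    ArrayOf(Type elementType, boolean containsNull) {
      this.elementType = elementType;
      this.containsNull = containsNull;
    }
    public String simpleString() { return "array"; }
    public void buildFormattedString(String prefix, StringBuilder builder) {
      builder.append(prefix).append("-- element: ").append(elementType.simpleString())
          .append(" (containsNull = ").append(containsNull).append(")\n");
      elementType.buildFormattedString(prefix + "    |", builder);
    }
  }

  /** A named field: renders its own line, then anything nested one level deeper. */
  static final class Field {
    private final String name;
    private final Type dataType;
    private final boolean nullable;
    Field(String name, Type dataType, boolean nullable) {
      this.name = name;
      this.dataType = dataType;
      this.nullable = nullable;
    }
    void buildFormattedString(String prefix, StringBuilder builder) {
      builder.append(prefix).append("-- ").append(name).append(": ")
          .append(dataType.simpleString())
          .append(" (nullable = ").append(nullable).append(")\n");
      dataType.buildFormattedString(prefix + "    |", builder);
    }
  }

  /** Starts with "root" and renders every top-level field with the prefix " |". */
  static String treeString(List<Field> fields) {
    StringBuilder builder = new StringBuilder("root\n");
    for (Field field : fields) {
      field.buildFormattedString(" |", builder);
    }
    return builder.toString();
  }

  public static void main(String[] args) {
    List<Field> fields = Arrays.asList(
        new Field("name", new Primitive("string"), false),
        new Field("scores", new ArrayOf(new Primitive("integer"), false), true));
    System.out.print(treeString(fields));
    // root
    //  |-- name: string (nullable = false)
    //  |-- scores: array (nullable = true)
    //  |    |-- element: integer (containsNull = false)
  }
}
```

Each level of nesting extends the prefix by four spaces and a bar, which is what keeps nested element lines aligned under their parent field.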

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/package-info.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/package-info.java b/sql/catalyst/src/main/scala/org/apache/spark/sql/package-info.java
deleted file mode 100644
index 5360361..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/package-info.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Allows the execution of relational queries, including those expressed in SQL using Spark.
- */
-package org.apache.spark.sql;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
deleted file mode 100644
index 4589129..0000000
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/package.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark
-
-/**
- * Allows the execution of relational queries, including those expressed in SQL using Spark.
- *
- * Note that this package is located in catalyst instead of in core so that all subprojects can
- * inherit the settings from this package object.
- */
-package object sql {
-
-  protected[sql] def Logger(name: String) =
-    com.typesafe.scalalogging.slf4j.Logger(org.slf4j.LoggerFactory.getLogger(name))
-
-  protected[sql] type Logging = com.typesafe.scalalogging.slf4j.Logging
-
-  type Row = catalyst.expressions.Row
-
-  val Row = catalyst.expressions.Row
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
index c0438db..e030d6e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/ScalaReflectionSuite.scala
@@ -17,11 +17,11 @@
 
 package org.apache.spark.sql.catalyst
 
+import java.math.BigInteger
 import java.sql.Timestamp
 
 import org.scalatest.FunSuite
 
-import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 
 case class PrimitiveData(
@@ -148,4 +148,68 @@ class ScalaReflectionSuite extends FunSuite {
         StructField("_2", StringType, nullable = true))),
       nullable = true))
   }
+
+  test("get data type of a value") {
+    // BooleanType
+    assert(BooleanType === typeOfObject(true))
+    assert(BooleanType === typeOfObject(false))
+
+    // BinaryType
+    assert(BinaryType === typeOfObject("string".getBytes))
+
+    // StringType
+    assert(StringType === typeOfObject("string"))
+
+    // ByteType
+    assert(ByteType === typeOfObject(127.toByte))
+
+    // ShortType
+    assert(ShortType === typeOfObject(32767.toShort))
+
+    // IntegerType
+    assert(IntegerType === typeOfObject(2147483647))
+
+    // LongType
+    assert(LongType === typeOfObject(9223372036854775807L))
+
+    // FloatType
+    assert(FloatType === typeOfObject(3.4028235E38.toFloat))
+
+    // DoubleType
+    assert(DoubleType === typeOfObject(1.7976931348623157E308))
+
+    // DecimalType
+    assert(DecimalType === typeOfObject(BigDecimal("1.7976931348623157E318")))
+
+    // TimestampType
+    assert(TimestampType === typeOfObject(java.sql.Timestamp.valueOf("2014-7-25 10:26:00")))
+
+    // NullType
+    assert(NullType === typeOfObject(null))
+
+    def typeOfObject1: PartialFunction[Any, DataType] = typeOfObject orElse {
+      case value: java.math.BigInteger => DecimalType
+      case value: java.math.BigDecimal => DecimalType
+      case _ => StringType
+    }
+
+    assert(DecimalType === typeOfObject1(
+      new BigInteger("92233720368547758070")))
+    assert(DecimalType === typeOfObject1(
+      new java.math.BigDecimal("1.7976931348623157E318")))
+    assert(StringType === typeOfObject1(BigInt("92233720368547758070")))
+
+    def typeOfObject2: PartialFunction[Any, DataType] = typeOfObject orElse {
+      case value: java.math.BigInteger => DecimalType
+    }
+
+    intercept[MatchError](typeOfObject2(BigInt("92233720368547758070")))
+
+    def typeOfObject3: PartialFunction[Any, DataType] = typeOfObject orElse {
+      case c: Seq[_] => ArrayType(typeOfObject3(c.head))
+    }
+
+    assert(ArrayType(IntegerType) === typeOfObject3(Seq(1, 2, 3)))
+    assert(ArrayType(ArrayType(IntegerType)) === typeOfObject3(Seq(Seq(1,2,3))))
+  }
 }
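
The test above extends `typeOfObject` with extra cases through `orElse`. The same fallback composition can be sketched in standalone Java, assuming a rule is modelled as a function that returns an empty `Optional` when it has no answer. All names here (`TypeInferenceSketch`, `baseTypeOf`, `bigNumberTypeOf`, `inferType`) are hypothetical, type names are plain strings, and the base rule covers only a handful of classes rather than everything Spark's `typeOfObject` handles.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Optional;
import java.util.function.Function;

// Standalone sketch of composing type-inference rules with a fallback.
// Not Spark code: names and the string-based type labels are illustrative.
class TypeInferenceSketch {

  /** Base rule: knows a few built-in value classes, empty for anything else. */
  static Optional<String> baseTypeOf(Object value) {
    if (value == null) return Optional.of("null");
    if (value instanceof Boolean) return Optional.of("boolean");
    if (value instanceof Integer) return Optional.of("integer");
    if (value instanceof Long) return Optional.of("long");
    if (value instanceof Double) return Optional.of("double");
    if (value instanceof String) return Optional.of("string");
    return Optional.empty();
  }

  /** Extra rule supplied by the caller: maps Java big numbers to "decimal". */
  static Optional<String> bigNumberTypeOf(Object value) {
    if (value instanceof BigInteger || value instanceof BigDecimal) {
      return Optional.of("decimal");
    }
    return Optional.empty();
  }

  /** Tries `first`; consults `fallback` only when `first` has no answer. */
  static Function<Object, Optional<String>> orElse(
      Function<Object, Optional<String>> first,
      Function<Object, Optional<String>> fallback) {
    return value -> {
      Optional<String> result = first.apply(value);
      return result.isPresent() ? result : fallback.apply(value);
    };
  }

  private static final Function<Object, Optional<String>> COMBINED =
      orElse(TypeInferenceSketch::baseTypeOf, TypeInferenceSketch::bigNumberTypeOf);

  /** The composed rule: base cases first, then the big-number extension. */
  static Optional<String> inferType(Object value) {
    return COMBINED.apply(value);
  }

  public static void main(String[] args) {
    System.out.println(inferType(42).orElse("unknown"));   // integer
    System.out.println(
        inferType(new BigInteger("92233720368547758070")).orElse("unknown"));  // decimal
    System.out.println(inferType('c').orElse("unknown"));  // unknown
  }
}
```

One difference from the Scala test: a value that no rule covers yields an empty `Optional` in this sketch, whereas the composed `PartialFunction` in `typeOfObject2` throws a `MatchError`.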

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ArrayType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ArrayType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ArrayType.java
new file mode 100644
index 0000000..17334ca
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ArrayType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing Lists.
+ * An ArrayType object comprises two fields, {@code DataType elementType} and
+ * {@code boolean containsNull}. The field of {@code elementType} is used to specify the type of
+ * array elements. The field of {@code containsNull} is used to specify if the array has
+ * {@code null} values.
+ *
+ * To create an {@link ArrayType},
+ * {@link org.apache.spark.sql.api.java.types.DataType#createArrayType(DataType)} or
+ * {@link org.apache.spark.sql.api.java.types.DataType#createArrayType(DataType, boolean)}
+ * should be used.
+ */
+public class ArrayType extends DataType {
+  private DataType elementType;
+  private boolean containsNull;
+
+  protected ArrayType(DataType elementType, boolean containsNull) {
+    this.elementType = elementType;
+    this.containsNull = containsNull;
+  }
+
+  public DataType getElementType() {
+    return elementType;
+  }
+
+  public boolean isContainsNull() {
+    return containsNull;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    ArrayType arrayType = (ArrayType) o;
+
+    if (containsNull != arrayType.containsNull) return false;
+    if (!elementType.equals(arrayType.elementType)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = elementType.hashCode();
+    result = 31 * result + (containsNull ? 1 : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/BinaryType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/BinaryType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/BinaryType.java
new file mode 100644
index 0000000..6170317
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/BinaryType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing byte[] values.
+ *
+ * {@code BinaryType} is represented by the singleton object {@link DataType#BinaryType}.
+ */
+public class BinaryType extends DataType {
+  protected BinaryType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/BooleanType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/BooleanType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/BooleanType.java
new file mode 100644
index 0000000..8fa24d8
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/BooleanType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing boolean and Boolean values.
+ *
+ * {@code BooleanType} is represented by the singleton object {@link DataType#BooleanType}.
+ */
+public class BooleanType extends DataType {
+  protected BooleanType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ByteType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ByteType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ByteType.java
new file mode 100644
index 0000000..2de3297
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ByteType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing byte and Byte values.
+ *
+ * {@code ByteType} is represented by the singleton object {@link DataType#ByteType}.
+ */
+public class ByteType extends DataType {
+  protected ByteType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DataType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DataType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DataType.java
new file mode 100644
index 0000000..f84e5a4
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DataType.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * The base type of all Spark SQL data types.
+ *
+ * To get or create a specific data type, users should use the singleton objects and
+ * factory methods provided by this class.
+ */
+public abstract class DataType {
+
+  /**
+   * Gets the StringType object.
+   */
+  public static final StringType StringType = new StringType();
+
+  /**
+   * Gets the BinaryType object.
+   */
+  public static final BinaryType BinaryType = new BinaryType();
+
+  /**
+   * Gets the BooleanType object.
+   */
+  public static final BooleanType BooleanType = new BooleanType();
+
+  /**
+   * Gets the TimestampType object.
+   */
+  public static final TimestampType TimestampType = new TimestampType();
+
+  /**
+   * Gets the DecimalType object.
+   */
+  public static final DecimalType DecimalType = new DecimalType();
+
+  /**
+   * Gets the DoubleType object.
+   */
+  public static final DoubleType DoubleType = new DoubleType();
+
+  /**
+   * Gets the FloatType object.
+   */
+  public static final FloatType FloatType = new FloatType();
+
+  /**
+   * Gets the ByteType object.
+   */
+  public static final ByteType ByteType = new ByteType();
+
+  /**
+   * Gets the IntegerType object.
+   */
+  public static final IntegerType IntegerType = new IntegerType();
+
+  /**
+   * Gets the LongType object.
+   */
+  public static final LongType LongType = new LongType();
+
+  /**
+   * Gets the ShortType object.
+   */
+  public static final ShortType ShortType = new ShortType();
+
+  /**
+   * Creates an ArrayType by specifying the data type of elements ({@code elementType}).
+   * The field of {@code containsNull} is set to {@code false}.
+   */
+  public static ArrayType createArrayType(DataType elementType) {
+    if (elementType == null) {
+      throw new IllegalArgumentException("elementType should not be null.");
+    }
+
+    return new ArrayType(elementType, false);
+  }
+
+  /**
+   * Creates an ArrayType by specifying the data type of elements ({@code elementType}) and
+   * whether the array contains null values ({@code containsNull}).
+   */
+  public static ArrayType createArrayType(DataType elementType, boolean containsNull) {
+    if (elementType == null) {
+      throw new IllegalArgumentException("elementType should not be null.");
+    }
+
+    return new ArrayType(elementType, containsNull);
+  }
+
+  /**
+   * Creates a MapType by specifying the data type of keys ({@code keyType}) and values
+   * ({@code valueType}). The field of {@code valueContainsNull} is set to {@code true}.
+   */
+  public static MapType createMapType(DataType keyType, DataType valueType) {
+    if (keyType == null) {
+      throw new IllegalArgumentException("keyType should not be null.");
+    }
+    if (valueType == null) {
+      throw new IllegalArgumentException("valueType should not be null.");
+    }
+
+    return new MapType(keyType, valueType, true);
+  }
+
+  /**
+   * Creates a MapType by specifying the data type of keys ({@code keyType}), the data type of
+   * values ({@code valueType}), and whether values contain any null value
+   * ({@code valueContainsNull}).
+   */
+  public static MapType createMapType(
+      DataType keyType,
+      DataType valueType,
+      boolean valueContainsNull) {
+    if (keyType == null) {
+      throw new IllegalArgumentException("keyType should not be null.");
+    }
+    if (valueType == null) {
+      throw new IllegalArgumentException("valueType should not be null.");
+    }
+
+    return new MapType(keyType, valueType, valueContainsNull);
+  }
+
+  /**
+   * Creates a StructField by specifying the name ({@code name}), data type ({@code dataType}) and
+   * whether values of this field can be null values ({@code nullable}).
+   */
+  public static StructField createStructField(String name, DataType dataType, boolean nullable) {
+    if (name == null) {
+      throw new IllegalArgumentException("name should not be null.");
+    }
+    if (dataType == null) {
+      throw new IllegalArgumentException("dataType should not be null.");
+    }
+
+    return new StructField(name, dataType, nullable);
+  }
+
+  /**
+   * Creates a StructType with the given list of StructFields ({@code fields}).
+   */
+  public static StructType createStructType(List<StructField> fields) {
+    return createStructType(fields.toArray(new StructField[0]));
+  }
+
+  /**
+   * Creates a StructType with the given StructField array ({@code fields}).
+   */
+  public static StructType createStructType(StructField[] fields) {
+    if (fields == null) {
+      throw new IllegalArgumentException("fields should not be null.");
+    }
+    Set<String> distinctNames = new HashSet<String>();
+    for (StructField field: fields) {
+      if (field == null) {
+        throw new IllegalArgumentException(
+          "fields should not contain any null.");
+      }
+
+      distinctNames.add(field.getName());
+    }
+    if (distinctNames.size() != fields.length) {
+      throw new IllegalArgumentException("fields should have distinct names.");
+    }
+
+    return new StructType(fields);
+  }
+
+}
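
The checks that `createStructType(StructField[])` performs can be tried in isolation with the following minimal sketch. It assumes fields are reduced to their names only, and `StructNameCheckSketch`, `requireDistinctNames` and `isValid` are hypothetical names that are not part of the Spark API. The error messages are copied from the method above.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Standalone sketch of the validation done before a StructType is built.
// Fields are modelled by name only; this is not Spark code.
class StructNameCheckSketch {

  /** Rejects a null list, null entries, and repeated names. */
  static void requireDistinctNames(List<String> fieldNames) {
    if (fieldNames == null) {
      throw new IllegalArgumentException("fields should not be null.");
    }
    Set<String> distinctNames = new HashSet<String>();
    for (String name : fieldNames) {
      if (name == null) {
        throw new IllegalArgumentException("fields should not contain any null.");
      }
      distinctNames.add(name);
    }
    // A set drops repeats, so a smaller set means at least one duplicate name.
    if (distinctNames.size() != fieldNames.size()) {
      throw new IllegalArgumentException("fields should have distinct names.");
    }
  }

  /** Convenience wrapper that reports the outcome instead of throwing. */
  static boolean isValid(List<String> fieldNames) {
    try {
      requireDistinctNames(fieldNames);
      return true;
    } catch (IllegalArgumentException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(isValid(Arrays.asList("name", "age")));   // true
    System.out.println(isValid(Arrays.asList("name", "name")));  // false
    System.out.println(isValid(Arrays.asList("name", null)));    // false
  }
}
```

Comparing the size of the set with the size of the input is the same idea as the Scala-side `validateFields`, which compares `distinct.size` with `fields.size`.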