You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/07/30 09:20:04 UTC

[2/3] [SPARK-2179][SQL] Public API for DataTypes and Schema

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DecimalType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DecimalType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DecimalType.java
new file mode 100644
index 0000000..9250491
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DecimalType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing java.math.BigDecimal values.
+ *
+ * {@code DecimalType} is represented by the singleton object {@link DataType#DecimalType}.
+ */
+public class DecimalType extends DataType {
+  protected DecimalType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DoubleType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DoubleType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DoubleType.java
new file mode 100644
index 0000000..3e86917
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/DoubleType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing double and Double values.
+ *
+ * {@code DoubleType} is represented by the singleton object {@link DataType#DoubleType}.
+ */
+public class DoubleType extends DataType {
+  protected DoubleType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/FloatType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/FloatType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/FloatType.java
new file mode 100644
index 0000000..fa860d4
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/FloatType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing float and Float values.
+ *
+ * {@code FloatType} is represented by the singleton object {@link DataType#FloatType}.
+ */
+public class FloatType extends DataType {
+  protected FloatType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/IntegerType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/IntegerType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/IntegerType.java
new file mode 100644
index 0000000..bd973ec
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/IntegerType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing int and Integer values.
+ *
+ * {@code IntegerType} is represented by the singleton object {@link DataType#IntegerType}.
+ */
+public class IntegerType extends DataType {
+  protected IntegerType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/LongType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/LongType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/LongType.java
new file mode 100644
index 0000000..e002333
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/LongType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing long and Long values.
+ *
+ * {@code LongType} is represented by the singleton object {@link DataType#LongType}.
+ */
+public class LongType extends DataType {
+  protected LongType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/MapType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/MapType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/MapType.java
new file mode 100644
index 0000000..94936e2
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/MapType.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing Maps. A MapType object comprises three fields,
+ * {@code DataType keyType}, {@code DataType valueType}, and {@code boolean valueContainsNull}.
+ * The field of {@code keyType} is used to specify the type of keys in the map.
+ * The field of {@code valueType} is used to specify the type of values in the map.
+ * The field of {@code valueContainsNull} is used to specify if map values have
+ * {@code null} values.
+ * For values of a MapType column, keys are not allowed to have {@code null} values.
+ *
+ * To create a {@link MapType},
+ * {@link org.apache.spark.sql.api.java.types.DataType#createMapType(DataType, DataType)} or
+ * {@link org.apache.spark.sql.api.java.types.DataType#createMapType(DataType, DataType, boolean)}
+ * should be used.
+ */
+public class MapType extends DataType {
+  private DataType keyType;
+  private DataType valueType;
+  private boolean valueContainsNull;
+
+  protected MapType(DataType keyType, DataType valueType, boolean valueContainsNull) {
+    this.keyType = keyType;
+    this.valueType = valueType;
+    this.valueContainsNull = valueContainsNull;
+  }
+
+  public DataType getKeyType() {
+    return keyType;
+  }
+
+  public DataType getValueType() {
+    return valueType;
+  }
+
+  public boolean isValueContainsNull() {
+    return valueContainsNull;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    MapType mapType = (MapType) o;
+
+    if (valueContainsNull != mapType.valueContainsNull) return false;
+    if (!keyType.equals(mapType.keyType)) return false;
+    if (!valueType.equals(mapType.valueType)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = keyType.hashCode();
+    result = 31 * result + valueType.hashCode();
+    result = 31 * result + (valueContainsNull ? 1 : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ShortType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ShortType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ShortType.java
new file mode 100644
index 0000000..98f9507
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/ShortType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing short and Short values.
+ *
+ * {@code ShortType} is represented by the singleton object {@link DataType#ShortType}.
+ */
+public class ShortType extends DataType {
+  protected ShortType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StringType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StringType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StringType.java
new file mode 100644
index 0000000..b8e7dbe
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StringType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing String values.
+ *
+ * {@code StringType} is represented by the singleton object {@link DataType#StringType}.
+ */
+public class StringType extends DataType {
+  protected StringType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructField.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructField.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructField.java
new file mode 100644
index 0000000..54e9c11
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructField.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * A StructField object represents a field in a StructType object.
+ * A StructField object comprises three fields, {@code String name}, {@code DataType dataType},
+ * and {@code boolean nullable}. The field of {@code name} is the name of a StructField.
+ * The field of {@code dataType} specifies the data type of a StructField.
+ * The field of {@code nullable} specifies if values of a StructField can contain {@code null}
+ * values.
+ *
+ * To create a {@link StructField},
+ * {@link org.apache.spark.sql.api.java.types.DataType#createStructField(String, DataType, boolean)}
+ * should be used.
+ */
+public class StructField {
+  private String name;
+  private DataType dataType;
+  private boolean nullable;
+
+  protected StructField(String name, DataType dataType, boolean nullable) {
+    this.name = name;
+    this.dataType = dataType;
+    this.nullable = nullable;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public DataType getDataType() {
+    return dataType;
+  }
+
+  public boolean isNullable() {
+    return nullable;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    StructField that = (StructField) o;
+
+    if (nullable != that.nullable) return false;
+    if (!dataType.equals(that.dataType)) return false;
+    if (!name.equals(that.name)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = name.hashCode();
+    result = 31 * result + dataType.hashCode();
+    result = 31 * result + (nullable ? 1 : 0);
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructType.java
new file mode 100644
index 0000000..33a42f4
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/StructType.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+import java.util.Arrays;
+import java.util.List;
+
+/**
+ * The data type representing Rows.
+ * A StructType object comprises an array of StructFields.
+ *
+ * To create a {@link StructType},
+ * {@link org.apache.spark.sql.api.java.types.DataType#createStructType(java.util.List)} or
+ * {@link org.apache.spark.sql.api.java.types.DataType#createStructType(StructField[])}
+ * should be used.
+ */
+public class StructType extends DataType {
+  private StructField[] fields;
+
+  protected StructType(StructField[] fields) {
+    this.fields = fields;
+  }
+
+  public StructField[] getFields() {
+    return fields;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) return true;
+    if (o == null || getClass() != o.getClass()) return false;
+
+    StructType that = (StructType) o;
+
+    if (!Arrays.equals(fields, that.fields)) return false;
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    return Arrays.hashCode(fields);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/TimestampType.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/TimestampType.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/TimestampType.java
new file mode 100644
index 0000000..6529577
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/TimestampType.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.java.types;
+
+/**
+ * The data type representing java.sql.Timestamp values.
+ *
+ * {@code TimestampType} is represented by the singleton object {@link DataType#TimestampType}.
+ */
+public class TimestampType extends DataType {
+  protected TimestampType() {}
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/java/org/apache/spark/sql/api/java/types/package-info.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/api/java/types/package-info.java b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/package-info.java
new file mode 100644
index 0000000..f169ac6
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/api/java/types/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/**
+ * Allows users to get and create Spark SQL data types.
+ */
+package org.apache.spark.sql.api.java.types;

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index e4b6810..8633875 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -31,7 +31,6 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.columnar.InMemoryRelation
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.SparkStrategies
@@ -89,6 +88,44 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new SchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))
 
   /**
+   * :: DeveloperApi ::
+   * Creates a [[SchemaRDD]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
+   * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
+   * the provided schema. Otherwise, there will be a runtime exception.
+   * Example:
+   * {{{
+   *  import org.apache.spark.sql._
+   *  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
+   *
+   *  val schema =
+   *    StructType(
+   *      StructField("name", StringType, false) ::
+   *      StructField("age", IntegerType, true) :: Nil)
+   *
+   *  val people =
+   *    sc.textFile("examples/src/main/resources/people.txt").map(
+   *      _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
+   *  val peopleSchemaRDD = sqlContext.applySchema(people, schema)
+   *  peopleSchemaRDD.printSchema
+   *  // root
+   *  // |-- name: string (nullable = false)
+   *  // |-- age: integer (nullable = true)
+   *
+   *  peopleSchemaRDD.registerAsTable("people")
+   *  sqlContext.sql("select name from people").collect.foreach(println)
+   * }}}
+   *
+   * @group userf
+   */
+  @DeveloperApi
+  def applySchema(rowRDD: RDD[Row], schema: StructType): SchemaRDD = {
+    // TODO: use MutableProjection when rowRDD is another SchemaRDD and the applied
+    // schema differs from the existing schema on any field data type.
+    val logicalPlan = SparkLogicalPlan(ExistingRdd(schema.toAttributes, rowRDD))(self)
+    new SchemaRDD(this, logicalPlan)
+  }
+
+  /**
    * Loads a Parquet file, returning the result as a [[SchemaRDD]].
    *
    * @group userf
@@ -106,6 +143,19 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
    * :: Experimental ::
+   * Loads a JSON file (one object per line) and applies the given schema,
+   * returning the result as a [[SchemaRDD]].
+   *
+   * @group userf
+   */
+  @Experimental
+  def jsonFile(path: String, schema: StructType): SchemaRDD = {
+    val json = sparkContext.textFile(path)
+    jsonRDD(json, schema)
+  }
+
+  /**
+   * :: Experimental ::
    */
   @Experimental
   def jsonFile(path: String, samplingRatio: Double): SchemaRDD = {
@@ -124,10 +174,28 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
    * :: Experimental ::
+   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
+   * returning the result as a [[SchemaRDD]].
+   *
+   * @group userf
+   */
+  @Experimental
+  def jsonRDD(json: RDD[String], schema: StructType): SchemaRDD = {
+    val appliedSchema =
+      Option(schema).getOrElse(JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, 1.0)))
+    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+    applySchema(rowRDD, appliedSchema)
+  }
+
+  /**
+   * :: Experimental ::
    */
   @Experimental
-  def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD =
-    new SchemaRDD(this, JsonRDD.inferSchema(self, json, samplingRatio))
+  def jsonRDD(json: RDD[String], samplingRatio: Double): SchemaRDD = {
+    val appliedSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json, samplingRatio))
+    val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema)
+    applySchema(rowRDD, appliedSchema)
+  }
 
   /**
    * :: Experimental ::
@@ -345,70 +413,138 @@ class SQLContext(@transient val sparkContext: SparkContext)
 
   /**
    * Peek at the first row of the RDD and infer its schema.
-   * TODO: consolidate this with the type system developed in SPARK-2060.
+   * It is only used by PySpark.
    */
   private[sql] def inferSchema(rdd: RDD[Map[String, _]]): SchemaRDD = {
     import scala.collection.JavaConversions._
-    def typeFor(obj: Any): DataType = obj match {
-      case c: java.lang.String => StringType
-      case c: java.lang.Integer => IntegerType
-      case c: java.lang.Long => LongType
-      case c: java.lang.Double => DoubleType
-      case c: java.lang.Boolean => BooleanType
-      case c: java.math.BigDecimal => DecimalType
-      case c: java.sql.Timestamp => TimestampType
+
+    def typeOfComplexValue: PartialFunction[Any, DataType] = {
       case c: java.util.Calendar => TimestampType
-      case c: java.util.List[_] => ArrayType(typeFor(c.head))
+      case c: java.util.List[_] =>
+        ArrayType(typeOfObject(c.head))
       case c: java.util.Map[_, _] =>
         val (key, value) = c.head
-        MapType(typeFor(key), typeFor(value))
+        MapType(typeOfObject(key), typeOfObject(value))
       case c if c.getClass.isArray =>
         val elem = c.asInstanceOf[Array[_]].head
-        ArrayType(typeFor(elem))
+        ArrayType(typeOfObject(elem))
       case c => throw new Exception(s"Object of type $c cannot be used")
     }
+    def typeOfObject = ScalaReflection.typeOfObject orElse typeOfComplexValue
+
     val firstRow = rdd.first()
-    val schema = firstRow.map { case (fieldName, obj) =>
-      AttributeReference(fieldName, typeFor(obj), true)()
+    val fields = firstRow.map {
+      case (fieldName, obj) => StructField(fieldName, typeOfObject(obj), true)
     }.toSeq
 
-    def needTransform(obj: Any): Boolean = obj match {
-      case c: java.util.List[_] => true
-      case c: java.util.Map[_, _] => true
-      case c if c.getClass.isArray => true
-      case c: java.util.Calendar => true
-      case c => false
+    applySchemaToPythonRDD(rdd, StructType(fields))
+  }
+
+  /**
+   * Parses the data type in our internal string representation. The data type string should
+   * have the same format as the one generated by `toString` in Scala.
+   * It is only used by PySpark.
+   */
+  private[sql] def parseDataType(dataTypeString: String): DataType = {
+    val parser = org.apache.spark.sql.catalyst.types.DataType
+    parser(dataTypeString)
+  }
+
+  /**
+   * Apply a schema defined by the schemaString to an RDD. It is only used by PySpark.
+   */
+  private[sql] def applySchemaToPythonRDD(
+      rdd: RDD[Map[String, _]],
+      schemaString: String): SchemaRDD = {
+    val schema = parseDataType(schemaString).asInstanceOf[StructType]
+    applySchemaToPythonRDD(rdd, schema)
+  }
+
+  /**
+   * Apply a schema defined by the schema to an RDD. It is only used by PySpark.
+   */
+  private[sql] def applySchemaToPythonRDD(
+      rdd: RDD[Map[String, _]],
+      schema: StructType): SchemaRDD = {
+    // TODO: We should have a better implementation once we do not turn a Python side record
+    // to a Map.
+    import scala.collection.JavaConversions._
+    import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
+
+    def needsConversion(dataType: DataType): Boolean = dataType match {
+      case ByteType => true
+      case ShortType => true
+      case FloatType => true
+      case TimestampType => true
+      case ArrayType(_, _) => true
+      case MapType(_, _, _) => true
+      case StructType(_) => true
+      case other => false
     }
 
-    // convert JList, JArray into Seq, convert JMap into Map
-    // convert Calendar into Timestamp
-    def transform(obj: Any): Any = obj match {
-      case c: java.util.List[_] => c.map(transform).toSeq
-      case c: java.util.Map[_, _] => c.map {
-        case (key, value) => (key, transform(value))
-      }.toMap
-      case c if c.getClass.isArray =>
-        c.asInstanceOf[Array[_]].map(transform).toSeq
-      case c: java.util.Calendar =>
-        new java.sql.Timestamp(c.getTime().getTime())
-      case c => c
+    // Converts value to the type specified by the data type.
+    // Because Python does not have data types for TimestampType, FloatType, ShortType, and
+    // ByteType, we need to explicitly convert values in columns of these data types to the desired
+    // JVM data types.
+    def convert(obj: Any, dataType: DataType): Any = (obj, dataType) match {
+      // TODO: We should check nullable
+      case (null, _) => null
+
+      case (c: java.util.List[_], ArrayType(elementType, _)) =>
+        val converted = c.map { e => convert(e, elementType)}
+        JListWrapper(converted)
+
+      case (c: java.util.Map[_, _], struct: StructType) =>
+        val row = new GenericMutableRow(struct.fields.length)
+        struct.fields.zipWithIndex.foreach {
+          case (field, i) =>
+            val value = convert(c.get(field.name), field.dataType)
+            row.update(i, value)
+        }
+        row
+
+      case (c: java.util.Map[_, _], MapType(keyType, valueType, _)) =>
+        val converted = c.map {
+          case (key, value) =>
+            (convert(key, keyType), convert(value, valueType))
+        }
+        JMapWrapper(converted)
+
+      case (c, ArrayType(elementType, _)) if c.getClass.isArray =>
+        val converted = c.asInstanceOf[Array[_]].map(e => convert(e, elementType))
+        converted: Seq[Any]
+
+      case (c: java.util.Calendar, TimestampType) => new java.sql.Timestamp(c.getTime().getTime())
+      case (c: Int, ByteType) => c.toByte
+      case (c: Int, ShortType) => c.toShort
+      case (c: Double, FloatType) => c.toFloat
+
+      case (c, _) => c
+    }
+
+    val convertedRdd = if (schema.fields.exists(f => needsConversion(f.dataType))) {
+      rdd.map(m => m.map { case (key, value) => (key, convert(value, schema(key).dataType)) })
+    } else {
+      rdd
     }
 
-    val need = firstRow.exists {case (key, value) => needTransform(value)}
-    val transformed = if (need) {
-      rdd.mapPartitions { iter =>
-        iter.map {
-          m => m.map {case (key, value) => (key, transform(value))}
+    val rowRdd = convertedRdd.mapPartitions { iter =>
+      val row = new GenericMutableRow(schema.fields.length)
+      val fieldsWithIndex = schema.fields.zipWithIndex
+      iter.map { m =>
+        // We cannot use m.values because the order of values returned by m.values may not
+        // match fields order.
+        fieldsWithIndex.foreach {
+          case (field, i) =>
+            val value =
+              m.get(field.name).flatMap(v => Option(v)).map(v => convert(v, field.dataType)).orNull
+            row.update(i, value)
         }
-      }
-    } else rdd
 
-    val rowRdd = transformed.mapPartitions { iter =>
-      iter.map { map =>
-        new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row
+        row: Row
       }
     }
-    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))(self))
-  }
 
+    new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema.toAttributes, rowRdd))(self))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 172b6e0..420f21f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql
 
-import java.util.{Map => JMap, List => JList, Set => JSet}
+import java.util.{Map => JMap, List => JList}
 
 import scala.collection.JavaConversions._
 import scala.collection.JavaConverters._
@@ -32,7 +32,6 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
-import org.apache.spark.sql.catalyst.types.{DataType, ArrayType, BooleanType, StructType, MapType}
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
 
@@ -120,6 +119,11 @@ class SchemaRDD(
   override protected def getDependencies: Seq[Dependency[_]] =
     List(new OneToOneDependency(queryExecution.toRdd))
 
+  /** Returns the schema of this SchemaRDD (represented by a [[StructType]]).
+    *
+    * @group schema
+    */
+  def schema: StructType = queryExecution.analyzed.schema
 
   // =======================================================================
   // Query DSL
@@ -376,6 +380,8 @@ class SchemaRDD(
    * Converts a JavaRDD to a PythonRDD. It is used by pyspark.
    */
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
+    import scala.collection.Map
+
     def toJava(obj: Any, dataType: DataType): Any = dataType match {
       case struct: StructType => rowToMap(obj.asInstanceOf[Row], struct)
       case array: ArrayType => obj match {

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index fd75103..6a20def 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -123,9 +123,15 @@ private[sql] trait SchemaRDDLike {
   def saveAsTable(tableName: String): Unit =
     sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
 
-  /** Returns the output schema in the tree format. */
-  def schemaString: String = queryExecution.analyzed.schemaString
+  /** Returns the schema as a string in the tree format.
+   *
+   * @group schema
+   */
+  def schemaString: String = baseSchemaRDD.schema.treeString
 
-  /** Prints out the schema in the tree format. */
+  /** Prints out the schema.
+   *
+   * @group schema
+   */
   def printSchema(): Unit = println(schemaString)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 85726ba..c1c18a0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -21,14 +21,16 @@ import java.beans.Introspector
 
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.annotation.Experimental
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.sql.api.java.types.{StructType => JStructType}
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
-import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.parquet.ParquetRelation
 import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
+import org.apache.spark.sql.types.util.DataTypeConversions
+import DataTypeConversions.asScalaDataType;
 import org.apache.spark.util.Utils
 
 /**
@@ -96,6 +98,21 @@ class JavaSQLContext(val sqlContext: SQLContext) {
   }
 
   /**
+   * :: DeveloperApi ::
+   * Creates a JavaSchemaRDD from an RDD containing Rows by applying a schema to this RDD.
+   * It is important to make sure that the structure of every Row of the provided RDD matches the
+   * provided schema. Otherwise, a runtime exception will be thrown.
+   */
+  @DeveloperApi
+  def applySchema(rowRDD: JavaRDD[Row], schema: JStructType): JavaSchemaRDD = {
+    val scalaRowRDD = rowRDD.rdd.map(r => r.row)
+    val scalaSchema = asScalaDataType(schema).asInstanceOf[StructType]
+    val logicalPlan =
+      SparkLogicalPlan(ExistingRdd(scalaSchema.toAttributes, scalaRowRDD))(sqlContext)
+    new JavaSchemaRDD(sqlContext, logicalPlan)
+  }
+
+  /**
    * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
    */
   def parquetFile(path: String): JavaSchemaRDD =
@@ -104,23 +121,49 @@ class JavaSQLContext(val sqlContext: SQLContext) {
       ParquetRelation(path, Some(sqlContext.sparkContext.hadoopConfiguration), sqlContext))
 
   /**
-   * Loads a JSON file (one object per line), returning the result as a [[JavaSchemaRDD]].
+   * Loads a JSON file (one object per line), returning the result as a JavaSchemaRDD.
    * It goes through the entire dataset once to determine the schema.
-   *
-   * @group userf
    */
   def jsonFile(path: String): JavaSchemaRDD =
     jsonRDD(sqlContext.sparkContext.textFile(path))
 
   /**
+   * :: Experimental ::
+   * Loads a JSON file (one object per line) and applies the given schema,
+   * returning the result as a JavaSchemaRDD.
+   */
+  @Experimental
+  def jsonFile(path: String, schema: JStructType): JavaSchemaRDD =
+    jsonRDD(sqlContext.sparkContext.textFile(path), schema)
+
+  /**
    * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
-   * [[JavaSchemaRDD]].
+   * JavaSchemaRDD.
    * It goes through the entire dataset once to determine the schema.
-   *
-   * @group userf
    */
-  def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD =
-    new JavaSchemaRDD(sqlContext, JsonRDD.inferSchema(sqlContext, json, 1.0))
+  def jsonRDD(json: JavaRDD[String]): JavaSchemaRDD = {
+    val appliedScalaSchema = JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))
+    val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+    val logicalPlan =
+      SparkLogicalPlan(ExistingRdd(appliedScalaSchema.toAttributes, scalaRowRDD))(sqlContext)
+    new JavaSchemaRDD(sqlContext, logicalPlan)
+  }
+
+  /**
+   * :: Experimental ::
+   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
+   * returning the result as a JavaSchemaRDD.
+   */
+  @Experimental
+  def jsonRDD(json: JavaRDD[String], schema: JStructType): JavaSchemaRDD = {
+    val appliedScalaSchema =
+      Option(asScalaDataType(schema)).getOrElse(
+        JsonRDD.nullTypeToStringType(JsonRDD.inferSchema(json.rdd, 1.0))).asInstanceOf[StructType]
+    val scalaRowRDD = JsonRDD.jsonStringToRow(json.rdd, appliedScalaSchema)
+    val logicalPlan =
+      SparkLogicalPlan(ExistingRdd(appliedScalaSchema.toAttributes, scalaRowRDD))(sqlContext)
+    new JavaSchemaRDD(sqlContext, logicalPlan)
+  }
 
   /**
    * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
index 8fbf13b..8245741 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -22,8 +22,11 @@ import java.util.{List => JList}
 import org.apache.spark.Partitioner
 import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
 import org.apache.spark.api.java.function.{Function => JFunction}
+import org.apache.spark.sql.api.java.types.StructType
+import org.apache.spark.sql.types.util.DataTypeConversions
 import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import DataTypeConversions._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 
@@ -53,6 +56,10 @@ class JavaSchemaRDD(
 
   override def toString: String = baseSchemaRDD.toString
 
+  /** Returns the schema of this JavaSchemaRDD (represented by a StructType). */
+  def schema: StructType =
+    asJavaDataType(baseSchemaRDD.schema).asInstanceOf[StructType]
+
   // =======================================================================
   // Base RDD functions that do NOT change schema
   // =======================================================================

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
index 9b0dd21..6c67934 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/Row.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.api.java
 
+import scala.annotation.varargs
+import scala.collection.convert.Wrappers.{JListWrapper, JMapWrapper}
+import scala.collection.JavaConversions
+import scala.math.BigDecimal
+
 import org.apache.spark.sql.catalyst.expressions.{Row => ScalaRow}
 
 /**
@@ -29,7 +34,7 @@ class Row(private[spark] val row: ScalaRow) extends Serializable {
 
   /** Returns the value of column `i`. */
   def get(i: Int): Any =
-    row(i)
+    Row.toJavaValue(row(i))
 
   /** Returns true if value at column `i` is NULL. */
   def isNullAt(i: Int) = get(i) == null
@@ -89,5 +94,57 @@ class Row(private[spark] val row: ScalaRow) extends Serializable {
    */
   def getString(i: Int): String =
     row.getString(i)
+
+  def canEqual(other: Any): Boolean = other.isInstanceOf[Row]
+
+  override def equals(other: Any): Boolean = other match {
+    case that: Row =>
+      (that canEqual this) &&
+        row == that.row
+    case _ => false
+  }
+
+  override def hashCode(): Int = row.hashCode()
 }
 
+object Row {
+
+  private def toJavaValue(value: Any): Any = value match {
+    // For values of this ScalaRow, we will do the conversion when
+    // they are actually accessed.
+    case row: ScalaRow => new Row(row)
+    case map: scala.collection.Map[_, _] =>
+      JavaConversions.mapAsJavaMap(
+        map.map {
+          case (key, value) => (toJavaValue(key), toJavaValue(value))
+        }
+      )
+    case seq: scala.collection.Seq[_] =>
+      JavaConversions.seqAsJavaList(seq.map(toJavaValue))
+    case decimal: BigDecimal => decimal.underlying()
+    case other => other
+  }
+
+  // TODO: Consolidate toScalaValue here with scalafy in JsonRDD?
+  private def toScalaValue(value: Any): Any = value match {
+    // Values of this row have been converted to Scala values.
+    case row: Row => row.row
+    case map: java.util.Map[_, _] =>
+      JMapWrapper(map).map {
+        case (key, value) => (toScalaValue(key), toScalaValue(value))
+      }
+    case list: java.util.List[_] =>
+      JListWrapper(list).map(toScalaValue)
+    case decimal: java.math.BigDecimal => BigDecimal(decimal)
+    case other => other
+  }
+
+  /**
+   * Creates a Row with the given values.
+   */
+  @varargs def create(values: Any*): Row = {
+    // Right now, we cannot use @varargs to annotate the constructor of
+    // org.apache.spark.sql.api.java.Row. See https://issues.scala-lang.org/browse/SI-8383.
+    new Row(ScalaRow(values.map(toScalaValue):_*))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 6c2b553..bd29ee4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -25,33 +25,25 @@ import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.HiveTypeCoercion
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.types._
-import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
-import org.apache.spark.sql.{SQLContext, Logging}
+import org.apache.spark.sql.catalyst.ScalaReflection
+import org.apache.spark.sql.Logging
 
 private[sql] object JsonRDD extends Logging {
 
+  private[sql] def jsonStringToRow(
+      json: RDD[String],
+      schema: StructType): RDD[Row] = {
+    parseJson(json).map(parsed => asRow(parsed, schema))
+  }
+
   private[sql] def inferSchema(
-      sqlContext: SQLContext,
       json: RDD[String],
-      samplingRatio: Double = 1.0): LogicalPlan = {
+      samplingRatio: Double = 1.0): StructType = {
     require(samplingRatio > 0, s"samplingRatio ($samplingRatio) should be greater than 0")
     val schemaData = if (samplingRatio > 0.99) json else json.sample(false, samplingRatio, 1)
     val allKeys = parseJson(schemaData).map(allKeysWithValueTypes).reduce(_ ++ _)
-    val baseSchema = createSchema(allKeys)
-
-    createLogicalPlan(json, baseSchema, sqlContext)
-  }
-
-  private def createLogicalPlan(
-      json: RDD[String],
-      baseSchema: StructType,
-      sqlContext: SQLContext): LogicalPlan = {
-    val schema = nullTypeToStringType(baseSchema)
-
-    SparkLogicalPlan(
-      ExistingRdd(asAttributes(schema), parseJson(json).map(asRow(_, schema))))(sqlContext)
+    createSchema(allKeys)
   }
 
   private def createSchema(allKeys: Set[(String, DataType)]): StructType = {
@@ -75,8 +67,8 @@ private[sql] object JsonRDD extends Logging {
       val (topLevel, structLike) = values.partition(_.size == 1)
       val topLevelFields = topLevel.filter {
         name => resolved.get(prefix ++ name).get match {
-          case ArrayType(StructType(Nil)) => false
-          case ArrayType(_) => true
+          case ArrayType(StructType(Nil), _) => false
+          case ArrayType(_, _) => true
           case struct: StructType => false
           case _ => true
         }
@@ -90,7 +82,8 @@ private[sql] object JsonRDD extends Logging {
           val structType = makeStruct(nestedFields, prefix :+ name)
           val dataType = resolved.get(prefix :+ name).get
           dataType match {
-            case array: ArrayType => Some(StructField(name, ArrayType(structType), nullable = true))
+            case array: ArrayType =>
+              Some(StructField(name, ArrayType(structType, array.containsNull), nullable = true))
             case struct: StructType => Some(StructField(name, structType, nullable = true))
             // dataType is StringType means that we have resolved type conflicts involving
             // primitive types and complex types. So, the type of name has been relaxed to
@@ -109,6 +102,22 @@ private[sql] object JsonRDD extends Logging {
     makeStruct(resolved.keySet.toSeq, Nil)
   }
 
+  private[sql] def nullTypeToStringType(struct: StructType): StructType = {
+    val fields = struct.fields.map {
+      case StructField(fieldName, dataType, nullable) => {
+        val newType = dataType match {
+          case NullType => StringType
+          case ArrayType(NullType, containsNull) => ArrayType(StringType, containsNull)
+          case struct: StructType => nullTypeToStringType(struct)
+          case other: DataType => other
+        }
+        StructField(fieldName, newType, nullable)
+      }
+    }
+
+    StructType(fields)
+  }
+
   /**
    * Returns the most general data type for two given data types.
    */
@@ -139,8 +148,8 @@ private[sql] object JsonRDD extends Logging {
             case StructField(name, _, _) => name
           })
         }
-        case (ArrayType(elementType1), ArrayType(elementType2)) =>
-          ArrayType(compatibleType(elementType1, elementType2))
+        case (ArrayType(elementType1, containsNull1), ArrayType(elementType2, containsNull2)) =>
+          ArrayType(compatibleType(elementType1, elementType2), containsNull1 || containsNull2)
         // TODO: We should use JsonObjectStringType to mark that values of field will be
         // strings and every string is a Json object.
         case (_, _) => StringType
@@ -148,18 +157,13 @@ private[sql] object JsonRDD extends Logging {
     }
   }
 
-  private def typeOfPrimitiveValue(value: Any): DataType = {
-    value match {
-      case value: java.lang.String => StringType
-      case value: java.lang.Integer => IntegerType
-      case value: java.lang.Long => LongType
+  private def typeOfPrimitiveValue: PartialFunction[Any, DataType] = {
+    ScalaReflection.typeOfObject orElse {
       // Since we do not have a data type backed by BigInteger,
       // when we see a Java BigInteger, we use DecimalType.
       case value: java.math.BigInteger => DecimalType
-      case value: java.lang.Double => DoubleType
+      // DecimalType's JVMType is scala BigDecimal.
       case value: java.math.BigDecimal => DecimalType
-      case value: java.lang.Boolean => BooleanType
-      case null => NullType
       // Unexpected data type.
       case _ => StringType
     }
@@ -172,12 +176,13 @@ private[sql] object JsonRDD extends Logging {
    * treat the element as String.
    */
   private def typeOfArray(l: Seq[Any]): ArrayType = {
+    val containsNull = l.exists(v => v == null)
     val elements = l.flatMap(v => Option(v))
     if (elements.isEmpty) {
       // If this JSON array is empty, we use NullType as a placeholder.
       // If this array is not empty in other JSON objects, we can resolve
       // the type after we have passed through all JSON objects.
-      ArrayType(NullType)
+      ArrayType(NullType, containsNull)
     } else {
       val elementType = elements.map {
         e => e match {
@@ -189,7 +194,7 @@ private[sql] object JsonRDD extends Logging {
         }
       }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2))
 
-      ArrayType(elementType)
+      ArrayType(elementType, containsNull)
     }
   }
 
@@ -216,15 +221,16 @@ private[sql] object JsonRDD extends Logging {
       case (key: String, array: Seq[_]) => {
         // The value associated with the key is an array.
         typeOfArray(array) match {
-          case ArrayType(StructType(Nil)) => {
+          case ArrayType(StructType(Nil), containsNull) => {
             // The elements of this arrays are structs.
             array.asInstanceOf[Seq[Map[String, Any]]].flatMap {
               element => allKeysWithValueTypes(element)
             }.map {
               case (k, dataType) => (s"$key.$k", dataType)
-            } :+ (key, ArrayType(StructType(Nil)))
+            } :+ (key, ArrayType(StructType(Nil), containsNull))
           }
-          case ArrayType(elementType) => (key, ArrayType(elementType)) :: Nil
+          case ArrayType(elementType, containsNull) =>
+            (key, ArrayType(elementType, containsNull)) :: Nil
         }
       }
       case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
@@ -262,8 +268,11 @@ private[sql] object JsonRDD extends Logging {
       // the ObjectMapper will take the last value associated with this duplicate key.
       // For example: for {"key": 1, "key":2}, we will get "key"->2.
       val mapper = new ObjectMapper()
-      iter.map(record => mapper.readValue(record, classOf[java.util.Map[String, Any]]))
-      }).map(scalafy).map(_.asInstanceOf[Map[String, Any]])
+      iter.map { record =>
+        val parsed = scalafy(mapper.readValue(record, classOf[java.util.Map[String, Any]]))
+        parsed.asInstanceOf[Map[String, Any]]
+      }
+    })
   }
 
   private def toLong(value: Any): Long = {
@@ -334,7 +343,7 @@ private[sql] object JsonRDD extends Logging {
       null
     } else {
       desiredType match {
-        case ArrayType(elementType) =>
+        case ArrayType(elementType, _) =>
           value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
         case StringType => toString(value)
         case IntegerType => value.asInstanceOf[IntegerType.JvmType]
@@ -348,6 +357,7 @@ private[sql] object JsonRDD extends Logging {
   }
 
   private def asRow(json: Map[String,Any], schema: StructType): Row = {
+    // TODO: Reuse the row instead of creating a new one for every record.
     val row = new GenericMutableRow(schema.fields.length)
     schema.fields.zipWithIndex.foreach {
       // StructType
@@ -356,7 +366,7 @@ private[sql] object JsonRDD extends Logging {
           v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull)
 
       // ArrayType(StructType)
-      case (StructField(name, ArrayType(structType: StructType), _), i) =>
+      case (StructField(name, ArrayType(structType: StructType, _), _), i) =>
         row.update(i,
           json.get(name).flatMap(v => Option(v)).map(
             v => v.asInstanceOf[Seq[Any]].map(
@@ -370,32 +380,4 @@ private[sql] object JsonRDD extends Logging {
 
     row
   }
-
-  private def nullTypeToStringType(struct: StructType): StructType = {
-    val fields = struct.fields.map {
-      case StructField(fieldName, dataType, nullable) => {
-        val newType = dataType match {
-          case NullType => StringType
-          case ArrayType(NullType) => ArrayType(StringType)
-          case struct: StructType => nullTypeToStringType(struct)
-          case other: DataType => other
-        }
-        StructField(fieldName, newType, nullable)
-      }
-    }
-
-    StructType(fields)
-  }
-
-  private def asAttributes(struct: StructType): Seq[AttributeReference] = {
-    struct.fields.map(f => AttributeReference(f.name, f.dataType, nullable = true)())
-  }
-
-  private def asStruct(attributes: Seq[AttributeReference]): StructType = {
-    val fields = attributes.map {
-      case AttributeReference(name, dataType, nullable) => StructField(name, dataType, nullable)
-    }
-
-    StructType(fields)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/package-info.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package-info.java b/sql/core/src/main/scala/org/apache/spark/sql/package-info.java
new file mode 100644
index 0000000..5360361
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package-info.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Allows the execution of relational queries, including those expressed in SQL using Spark.
+ */
+package org.apache.spark.sql;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/package.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
new file mode 100644
index 0000000..0995a4e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * Allows the execution of relational queries, including those expressed in SQL using Spark.
+ *
+ *  @groupname dataType Data types
+ *  @groupdesc dataType Spark SQL data types.
+ *  @groupprio dataType -3
+ *  @groupname field Field
+ *  @groupprio field -2
+ *  @groupname row Row
+ *  @groupprio row -1
+ */
+package object sql {
+
+  protected[sql] type Logging = com.typesafe.scalalogging.slf4j.Logging
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * Represents one row of output from a relational operator.
+   * @group row
+   */
+  @DeveloperApi
+  type Row = catalyst.expressions.Row
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * A [[Row]] object can be constructed by providing field values. Example:
+   * {{{
+   * import org.apache.spark.sql._
+   *
+   * // Create a Row from values.
+   * Row(value1, value2, value3, ...)
+   * // Create a Row from a Seq of values.
+   * Row.fromSeq(Seq(value1, value2, ...))
+   * }}}
+   *
+   * A value of a row can be accessed through both generic access by ordinal,
+   * which will incur boxing overhead for primitives, as well as native primitive access.
+   * An example of generic access by ordinal:
+   * {{{
+   * import org.apache.spark.sql._
+   *
+   * val row = Row(1, true, "a string", null)
+   * // row: Row = [1,true,a string,null]
+   * val firstValue = row(0)
+   * // firstValue: Any = 1
+   * val fourthValue = row(3)
+   * // fourthValue: Any = null
+   * }}}
+   *
+   * For native primitive access, it is invalid to use the native primitive interface to retrieve
+   * a value that is null; instead, a user must check `isNullAt` before attempting to retrieve a
+   * value that might be null.
+   * An example of native primitive access:
+   * {{{
+   * // using the row from the previous example.
+   * val firstValue = row.getInt(0)
+   * // firstValue: Int = 1
+   * val isNull = row.isNullAt(3)
+   * // isNull: Boolean = true
+   * }}}
+   *
+   * Interfaces related to native primitive access are:
+   *
+   * `isNullAt(i: Int): Boolean`
+   *
+   * `getInt(i: Int): Int`
+   *
+   * `getLong(i: Int): Long`
+   *
+   * `getDouble(i: Int): Double`
+   *
+   * `getFloat(i: Int): Float`
+   *
+   * `getBoolean(i: Int): Boolean`
+   *
+   * `getShort(i: Int): Short`
+   *
+   * `getByte(i: Int): Byte`
+   *
+   * `getString(i: Int): String`
+   *
+   * Fields in a [[Row]] object can be extracted in a pattern match. Example:
+   * {{{
+   * import org.apache.spark.sql._
+   *
+   * val pairs = sql("SELECT key, value FROM src").rdd.map {
+   *   case Row(key: Int, value: String) =>
+   *     key -> value
+   * }
+   * }}}
+   *
+   * @group row
+   */
+  @DeveloperApi
+  val Row = catalyst.expressions.Row
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The base type of all Spark SQL data types.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  type DataType = catalyst.types.DataType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `String` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val StringType = catalyst.types.StringType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `Array[Byte]` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val BinaryType = catalyst.types.BinaryType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `Boolean` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val BooleanType = catalyst.types.BooleanType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `java.sql.Timestamp` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val TimestampType = catalyst.types.TimestampType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `scala.math.BigDecimal` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val DecimalType = catalyst.types.DecimalType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `Double` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val DoubleType = catalyst.types.DoubleType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `Float` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val FloatType = catalyst.types.FloatType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `Byte` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val ByteType = catalyst.types.ByteType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `Int` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val IntegerType = catalyst.types.IntegerType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `Long` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val LongType = catalyst.types.LongType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `Short` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val ShortType = catalyst.types.ShortType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type for collections of multiple values.
+   * Internally these are represented as columns that contain a `scala.collection.Seq`.
+   *
+   * An [[ArrayType]] object comprises two fields, `elementType: [[DataType]]` and
+   * `containsNull: Boolean`. The field of `elementType` is used to specify the type of
+   * array elements. The field of `containsNull` is used to specify if the array has `null` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  type ArrayType = catalyst.types.ArrayType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * An [[ArrayType]] object can be constructed in two ways,
+   * {{{
+   * ArrayType(elementType: DataType, containsNull: Boolean)
+   * }}} and
+   * {{{
+   * ArrayType(elementType: DataType)
+   * }}}
+   * For `ArrayType(elementType)`, the field of `containsNull` is set to `false`.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val ArrayType = catalyst.types.ArrayType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing `Map`s. A [[MapType]] object comprises three fields,
+   * `keyType: [[DataType]]`, `valueType: [[DataType]]` and `valueContainsNull: Boolean`.
+   * The field of `keyType` is used to specify the type of keys in the map.
+   * The field of `valueType` is used to specify the type of values in the map.
+   * The field of `valueContainsNull` is used to specify if values of this map have `null` values.
+   * For values of a MapType column, keys are not allowed to have `null` values.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  type MapType = catalyst.types.MapType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * A [[MapType]] object can be constructed in two ways,
+   * {{{
+   * MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean)
+   * }}} and
+   * {{{
+   * MapType(keyType: DataType, valueType: DataType)
+   * }}}
+   * For `MapType(keyType: DataType, valueType: DataType)`,
+   * the field of `valueContainsNull` is set to `true`.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val MapType = catalyst.types.MapType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * The data type representing [[Row]]s.
+   * A [[StructType]] object comprises a [[Seq]] of [[StructField]]s.
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  type StructType = catalyst.types.StructType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * A [[StructType]] object can be constructed by
+   * {{{
+   * StructType(fields: Seq[StructField])
+   * }}}
+   * For a [[StructType]] object, one or multiple [[StructField]]s can be extracted by names.
+   * If multiple [[StructField]]s are extracted, a [[StructType]] object will be returned.
+   * When extracting multiple fields, a name without a matching field is ignored; when
+   * extracting a single StructField, `null` is returned if no field has that name.
+   * Example:
+   * {{{
+   * import org.apache.spark.sql._
+   *
+   * val struct =
+   *   StructType(
+   *     StructField("a", IntegerType, true) ::
+   *     StructField("b", LongType, false) ::
+   *     StructField("c", BooleanType, false) :: Nil)
+   *
+   * // Extract a single StructField.
+   * val singleField = struct("b")
+   * // singleField: StructField = StructField(b,LongType,false)
+   *
+   * // This struct does not have a field called "d". null will be returned.
+   * val nonExisting = struct("d")
+   * // nonExisting: StructField = null
+   *
+   * // Extract multiple StructFields. Field names are provided in a set.
+   * // A StructType object will be returned.
+   * val twoFields = struct(Set("b", "c"))
+   * // twoFields: StructType =
+   * //   StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
+   *
+   * // Names that do not have matching fields will be ignored.
+   * // For the case shown below, "d" will be ignored and
+   * // it is treated as struct(Set("b", "c")).
+   * val ignoreNonExisting = struct(Set("b", "c", "d"))
+   * // ignoreNonExisting: StructType =
+   * //   StructType(List(StructField(b,LongType,false), StructField(c,BooleanType,false)))
+   * }}}
+   *
+   * A [[Row]] object is used as a value of the StructType.
+   * Example:
+   * {{{
+   * import org.apache.spark.sql._
+   *
+   * val innerStruct =
+   *   StructType(
+   *     StructField("f1", IntegerType, true) ::
+   *     StructField("f2", LongType, false) ::
+   *     StructField("f3", BooleanType, false) :: Nil)
+   *
+   * val struct = StructType(
+   *   StructField("a", innerStruct, true) :: Nil)
+   *
+   * // Create a Row with the schema defined by struct
+   * val row = Row(Row(1, 2, true))
+   * // row: Row = [[1,2,true]]
+   * }}}
+   *
+   * @group dataType
+   */
+  @DeveloperApi
+  val StructType = catalyst.types.StructType
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * A [[StructField]] object represents a field in a [[StructType]] object.
+   * A [[StructField]] object comprises three fields, `name: [[String]]`, `dataType: [[DataType]]`,
+   * and `nullable: Boolean`. The field of `name` is the name of a `StructField`. The field of
+   * `dataType` specifies the data type of a `StructField`.
+   * The field of `nullable` specifies if values of a `StructField` can contain `null` values.
+   *
+   * @group field
+   */
+  @DeveloperApi
+  type StructField = catalyst.types.StructField
+
+  /**
+   * :: DeveloperApi ::
+   *
+   * A [[StructField]] object can be constructed by
+   * {{{
+   * StructField(name: String, dataType: DataType, nullable: Boolean)
+   * }}}
+   *
+   * @group field
+   */
+  @DeveloperApi
+  val StructField = catalyst.types.StructField
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index de8fe2d..0a3b59c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -75,21 +75,21 @@ private[sql] object CatalystConverter {
     val fieldType: DataType = field.dataType
     fieldType match {
       // For native JVM types we use a converter with native arrays
-      case ArrayType(elementType: NativeType) => {
+      case ArrayType(elementType: NativeType, false) => {
         new CatalystNativeArrayConverter(elementType, fieldIndex, parent)
       }
       // This is for other types of arrays, including those with nested fields
-      case ArrayType(elementType: DataType) => {
+      case ArrayType(elementType: DataType, false) => {
         new CatalystArrayConverter(elementType, fieldIndex, parent)
       }
       case StructType(fields: Seq[StructField]) => {
         new CatalystStructConverter(fields.toArray, fieldIndex, parent)
       }
-      case MapType(keyType: DataType, valueType: DataType) => {
+      case MapType(keyType: DataType, valueType: DataType, valueContainsNull: Boolean) => {
         new CatalystMapConverter(
           Array(
             new FieldType(MAP_KEY_SCHEMA_NAME, keyType, false),
-            new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, true)),
+            new FieldType(MAP_VALUE_SCHEMA_NAME, valueType, valueContainsNull)),
           fieldIndex,
           parent)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 39294a3..6d4ce32 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -172,10 +172,10 @@ private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
   private[parquet] def writeValue(schema: DataType, value: Any): Unit = {
     if (value != null) {
       schema match {
-        case t @ ArrayType(_) => writeArray(
+        case t @ ArrayType(_, false) => writeArray(
           t,
           value.asInstanceOf[CatalystConverter.ArrayScalaType[_]])
-        case t @ MapType(_, _) => writeMap(
+        case t @ MapType(_, _, _) => writeMap(
           t,
           value.asInstanceOf[CatalystConverter.MapScalaType[_, _]])
         case t @ StructType(_) => writeStruct(

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index 58370b9..aaef1a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -116,7 +116,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         case ParquetOriginalType.LIST => { // TODO: check enums!
           assert(groupType.getFieldCount == 1)
           val field = groupType.getFields.apply(0)
-          new ArrayType(toDataType(field))
+          ArrayType(toDataType(field), containsNull = false)
         }
         case ParquetOriginalType.MAP => {
           assert(
@@ -130,7 +130,9 @@ private[parquet] object ParquetTypesConverter extends Logging {
           assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
           val valueType = toDataType(keyValueGroup.getFields.apply(1))
           assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
-          new MapType(keyType, valueType)
+          // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
+          // here.
+          MapType(keyType, valueType)
         }
         case _ => {
           // Note: the order of these checks is important!
@@ -140,10 +142,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
             assert(keyValueGroup.getFields.apply(0).getRepetition == Repetition.REQUIRED)
             val valueType = toDataType(keyValueGroup.getFields.apply(1))
             assert(keyValueGroup.getFields.apply(1).getRepetition == Repetition.REQUIRED)
-            new MapType(keyType, valueType)
+            // TODO: set valueContainsNull explicitly instead of assuming valueContainsNull is true
+            // here.
+            MapType(keyType, valueType)
           } else if (correspondsToArray(groupType)) { // ArrayType
             val elementType = toDataType(groupType.getFields.apply(0))
-            new ArrayType(elementType)
+            ArrayType(elementType, containsNull = false)
           } else { // everything else: StructType
             val fields = groupType
               .getFields
@@ -151,7 +155,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
               ptype.getName,
               toDataType(ptype),
               ptype.getRepetition != Repetition.REQUIRED))
-            new StructType(fields)
+            StructType(fields)
           }
         }
       }
@@ -234,7 +238,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
         new ParquetPrimitiveType(repetition, primitiveType, name, originalType.orNull)
     }.getOrElse {
       ctype match {
-        case ArrayType(elementType) => {
+        case ArrayType(elementType, false) => {
           val parquetElementType = fromDataType(
             elementType,
             CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
@@ -248,7 +252,7 @@ private[parquet] object ParquetTypesConverter extends Logging {
           }
           new ParquetGroupType(repetition, name, fields)
         }
-        case MapType(keyType, valueType) => {
+        case MapType(keyType, valueType, _) => {
           val parquetKeyType =
             fromDataType(
               keyType,

http://git-wip-us.apache.org/repos/asf/spark/blob/7003c163/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
new file mode 100644
index 0000000..d1aa3c8
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/types/util/DataTypeConversions.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.types.util
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.api.java.types.{DataType => JDataType, StructField => JStructField}
+
+import scala.collection.JavaConverters._
+
+protected[sql] object DataTypeConversions {
+
+  /**
+   * Returns the equivalent StructField in Java for the given StructField in Scala.
+   */
+  def asJavaStructField(scalaStructField: StructField): JStructField = {
+    JDataType.createStructField(
+      scalaStructField.name,
+      asJavaDataType(scalaStructField.dataType),
+      scalaStructField.nullable)
+  }
+
+  /**
+   * Returns the equivalent DataType in Java for the given DataType in Scala.
+   */
+  def asJavaDataType(scalaDataType: DataType): JDataType = scalaDataType match {
+    case StringType => JDataType.StringType
+    case BinaryType => JDataType.BinaryType
+    case BooleanType => JDataType.BooleanType
+    case TimestampType => JDataType.TimestampType
+    case DecimalType => JDataType.DecimalType
+    case DoubleType => JDataType.DoubleType
+    case FloatType => JDataType.FloatType
+    case ByteType => JDataType.ByteType
+    case IntegerType => JDataType.IntegerType
+    case LongType => JDataType.LongType
+    case ShortType => JDataType.ShortType
+
+    case arrayType: ArrayType => JDataType.createArrayType(
+        asJavaDataType(arrayType.elementType), arrayType.containsNull)
+    case mapType: MapType => JDataType.createMapType(
+        asJavaDataType(mapType.keyType),
+        asJavaDataType(mapType.valueType),
+        mapType.valueContainsNull)
+    case structType: StructType => JDataType.createStructType(
+        structType.fields.map(asJavaStructField).asJava)
+  }
+
+  /**
+   * Returns the equivalent StructField in Scala for the given StructField in Java.
+   */
+  def asScalaStructField(javaStructField: JStructField): StructField = {
+    StructField(
+      javaStructField.getName,
+      asScalaDataType(javaStructField.getDataType),
+      javaStructField.isNullable)
+  }
+
+  /**
+   * Returns the equivalent DataType in Scala for the given DataType in Java.
+   */
+  def asScalaDataType(javaDataType: JDataType): DataType = javaDataType match {
+    case stringType: org.apache.spark.sql.api.java.types.StringType =>
+      StringType
+    case binaryType: org.apache.spark.sql.api.java.types.BinaryType =>
+      BinaryType
+    case booleanType: org.apache.spark.sql.api.java.types.BooleanType =>
+      BooleanType
+    case timestampType: org.apache.spark.sql.api.java.types.TimestampType =>
+      TimestampType
+    case decimalType: org.apache.spark.sql.api.java.types.DecimalType =>
+      DecimalType
+    case doubleType: org.apache.spark.sql.api.java.types.DoubleType =>
+      DoubleType
+    case floatType: org.apache.spark.sql.api.java.types.FloatType =>
+      FloatType
+    case byteType: org.apache.spark.sql.api.java.types.ByteType =>
+      ByteType
+    case integerType: org.apache.spark.sql.api.java.types.IntegerType =>
+      IntegerType
+    case longType: org.apache.spark.sql.api.java.types.LongType =>
+      LongType
+    case shortType: org.apache.spark.sql.api.java.types.ShortType =>
+      ShortType
+
+    case arrayType: org.apache.spark.sql.api.java.types.ArrayType =>
+      ArrayType(asScalaDataType(arrayType.getElementType), arrayType.isContainsNull)
+    case mapType: org.apache.spark.sql.api.java.types.MapType =>
+      MapType(
+        asScalaDataType(mapType.getKeyType),
+        asScalaDataType(mapType.getValueType),
+        mapType.isValueContainsNull)
+    case structType: org.apache.spark.sql.api.java.types.StructType =>
+      StructType(structType.getFields.map(asScalaStructField))
+  }
+}