You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/05/16 07:00:35 UTC

[2/2] spark git commit: [SPARK-7654][SQL] DataFrameReader and DataFrameWriter for input/output API

[SPARK-7654][SQL] DataFrameReader and DataFrameWriter for input/output API

This patch introduces DataFrameWriter and DataFrameReader.

DataFrameReader interface, accessible through SQLContext.read, contains methods that create DataFrames. These methods used to reside in SQLContext. Example usage:
```scala
sqlContext.read.json("...")
sqlContext.read.parquet("...")
```

DataFrameWriter interface, accessible through DataFrame.write, implements a builder pattern to avoid the proliferation of options in writing DataFrame out. It currently implements:
- mode
- format (e.g. "parquet", "json")
- options (generic options passed down into data sources)
- partitionBy (partitioning columns)
Example usage:
```scala
df.write.mode("append").format("json").partitionBy("date").saveAsTable("myJsonTable")
```

TODO:

- [ ] Documentation update
- [ ] Move JDBC into reader / writer?
- [ ] Deprecate the old interfaces
- [ ] Move the generic load interface into reader.
- [ ] Update example code and documentation

Author: Reynold Xin <rx...@databricks.com>

Closes #6175 from rxin/reader-writer and squashes the following commits:

b146c95 [Reynold Xin] Deprecation of old APIs.
bd8abdf [Reynold Xin] Fixed merge conflict.
26abea2 [Reynold Xin] Added general load methods.
244fbec [Reynold Xin] Added equivalent to example.
4f15d92 [Reynold Xin] Added documentation for partitionBy.
7e91611 [Reynold Xin] [SPARK-7654][SQL] DataFrameReader and DataFrameWriter for input/output API.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/578bfeef
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/578bfeef
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/578bfeef

Branch: refs/heads/master
Commit: 578bfeeff514228f6fd4b07a536815fbb3510f7e
Parents: deb4113
Author: Reynold Xin <rx...@databricks.com>
Authored: Fri May 15 22:00:31 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri May 15 22:00:31 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/examples/sql/JavaSparkSQL.java |   4 +-
 .../spark/examples/mllib/DatasetExample.scala   |   2 +-
 .../apache/spark/examples/sql/RDDRelation.scala |   2 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  | 172 +++-------
 .../org/apache/spark/sql/DataFrameReader.scala  | 218 +++++++++++++
 .../org/apache/spark/sql/DataFrameWriter.scala  | 198 ++++++++++++
 .../scala/org/apache/spark/sql/SQLContext.scala | 158 +++------
 .../apache/spark/sql/parquet/ParquetTest.scala  |   8 +-
 .../spark/sql/sources/JavaSaveLoadSuite.java    |   8 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   |   4 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  17 +-
 .../apache/spark/sql/UserDefinedTypeSuite.scala |   4 +-
 .../org/apache/spark/sql/json/JsonSuite.scala   |  50 +--
 .../spark/sql/parquet/ParquetFilterSuite.scala  |   6 +-
 .../spark/sql/parquet/ParquetIOSuite.scala      |  41 ++-
 .../ParquetPartitionDiscoverySuite.scala        |  16 +-
 .../sql/sources/CreateTableAsSelectSuite.scala  |   2 +-
 .../apache/spark/sql/sources/InsertSuite.scala  |  10 +-
 .../spark/sql/sources/SaveLoadSuite.scala       |  26 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |   4 +-
 .../spark/sql/hive/HiveParquetSuite.scala       |   8 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  18 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  16 +-
 .../sql/sources/hadoopFsRelationSuites.scala    | 321 +++++++++----------
 24 files changed, 772 insertions(+), 541 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index 8159ffb..173633c 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -99,7 +99,7 @@ public class JavaSparkSQL {
     // Read in the parquet file created above.
     // Parquet files are self-describing so the schema is preserved.
     // The result of loading a parquet file is also a DataFrame.
-    DataFrame parquetFile = sqlContext.parquetFile("people.parquet");
+    DataFrame parquetFile = sqlContext.read().parquet("people.parquet");
 
     //Parquet files can also be registered as tables and then used in SQL statements.
     parquetFile.registerTempTable("parquetFile");
@@ -120,7 +120,7 @@ public class JavaSparkSQL {
     // The path can be either a single text file or a directory storing text files.
     String path = "examples/src/main/resources/people.json";
     // Create a DataFrame from the file(s) pointed by path
-    DataFrame peopleFromJsonFile = sqlContext.jsonFile(path);
+    DataFrame peopleFromJsonFile = sqlContext.read().json(path);
 
     // Because the schema of a JSON dataset is automatically inferred, to write queries,
     // it is better to take a look at what is the schema.

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
index e943d6c..c95cca7 100644
--- a/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala
@@ -106,7 +106,7 @@ object DatasetExample {
     df.saveAsParquetFile(outputDir)
 
     println(s"Loading Parquet file with UDT from $outputDir.")
-    val newDataset = sqlContext.parquetFile(outputDir)
+    val newDataset = sqlContext.read.parquet(outputDir)
 
     println(s"Schema from Parquet: ${newDataset.schema.prettyJson}")
     val newFeatures = newDataset.select("features").map { case Row(v: Vector) => v }

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
index 6331d1c..acc8919 100644
--- a/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala
@@ -61,7 +61,7 @@ object RDDRelation {
     df.saveAsParquetFile("pair.parquet")
 
     // Read in parquet file.  Parquet files are self-describing so the schmema is preserved.
-    val parquetFile = sqlContext.parquetFile("pair.parquet")
+    val parquetFile = sqlContext.read.parquet("pair.parquet")
 
     // Queries can be run using the DSL on parequet files just like the original RDD.
     parquetFile.where($"key" === 1).select($"value".as("a")).collect().foreach(println)

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 2e20c3d..55ef357 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1290,22 +1290,32 @@ class DataFrame private[sql](
   }
 
   /**
+   * :: Experimental ::
+   * Interface for saving the content of the [[DataFrame]] out into external storage.
+   *
+   * @group output
+   * @since 1.4.0
+   */
+  @Experimental
+  def write: DataFrameWriter = new DataFrameWriter(this)
+
+  /**
    * Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
    * Files that are written out using this method can be read back in as a [[DataFrame]]
    * using the `parquetFile` function in [[SQLContext]].
    * @group output
    * @since 1.3.0
    */
+  @deprecated("Use write.parquet(path)", "1.4.0")
   def saveAsParquetFile(path: String): Unit = {
     if (sqlContext.conf.parquetUseDataSourceApi) {
-      save("org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> path))
+      write.format("parquet").mode(SaveMode.ErrorIfExists).save(path)
     } else {
       sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
     }
   }
 
   /**
-   * :: Experimental ::
    * Creates a table from the the contents of this DataFrame.
    * It will use the default data source configured by spark.sql.sources.default.
    * This will fail if the table already exists.
@@ -1320,13 +1330,12 @@ class DataFrame private[sql](
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.saveAsTable(tableName)", "1.4.0")
   def saveAsTable(tableName: String): Unit = {
-    saveAsTable(tableName, SaveMode.ErrorIfExists)
+    write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
   }
 
   /**
-   * :: Experimental ::
    * Creates a table from the the contents of this DataFrame, using the default data source
    * configured by spark.sql.sources.default and [[SaveMode.ErrorIfExists]] as the save mode.
    *
@@ -1340,20 +1349,18 @@ class DataFrame private[sql](
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0")
   def saveAsTable(tableName: String, mode: SaveMode): Unit = {
     if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
       // If table already exists and the save mode is Append,
       // we will just call insertInto to append the contents of this DataFrame.
       insertInto(tableName, overwrite = false)
     } else {
-      val dataSourceName = sqlContext.conf.defaultDataSourceName
-      saveAsTable(tableName, dataSourceName, mode)
+      write.mode(mode).saveAsTable(tableName)
     }
   }
 
   /**
-   * :: Experimental ::
    * Creates a table at the given path from the the contents of this DataFrame
    * based on a given data source and a set of options,
    * using [[SaveMode.ErrorIfExists]] as the save mode.
@@ -1368,9 +1375,9 @@ class DataFrame private[sql](
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.format(source).saveAsTable(tableName)", "1.4.0")
   def saveAsTable(tableName: String, source: String): Unit = {
-    saveAsTable(tableName, source, SaveMode.ErrorIfExists)
+    write.format(source).saveAsTable(tableName)
   }
 
   /**
@@ -1388,13 +1395,12 @@ class DataFrame private[sql](
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.format(source).mode(mode).saveAsTable(tableName)", "1.4.0")
   def saveAsTable(tableName: String, source: String, mode: SaveMode): Unit = {
-    saveAsTable(tableName, source, mode, Map.empty[String, String])
+    write.format(source).mode(mode).saveAsTable(tableName)
   }
 
   /**
-   * :: Experimental ::
    * Creates a table at the given path from the the contents of this DataFrame
    * based on a given data source, [[SaveMode]] specified by mode, and a set of options.
    *
@@ -1408,40 +1414,17 @@ class DataFrame private[sql](
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
+    "1.4.0")
   def saveAsTable(
       tableName: String,
       source: String,
       mode: SaveMode,
       options: java.util.Map[String, String]): Unit = {
-    saveAsTable(tableName, source, mode, options.toMap)
-  }
-
-  /**
-   * :: Experimental ::
-   * Creates a table at the given path from the the contents of this DataFrame
-   * based on a given data source, [[SaveMode]] specified by mode, a set of options, and a list of
-   * partition columns.
-   *
-   * Note that this currently only works with DataFrames that are created from a HiveContext as
-   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
-   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
-   * be the target of an `insertInto`.
-   * @group output
-   * @since 1.4.0
-   */
-  @Experimental
-  def saveAsTable(
-      tableName: String,
-      source: String,
-      mode: SaveMode,
-      options: java.util.Map[String, String],
-      partitionColumns: java.util.List[String]): Unit = {
-    saveAsTable(tableName, source, mode, options.toMap, partitionColumns)
+    write.format(source).mode(mode).options(options).saveAsTable(tableName)
   }
 
   /**
-   * :: Experimental ::
    * (Scala-specific)
    * Creates a table from the the contents of this DataFrame based on a given data source,
    * [[SaveMode]] specified by mode, and a set of options.
@@ -1456,167 +1439,88 @@ class DataFrame private[sql](
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
+    "1.4.0")
   def saveAsTable(
       tableName: String,
       source: String,
       mode: SaveMode,
       options: Map[String, String]): Unit = {
-    val cmd =
-      CreateTableUsingAsSelect(
-        tableName,
-        source,
-        temporary = false,
-        Array.empty[String],
-        mode,
-        options,
-        logicalPlan)
-
-    sqlContext.executePlan(cmd).toRdd
+    write.format(source).mode(mode).options(options).saveAsTable(tableName)
   }
 
   /**
-   * :: Experimental ::
-   * Creates a table at the given path from the the contents of this DataFrame
-   * based on a given data source, [[SaveMode]] specified by mode, a set of options, and a list of
-   * partition columns.
-   *
-   * Note that this currently only works with DataFrames that are created from a HiveContext as
-   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
-   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
-   * be the target of an `insertInto`.
-   * @group output
-   * @since 1.4.0
-   */
-  @Experimental
-  def saveAsTable(
-      tableName: String,
-      source: String,
-      mode: SaveMode,
-      options: Map[String, String],
-      partitionColumns: Seq[String]): Unit = {
-    sqlContext.executePlan(
-      CreateTableUsingAsSelect(
-        tableName,
-        source,
-        temporary = false,
-        partitionColumns.toArray,
-        mode,
-        options,
-        logicalPlan)).toRdd
-  }
-
-  /**
-   * :: Experimental ::
    * Saves the contents of this DataFrame to the given path,
    * using the default data source configured by spark.sql.sources.default and
    * [[SaveMode.ErrorIfExists]] as the save mode.
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.save(path)", "1.4.0")
   def save(path: String): Unit = {
-    save(path, SaveMode.ErrorIfExists)
+    write.save(path)
   }
 
   /**
-   * :: Experimental ::
    * Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
    * using the default data source configured by spark.sql.sources.default.
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.mode(mode).save(path)", "1.4.0")
   def save(path: String, mode: SaveMode): Unit = {
-    val dataSourceName = sqlContext.conf.defaultDataSourceName
-    save(path, dataSourceName, mode)
+    write.mode(mode).save(path)
   }
 
   /**
-   * :: Experimental ::
    * Saves the contents of this DataFrame to the given path based on the given data source,
    * using [[SaveMode.ErrorIfExists]] as the save mode.
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.format(source).save(path)", "1.4.0")
   def save(path: String, source: String): Unit = {
-    save(source, SaveMode.ErrorIfExists, Map("path" -> path))
+    write.format(source).save(path)
   }
 
   /**
-   * :: Experimental ::
    * Saves the contents of this DataFrame to the given path based on the given data source and
    * [[SaveMode]] specified by mode.
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.format(source).mode(mode).save(path)", "1.4.0")
   def save(path: String, source: String, mode: SaveMode): Unit = {
-    save(source, mode, Map("path" -> path))
+    write.format(source).mode(mode).save(path)
   }
 
   /**
-   * :: Experimental ::
    * Saves the contents of this DataFrame based on the given data source,
    * [[SaveMode]] specified by mode, and a set of options.
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
   def save(
       source: String,
       mode: SaveMode,
       options: java.util.Map[String, String]): Unit = {
-    save(source, mode, options.toMap)
+    write.format(source).mode(mode).options(options).save()
   }
 
   /**
-   * :: Experimental ::
-   * Saves the contents of this DataFrame to the given path based on the given data source,
-   * [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
-   * @group output
-   * @since 1.4.0
-   */
-  @Experimental
-  def save(
-      source: String,
-      mode: SaveMode,
-      options: java.util.Map[String, String],
-      partitionColumns: java.util.List[String]): Unit = {
-    save(source, mode, options.toMap, partitionColumns)
-  }
-
-  /**
-   * :: Experimental ::
    * (Scala-specific)
    * Saves the contents of this DataFrame based on the given data source,
    * [[SaveMode]] specified by mode, and a set of options
    * @group output
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
   def save(
       source: String,
       mode: SaveMode,
       options: Map[String, String]): Unit = {
-    ResolvedDataSource(sqlContext, source, Array.empty[String], mode, options, this)
-  }
-
-  /**
-   * :: Experimental ::
-   * Saves the contents of this DataFrame to the given path based on the given data source,
-   * [[SaveMode]] specified by mode, and partition columns specified by `partitionColumns`.
-   * @group output
-   * @since 1.4.0
-   */
-  @Experimental
-  def save(
-      source: String,
-      mode: SaveMode,
-      options: Map[String, String],
-      partitionColumns: Seq[String]): Unit = {
-    ResolvedDataSource(sqlContext, source, partitionColumns.toArray, mode, options, this)
+    write.format(source).mode(mode).options(options).save()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
new file mode 100644
index 0000000..4d63faa
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -0,0 +1,218 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
+import org.apache.spark.sql.parquet.ParquetRelation2
+import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
+import org.apache.spark.sql.types.StructType
+
+/**
+ * :: Experimental ::
+ * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
+ * key-value stores, etc).
+ *
+ * @since 1.4.0
+ */
+@Experimental
+class DataFrameReader private[sql](sqlContext: SQLContext) {
+
+  /**
+   * Specifies the input data source format.
+   *
+   * @since 1.4.0
+   */
+  def format(source: String): DataFrameReader = {
+    this.source = source
+    this
+  }
+
+  /**
+   * Specifies the input schema. Some data sources (e.g. JSON) can infer the input schema
+   * automatically from data. By specifying the schema here, the underlying data source can
+   * skip the schema inference step, and thus speed up data loading.
+   *
+   * @since 1.4.0
+   */
+  def schema(schema: StructType): DataFrameReader = {
+    this.userSpecifiedSchema = Option(schema)
+    this
+  }
+
+  /**
+   * Adds an input option for the underlying data source.
+   *
+   * @since 1.4.0
+   */
+  def option(key: String, value: String): DataFrameReader = {
+    this.extraOptions += (key -> value)
+    this
+  }
+
+  /**
+   * (Scala-specific) Adds input options for the underlying data source.
+   *
+   * @since 1.4.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataFrameReader = {
+    this.extraOptions ++= options
+    this
+  }
+
+  /**
+   * Adds input options for the underlying data source.
+   *
+   * @since 1.4.0
+   */
+  def options(options: java.util.Map[String, String]): DataFrameReader = {
+    this.options(scala.collection.JavaConversions.mapAsScalaMap(options))
+    this
+  }
+
+  /**
+   * Specifies the input partitioning. If specified, the underlying data source does not need to
+   * discover the data partitioning scheme, and thus can speed up very large inputs.
+   *
+   * @since 1.4.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colNames: String*): DataFrameReader = {
+    this.partitioningColumns = Option(colNames)
+    this
+  }
+
+  /**
+   * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
+   * a local or distributed file system).
+   *
+   * @since 1.4.0
+   */
+  def load(path: String): DataFrame = {
+    option("path", path).load()
+  }
+
+  /**
+   * Loads input in as a [[DataFrame]], for data sources that don't require a path (e.g. external
+   * key-value stores).
+   *
+   * @since 1.4.0
+   */
+  def load(): DataFrame = {
+    val resolved = ResolvedDataSource(
+      sqlContext,
+      userSpecifiedSchema = userSpecifiedSchema,
+      partitionColumns = partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+      provider = source,
+      options = extraOptions.toMap)
+    DataFrame(sqlContext, LogicalRelation(resolved.relation))
+  }
+
+  /**
+   * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]].
+   *
+   * This function goes through the input once to determine the input schema. If you know the
+   * schema in advance, use the version that specifies the schema to avoid the extra scan.
+   *
+   * @param path input path
+   * @since 1.4.0
+   */
+  def json(path: String): DataFrame = format("json").load(path)
+
+  /**
+   * Loads an `JavaRDD[String]` storing JSON objects (one object per record) and
+   * returns the result as a [[DataFrame]].
+   *
+   * Unless the schema is specified using [[schema]] function, this function goes through the
+   * input once to determine the input schema.
+   *
+   * @param jsonRDD input RDD with one JSON object per record
+   * @since 1.4.0
+   */
+  def json(jsonRDD: JavaRDD[String]): DataFrame = json(jsonRDD.rdd)
+
+  /**
+   * Loads an `RDD[String]` storing JSON objects (one object per record) and
+   * returns the result as a [[DataFrame]].
+   *
+   * Unless the schema is specified using [[schema]] function, this function goes through the
+   * input once to determine the input schema.
+   *
+   * @param jsonRDD input RDD with one JSON object per record
+   * @since 1.4.0
+   */
+  def json(jsonRDD: RDD[String]): DataFrame = {
+    val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
+    if (sqlContext.conf.useJacksonStreamingAPI) {
+      sqlContext.baseRelationToDataFrame(
+        new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
+    } else {
+      val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
+      val appliedSchema = userSpecifiedSchema.getOrElse(
+        JsonRDD.nullTypeToStringType(
+          JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord)))
+      val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord)
+      sqlContext.createDataFrame(rowRDD, appliedSchema, needsConversion = false)
+    }
+  }
+
+  /**
+   * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
+   * [[DataFrame]] if no paths are passed in.
+   *
+   * @since 1.4.0
+   */
+  @scala.annotation.varargs
+  def parquet(paths: String*): DataFrame = {
+    if (paths.isEmpty) {
+      sqlContext.emptyDataFrame
+    } else {
+      val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
+      sqlContext.baseRelationToDataFrame(
+        new ParquetRelation2(
+          globbedPaths.map(_.toString), None, None, Map.empty[String, String])(sqlContext))
+    }
+  }
+
+  /**
+   * Returns the specified table as a [[DataFrame]].
+   *
+   * @since 1.4.0
+   */
+  def table(tableName: String): DataFrame = {
+    DataFrame(sqlContext, sqlContext.catalog.lookupRelation(Seq(tableName)))
+  }
+
+  ///////////////////////////////////////////////////////////////////////////////////////
+  // Builder pattern config options
+  ///////////////////////////////////////////////////////////////////////////////////////
+
+  private var source: String = sqlContext.conf.defaultDataSourceName
+
+  private var userSpecifiedSchema: Option[StructType] = None
+
+  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+
+  private var partitioningColumns: Option[Seq[String]] = None
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
new file mode 100644
index 0000000..b1fc18a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -0,0 +1,198 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.spark.sql
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
+
+
+/**
+ * :: Experimental ::
+ * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
+ * key-value stores, etc).
+ *
+ * @since 1.4.0
+ */
+@Experimental
+final class DataFrameWriter private[sql](df: DataFrame) {
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   *   - `SaveMode.Overwrite`: overwrite the existing data.
+   *   - `SaveMode.Append`: append the data.
+   *   - `SaveMode.Ignore`: ignore the operation (i.e. no-op).
+   *   - `SaveMode.ErrorIfExists`: default option, throw an exception at runtime.
+   *
+   * @since 1.4.0
+   */
+  def mode(saveMode: SaveMode): DataFrameWriter = {
+    this.mode = saveMode
+    this
+  }
+
+  /**
+   * Specifies the behavior when data or table already exists. Options include:
+   *   - `overwrite`: overwrite the existing data.
+   *   - `append`: append the data.
+   *   - `ignore`: ignore the operation (i.e. no-op).
+   *   - `error`: default option, throw an exception at runtime.
+   *
+   * @since 1.4.0
+   */
+  def mode(saveMode: String): DataFrameWriter = {
+    saveMode.toLowerCase match {
+      case "overwrite" => SaveMode.Overwrite
+      case "append" => SaveMode.Append
+      case "ignore" => SaveMode.Ignore
+      case "error" | "default" => SaveMode.ErrorIfExists
+      case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
+        "Accepted modes are 'overwrite', 'append', 'ignore', 'error'.")
+    }
+    this
+  }
+
+  /**
+   * Specifies the underlying output data source. Built-in options include "parquet", "json", etc.
+   *
+   * @since 1.4.0
+   */
+  def format(source: String): DataFrameWriter = {
+    this.source = source
+    this
+  }
+
+  /**
+   * Adds an output option for the underlying data source.
+   *
+   * @since 1.4.0
+   */
+  def option(key: String, value: String): DataFrameWriter = {
+    this.extraOptions += (key -> value)
+    this
+  }
+
+  /**
+   * (Scala-specific) Adds output options for the underlying data source.
+   *
+   * @since 1.4.0
+   */
+  def options(options: scala.collection.Map[String, String]): DataFrameWriter = {
+    this.extraOptions ++= options
+    this
+  }
+
+  /**
+   * Adds output options for the underlying data source.
+   *
+   * @since 1.4.0
+   */
+  def options(options: java.util.Map[String, String]): DataFrameWriter = {
+    this.options(scala.collection.JavaConversions.mapAsScalaMap(options))
+    this
+  }
+
+  /**
+   * Partitions the output by the given columns on the file system. If specified, the output is
+   * laid out on the file system similar to Hive's partitioning scheme.
+   *
+   * @since 1.4.0
+   */
+  @scala.annotation.varargs
+  def partitionBy(colNames: String*): DataFrameWriter = {
+    this.partitioningColumns = Option(colNames)
+    this
+  }
+
+  /**
+   * Saves the content of the [[DataFrame]] at the specified path.
+   *
+   * @since 1.4.0
+   */
+  def save(path: String): Unit = {
+    this.extraOptions += ("path" -> path)
+    save()
+  }
+
+  /**
+   * Saves the content of the [[DataFrame]] as the specified table.
+   *
+   * @since 1.4.0
+   */
+  def save(): Unit = {
+    ResolvedDataSource(
+      df.sqlContext,
+      source,
+      partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+      mode,
+      extraOptions.toMap,
+      df)
+  }
+
+  /**
+   * Saves the content of the [[DataFrame]] as the specified table.
+   *
+   * @since 1.4.0
+   */
+  def saveAsTable(tableName: String): Unit = {
+    val cmd =
+      CreateTableUsingAsSelect(
+        tableName,
+        source,
+        temporary = false,
+        partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+        mode,
+        extraOptions.toMap,
+        df.logicalPlan)
+    df.sqlContext.executePlan(cmd).toRdd
+  }
+
+  /**
+   * Saves the content of the [[DataFrame]] in JSON format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("json").save(path)
+   * }}}
+   *
+   * @since 1.4.0
+   */
+  def json(path: String): Unit = format("json").save(path)
+
+  /**
+   * Saves the content of the [[DataFrame]] in Parquet format at the specified path.
+   * This is equivalent to:
+   * {{{
+   *   format("parquet").save(path)
+   * }}}
+   *
+   * @since 1.4.0
+   */
+  def parquet(path: String): Unit = format("parquet").save(path)
+
+  ///////////////////////////////////////////////////////////////////////////////////////
+  // Builder pattern config options
+  ///////////////////////////////////////////////////////////////////////////////////////
+
+  private var source: String = df.sqlContext.conf.defaultDataSourceName
+
+  private var mode: SaveMode = SaveMode.ErrorIfExists
+
+  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+
+  private var partitioningColumns: Option[Seq[String]] = None
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 9fb355e..34a50e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -27,11 +27,9 @@ import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
 import com.google.common.reflect.TypeToken
-import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
-import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
@@ -43,8 +41,6 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.execution.{Filter, _}
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.json._
-import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -597,21 +593,33 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   /**
+   * :: Experimental ::
+   * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
+   * {{{
+   *   sqlContext.read.parquet("/path/to/file.parquet")
+   *   sqlContext.read.schema(schema).json("/path/to/file.json")
+   * }}}
+   *
+   * @group genericdata
+   * @since 1.4.0
+   */
+  @Experimental
+  def read: DataFrameReader = new DataFrameReader(this)
+
+  /**
    * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
    * [[DataFrame]] if no paths are passed in.
    *
    * @group specificdata
    * @since 1.3.0
    */
+  @deprecated("Use read.parquet()", "1.4.0")
   @scala.annotation.varargs
   def parquetFile(paths: String*): DataFrame = {
     if (paths.isEmpty) {
       emptyDataFrame
     } else if (conf.parquetUseDataSourceApi) {
-      val globbedPaths = paths.map(new Path(_)).flatMap(SparkHadoopUtil.get.globPath).toArray
-      baseRelationToDataFrame(
-        new ParquetRelation2(
-          globbedPaths.map(_.toString), None, None, Map.empty[String, String])(this))
+      read.parquet(paths : _*)
     } else {
       DataFrame(this, parquet.ParquetRelation(
         paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
@@ -625,28 +633,31 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group specificdata
    * @since 1.3.0
    */
-  def jsonFile(path: String): DataFrame = jsonFile(path, 1.0)
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonFile(path: String): DataFrame = {
+    read.json(path)
+  }
 
   /**
-   * :: Experimental ::
    * Loads a JSON file (one object per line) and applies the given schema,
    * returning the result as a [[DataFrame]].
    *
    * @group specificdata
    * @since 1.3.0
    */
-  @Experimental
-  def jsonFile(path: String, schema: StructType): DataFrame =
-    load("json", schema, Map("path" -> path))
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonFile(path: String, schema: StructType): DataFrame = {
+    read.schema(schema).json(path)
+  }
 
   /**
-   * :: Experimental ::
    * @group specificdata
    * @since 1.3.0
    */
-  @Experimental
-  def jsonFile(path: String, samplingRatio: Double): DataFrame =
-    load("json", Map("path" -> path, "samplingRatio" -> samplingRatio.toString))
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonFile(path: String, samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(path)
+  }
 
   /**
    * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
@@ -656,8 +667,8 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group specificdata
    * @since 1.3.0
    */
-  def jsonRDD(json: RDD[String]): DataFrame = jsonRDD(json, 1.0)
-
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
 
   /**
    * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
@@ -667,196 +678,131 @@ class SQLContext(@transient val sparkContext: SparkContext)
    * @group specificdata
    * @since 1.3.0
    */
-  def jsonRDD(json: JavaRDD[String]): DataFrame = jsonRDD(json.rdd, 1.0)
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
 
   /**
-   * :: Experimental ::
    * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
    * returning the result as a [[DataFrame]].
    *
    * @group specificdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.json()", "1.4.0")
   def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
-    if (conf.useJacksonStreamingAPI) {
-      baseRelationToDataFrame(new JSONRelation(() => json, None, 1.0, Some(schema))(this))
-    } else {
-      val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
-      val appliedSchema =
-        Option(schema).getOrElse(
-          JsonRDD.nullTypeToStringType(
-            JsonRDD.inferSchema(json, 1.0, columnNameOfCorruptJsonRecord)))
-      val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
-      createDataFrame(rowRDD, appliedSchema, needsConversion = false)
-    }
+    read.schema(schema).json(json)
   }
 
   /**
-   * :: Experimental ::
    * Loads an JavaRDD<String> storing JSON objects (one object per record) and applies the given
    * schema, returning the result as a [[DataFrame]].
    *
    * @group specificdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.json()", "1.4.0")
   def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
-    jsonRDD(json.rdd, schema)
+    read.schema(schema).json(json)
   }
 
   /**
-   * :: Experimental ::
    * Loads an RDD[String] storing JSON objects (one object per record) inferring the
    * schema, returning the result as a [[DataFrame]].
    *
    * @group specificdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.json()", "1.4.0")
   def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
-    if (conf.useJacksonStreamingAPI) {
-      baseRelationToDataFrame(new JSONRelation(() => json, None, samplingRatio, None)(this))
-    } else {
-      val columnNameOfCorruptJsonRecord = conf.columnNameOfCorruptRecord
-      val appliedSchema =
-        JsonRDD.nullTypeToStringType(
-          JsonRDD.inferSchema(json, samplingRatio, columnNameOfCorruptJsonRecord))
-      val rowRDD = JsonRDD.jsonStringToRow(json, appliedSchema, columnNameOfCorruptJsonRecord)
-      createDataFrame(rowRDD, appliedSchema, needsConversion = false)
-    }
+    read.option("samplingRatio", samplingRatio.toString).json(json)
   }
 
   /**
-   * :: Experimental ::
    * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
    * schema, returning the result as a [[DataFrame]].
    *
    * @group specificdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.json()", "1.4.0")
   def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
-    jsonRDD(json.rdd, samplingRatio);
+    read.option("samplingRatio", samplingRatio.toString).json(json)
   }
 
   /**
-   * :: Experimental ::
    * Returns the dataset stored at path as a DataFrame,
    * using the default data source configured by spark.sql.sources.default.
    *
    * @group genericdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.load(path)", "1.4.0")
   def load(path: String): DataFrame = {
-    val dataSourceName = conf.defaultDataSourceName
-    load(path, dataSourceName)
+    read.load(path)
   }
 
   /**
-   * :: Experimental ::
    * Returns the dataset stored at path as a DataFrame, using the given data source.
    *
    * @group genericdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.format(source).load(path)", "1.4.0")
   def load(path: String, source: String): DataFrame = {
-    load(source, Map("path" -> path))
+    read.format(source).load(path)
   }
 
   /**
-   * :: Experimental ::
    * (Java-specific) Returns the dataset specified by the given data source and
    * a set of options as a DataFrame.
    *
    * @group genericdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.format(source).options(options).load()", "1.4.0")
   def load(source: String, options: java.util.Map[String, String]): DataFrame = {
-    load(source, options.toMap)
+    read.options(options).format(source).load()
   }
 
   /**
-   * :: Experimental ::
    * (Scala-specific) Returns the dataset specified by the given data source and
    * a set of options as a DataFrame.
    *
    * @group genericdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.format(source).options(options).load()", "1.4.0")
   def load(source: String, options: Map[String, String]): DataFrame = {
-    val resolved = ResolvedDataSource(this, None, Array.empty[String], source, options)
-    DataFrame(this, LogicalRelation(resolved.relation))
-  }
-
-  /**
-   * :: Experimental ::
-   * (Java-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
-   *
-   * @group genericdata
-   * @since 1.3.0
-   */
-  @Experimental
-  def load(
-      source: String,
-      schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
-    load(source, schema, options.toMap)
+    read.options(options).format(source).load()
   }
 
   /**
-   * :: Experimental ::
    * (Java-specific) Returns the dataset specified by the given data source and
    * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
    *
    * @group genericdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
   def load(
       source: String,
       schema: StructType,
-      partitionColumns: Array[String],
       options: java.util.Map[String, String]): DataFrame = {
-    load(source, schema, partitionColumns, options.toMap)
-  }
-
-  /**
-   * :: Experimental ::
-   * (Scala-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
-   * @group genericdata
-   * @since 1.3.0
-   */
-  @Experimental
-  def load(
-      source: String,
-      schema: StructType,
-      options: Map[String, String]): DataFrame = {
-    val resolved = ResolvedDataSource(this, Some(schema), Array.empty[String], source, options)
-    DataFrame(this, LogicalRelation(resolved.relation))
+    read.format(source).schema(schema).options(options).load()
   }
 
   /**
-   * :: Experimental ::
    * (Scala-specific) Returns the dataset specified by the given data source and
    * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
    * @group genericdata
    * @since 1.3.0
    */
-  @Experimental
+  @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
   def load(
       source: String,
       schema: StructType,
-      partitionColumns: Array[String],
       options: Map[String, String]): DataFrame = {
-    val resolved = ResolvedDataSource(this, Some(schema), partitionColumns, source, options)
-    DataFrame(this, LogicalRelation(resolved.relation))
+    read.format(source).schema(schema).options(options).load()
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
index 9d17516..7a73b6f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTest.scala
@@ -90,7 +90,7 @@ private[sql] trait ParquetTest {
       (data: Seq[T])
       (f: String => Unit): Unit = {
     withTempPath { file =>
-      sparkContext.parallelize(data).toDF().saveAsParquetFile(file.getCanonicalPath)
+      sparkContext.parallelize(data).toDF().write.parquet(file.getCanonicalPath)
       f(file.getCanonicalPath)
     }
   }
@@ -102,7 +102,7 @@ private[sql] trait ParquetTest {
   protected def withParquetDataFrame[T <: Product: ClassTag: TypeTag]
       (data: Seq[T])
       (f: DataFrame => Unit): Unit = {
-    withParquetFile(data)(path => f(sqlContext.parquetFile(path)))
+    withParquetFile(data)(path => f(sqlContext.read.parquet(path)))
   }
 
   /**
@@ -128,12 +128,12 @@ private[sql] trait ParquetTest {
 
   protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
       data: Seq[T], path: File): Unit = {
-    data.toDF().save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
+    data.toDF().write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
   }
 
   protected def makeParquetFile[T <: Product: ClassTag: TypeTag](
       df: DataFrame, path: File): Unit = {
-    df.save(path.getCanonicalPath, "org.apache.spark.sql.parquet", SaveMode.Overwrite)
+    df.write.mode(SaveMode.Overwrite).parquet(path.getCanonicalPath)
   }
 
   protected def makePartitionDir(

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
index b76f7d4..6a0bcef 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
@@ -75,9 +75,9 @@ public class JavaSaveLoadSuite {
   public void saveAndLoad() {
     Map<String, String> options = new HashMap<String, String>();
     options.put("path", path.toString());
-    df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);
+    df.save("json", SaveMode.ErrorIfExists, options);
 
-    DataFrame loadedDF = sqlContext.load("org.apache.spark.sql.json", options);
+    DataFrame loadedDF = sqlContext.read().format("json").options(options).load();
 
     checkAnswer(loadedDF, df.collectAsList());
   }
@@ -86,12 +86,12 @@ public class JavaSaveLoadSuite {
   public void saveAndLoadWithSchema() {
     Map<String, String> options = new HashMap<String, String>();
     options.put("path", path.toString());
-    df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);
+    df.save("json", SaveMode.ErrorIfExists, options);
 
     List<StructField> fields = new ArrayList<StructField>();
     fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
     StructType schema = DataTypes.createStructType(fields);
-    DataFrame loadedDF = sqlContext.load("org.apache.spark.sql.json", schema, options);
+    DataFrame loadedDF = sqlContext.load("json", schema, options);
 
     checkAnswer(loadedDF, sqlContext.sql("SELECT b FROM jsonTable").collectAsList());
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 1d5f6b3..054b23d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -460,14 +460,14 @@ class DataFrameSuite extends QueryTest {
   }
 
   test("SPARK-7551: support backticks for DataFrame attribute resolution") {
-    val df = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
+    val df = TestSQLContext.read.json(TestSQLContext.sparkContext.makeRDD(
       """{"a.b": {"c": {"d..e": {"f": 1}}}}""" :: Nil))
     checkAnswer(
       df.select(df("`a.b`.c.`d..e`.`f`")),
       Row(1)
     )
 
-    val df2 = TestSQLContext.jsonRDD(TestSQLContext.sparkContext.makeRDD(
+    val df2 = TestSQLContext.read.json(TestSQLContext.sparkContext.makeRDD(
       """{"a  b": {"c": {"d  e": {"f": 1}}}}""" :: Nil))
     checkAnswer(
       df2.select(df2("`a  b`.c.d  e.f")),

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 479ad9f..c5c4f44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -105,7 +105,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("grouping on nested fields") {
-    jsonRDD(sparkContext.parallelize("""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
+    read.json(sparkContext.parallelize("""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))
      .registerTempTable("rows")
 
     checkAnswer(
@@ -122,7 +122,8 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("SPARK-6201 IN type conversion") {
-    jsonRDD(sparkContext.parallelize(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
+    read.json(
+      sparkContext.parallelize(Seq("{\"a\": \"1\"}}", "{\"a\": \"2\"}}", "{\"a\": \"3\"}}")))
       .registerTempTable("d")
 
     checkAnswer(
@@ -1199,7 +1200,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   test("SPARK-3483 Special chars in column names") {
     val data = sparkContext.parallelize(
       Seq("""{"key?number1": "value1", "key.number2": "value2"}"""))
-    jsonRDD(data).registerTempTable("records")
+    read.json(data).registerTempTable("records")
     sql("SELECT `key?number1`, `key.number2` FROM records")
   }
 
@@ -1240,11 +1241,11 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("SPARK-4322 Grouping field with struct field as sub expression") {
-    jsonRDD(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil)).registerTempTable("data")
+    read.json(sparkContext.makeRDD("""{"a": {"b": [{"c": 1}]}}""" :: Nil)).registerTempTable("data")
     checkAnswer(sql("SELECT a.b[0].c FROM data GROUP BY a.b[0].c"), Row(1))
     dropTempTable("data")
 
-    jsonRDD(sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data")
+    read.json(sparkContext.makeRDD("""{"a": {"b": 1}}""" :: Nil)).registerTempTable("data")
     checkAnswer(sql("SELECT a.b + 1 FROM data GROUP BY a.b + 1"), Row(2))
     dropTempTable("data")
   }
@@ -1292,7 +1293,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("SPARK-6145: ORDER BY test for nested fields") {
-    jsonRDD(sparkContext.makeRDD("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
+    read.json(sparkContext.makeRDD("""{"a": {"b": 1, "a": {"a": 1}}, "c": [{"d": 1}]}""" :: Nil))
       .registerTempTable("nestedOrder")
 
     checkAnswer(sql("SELECT 1 FROM nestedOrder ORDER BY a.b"), Row(1))
@@ -1304,14 +1305,14 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("SPARK-6145: special cases") {
-    jsonRDD(sparkContext.makeRDD(
+    read.json(sparkContext.makeRDD(
       """{"a": {"b": [1]}, "b": [{"a": 1}], "c0": {"a": 1}}""" :: Nil)).registerTempTable("t")
     checkAnswer(sql("SELECT a.b[0] FROM t ORDER BY c0.a"), Row(1))
     checkAnswer(sql("SELECT b[0].a FROM t ORDER BY c0.a"), Row(1))
   }
 
   test("SPARK-6898: complete support for special chars in column names") {
-    jsonRDD(sparkContext.makeRDD(
+    read.json(sparkContext.makeRDD(
       """{"a": {"c.b": 1}, "b.$q": [{"a@!.q": 1}], "q.w": {"w.i&": [1]}}""" :: Nil))
       .registerTempTable("t")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
index 2672e20..dc2d43a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UserDefinedTypeSuite.scala
@@ -105,13 +105,13 @@ class UserDefinedTypeSuite extends QueryTest {
   test("UDTs with Parquet") {
     val tempDir = Utils.createTempDir()
     tempDir.delete()
-    pointsRDD.saveAsParquetFile(tempDir.getCanonicalPath)
+    pointsRDD.write.parquet(tempDir.getCanonicalPath)
   }
 
   test("Repartition UDTs with Parquet") {
     val tempDir = Utils.createTempDir()
     tempDir.delete()
-    pointsRDD.repartition(1).saveAsParquetFile(tempDir.getCanonicalPath)
+    pointsRDD.repartition(1).write.parquet(tempDir.getCanonicalPath)
   }
 
   // Tests to make sure that all operators correctly convert types on the way out.

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index b06e338..6f747e5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -215,7 +215,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("Complex field and type inferring with null in sampling") {
-    val jsonDF = jsonRDD(jsonNullStruct)
+    val jsonDF = read.json(jsonNullStruct)
     val expectedSchema = StructType(
       StructField("headers", StructType(
         StructField("Charset", StringType, true) ::
@@ -234,7 +234,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("Primitive field and type inferring") {
-    val jsonDF = jsonRDD(primitiveFieldAndType)
+    val jsonDF = read.json(primitiveFieldAndType)
 
     val expectedSchema = StructType(
       StructField("bigInteger", DecimalType.Unlimited, true) ::
@@ -262,7 +262,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("Complex field and type inferring") {
-    val jsonDF = jsonRDD(complexFieldAndType1)
+    val jsonDF = read.json(complexFieldAndType1)
 
     val expectedSchema = StructType(
       StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) ::
@@ -361,7 +361,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("GetField operation on complex data type") {
-    val jsonDF = jsonRDD(complexFieldAndType1)
+    val jsonDF = read.json(complexFieldAndType1)
     jsonDF.registerTempTable("jsonTable")
 
     checkAnswer(
@@ -377,7 +377,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("Type conflict in primitive field values") {
-    val jsonDF = jsonRDD(primitiveFieldValueTypeConflict)
+    val jsonDF = read.json(primitiveFieldValueTypeConflict)
 
     val expectedSchema = StructType(
       StructField("num_bool", StringType, true) ::
@@ -451,7 +451,7 @@ class JsonSuite extends QueryTest {
   }
 
   ignore("Type conflict in primitive field values (Ignored)") {
-    val jsonDF = jsonRDD(primitiveFieldValueTypeConflict)
+    val jsonDF = read.json(primitiveFieldValueTypeConflict)
     jsonDF.registerTempTable("jsonTable")
 
     // Right now, the analyzer does not promote strings in a boolean expression.
@@ -504,7 +504,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("Type conflict in complex field values") {
-    val jsonDF = jsonRDD(complexFieldValueTypeConflict)
+    val jsonDF = read.json(complexFieldValueTypeConflict)
 
     val expectedSchema = StructType(
       StructField("array", ArrayType(LongType, true), true) ::
@@ -528,7 +528,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("Type conflict in array elements") {
-    val jsonDF = jsonRDD(arrayElementTypeConflict)
+    val jsonDF = read.json(arrayElementTypeConflict)
 
     val expectedSchema = StructType(
       StructField("array1", ArrayType(StringType, true), true) ::
@@ -556,7 +556,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("Handling missing fields") {
-    val jsonDF = jsonRDD(missingFields)
+    val jsonDF = read.json(missingFields)
 
     val expectedSchema = StructType(
       StructField("a", BooleanType, true) ::
@@ -576,7 +576,7 @@ class JsonSuite extends QueryTest {
     dir.delete()
     val path = dir.getCanonicalPath
     sparkContext.parallelize(1 to 100).map(i => s"""{"a": 1, "b": "str$i"}""").saveAsTextFile(path)
-    val jsonDF = jsonFile(path, 0.49)
+    val jsonDF = read.option("samplingRatio", "0.49").json(path)
 
     val analyzed = jsonDF.queryExecution.analyzed
     assert(
@@ -591,7 +591,7 @@ class JsonSuite extends QueryTest {
 
     val schema = StructType(StructField("a", LongType, true) :: Nil)
     val logicalRelation =
-      jsonFile(path, schema).queryExecution.analyzed.asInstanceOf[LogicalRelation]
+      read.schema(schema).json(path).queryExecution.analyzed.asInstanceOf[LogicalRelation]
     val relationWithSchema = logicalRelation.relation.asInstanceOf[JSONRelation]
     assert(relationWithSchema.path === Some(path))
     assert(relationWithSchema.schema === schema)
@@ -603,7 +603,7 @@ class JsonSuite extends QueryTest {
     dir.delete()
     val path = dir.getCanonicalPath
     primitiveFieldAndType.map(record => record.replaceAll("\n", " ")).saveAsTextFile(path)
-    val jsonDF = jsonFile(path)
+    val jsonDF = read.json(path)
 
     val expectedSchema = StructType(
       StructField("bigInteger", DecimalType.Unlimited, true) ::
@@ -672,7 +672,7 @@ class JsonSuite extends QueryTest {
       StructField("null", StringType, true) ::
       StructField("string", StringType, true) :: Nil)
 
-    val jsonDF1 = jsonFile(path, schema)
+    val jsonDF1 = read.schema(schema).json(path)
 
     assert(schema === jsonDF1.schema)
 
@@ -689,7 +689,7 @@ class JsonSuite extends QueryTest {
       "this is a simple string.")
     )
 
-    val jsonDF2 = jsonRDD(primitiveFieldAndType, schema)
+    val jsonDF2 = read.schema(schema).json(primitiveFieldAndType)
 
     assert(schema === jsonDF2.schema)
 
@@ -710,7 +710,7 @@ class JsonSuite extends QueryTest {
   test("Applying schemas with MapType") {
     val schemaWithSimpleMap = StructType(
       StructField("map", MapType(StringType, IntegerType, true), false) :: Nil)
-    val jsonWithSimpleMap = jsonRDD(mapType1, schemaWithSimpleMap)
+    val jsonWithSimpleMap = read.schema(schemaWithSimpleMap).json(mapType1)
 
     jsonWithSimpleMap.registerTempTable("jsonWithSimpleMap")
 
@@ -738,7 +738,7 @@ class JsonSuite extends QueryTest {
     val schemaWithComplexMap = StructType(
       StructField("map", MapType(StringType, innerStruct, true), false) :: Nil)
 
-    val jsonWithComplexMap = jsonRDD(mapType2, schemaWithComplexMap)
+    val jsonWithComplexMap = read.schema(schemaWithComplexMap).json(mapType2)
 
     jsonWithComplexMap.registerTempTable("jsonWithComplexMap")
 
@@ -764,7 +764,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("SPARK-2096 Correctly parse dot notations") {
-    val jsonDF = jsonRDD(complexFieldAndType2)
+    val jsonDF = read.json(complexFieldAndType2)
     jsonDF.registerTempTable("jsonTable")
 
     checkAnswer(
@@ -782,7 +782,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("SPARK-3390 Complex arrays") {
-    val jsonDF = jsonRDD(complexFieldAndType2)
+    val jsonDF = read.json(complexFieldAndType2)
     jsonDF.registerTempTable("jsonTable")
 
     checkAnswer(
@@ -805,7 +805,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("SPARK-3308 Read top level JSON arrays") {
-    val jsonDF = jsonRDD(jsonArray)
+    val jsonDF = read.json(jsonArray)
     jsonDF.registerTempTable("jsonTable")
 
     checkAnswer(
@@ -826,7 +826,7 @@ class JsonSuite extends QueryTest {
     val oldColumnNameOfCorruptRecord = TestSQLContext.conf.columnNameOfCorruptRecord
     TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, "_unparsed")
 
-    val jsonDF = jsonRDD(corruptRecords)
+    val jsonDF = read.json(corruptRecords)
     jsonDF.registerTempTable("jsonTable")
 
     val schema = StructType(
@@ -880,7 +880,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("SPARK-4068: nulls in arrays") {
-    val jsonDF = jsonRDD(nullsInArrays)
+    val jsonDF = read.json(nullsInArrays)
     jsonDF.registerTempTable("jsonTable")
 
     val schema = StructType(
@@ -957,8 +957,8 @@ class JsonSuite extends QueryTest {
     assert(result2(1) === "{\"f1\":{\"f11\":2,\"f12\":false},\"f2\":{\"B2\":null}}")
     assert(result2(3) === "{\"f1\":{\"f11\":4,\"f12\":true},\"f2\":{\"D4\":2147483644}}")
 
-    val jsonDF = jsonRDD(primitiveFieldAndType)
-    val primTable = jsonRDD(jsonDF.toJSON)
+    val jsonDF = read.json(primitiveFieldAndType)
+    val primTable = read.json(jsonDF.toJSON)
     primTable.registerTempTable("primativeTable")
     checkAnswer(
         sql("select * from primativeTable"),
@@ -970,8 +970,8 @@ class JsonSuite extends QueryTest {
         "this is a simple string.")
       )
 
-    val complexJsonDF = jsonRDD(complexFieldAndType1)
-    val compTable = jsonRDD(complexJsonDF.toJSON)
+    val complexJsonDF = read.json(complexFieldAndType1)
+    val compTable = read.json(complexJsonDF.toJSON)
     compTable.registerTempTable("complexTable")
     // Access elements of a primitive array.
     checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
index 5ad4395..bdc2eba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetFilterSuite.scala
@@ -328,12 +328,12 @@ class ParquetDataSourceOnFilterSuite extends ParquetFilterSuiteBase with BeforeA
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/part=1"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
 
         // If the "part = 1" filter gets pushed down, this query will throw an exception since
         // "part" is not a valid column in the actual Parquet file
         checkAnswer(
-          sqlContext.parquetFile(path).filter("part = 1"),
+          sqlContext.read.parquet(path).filter("part = 1"),
           (1 to 3).map(i => Row(i, i.toString, 1)))
       }
     }
@@ -357,7 +357,7 @@ class ParquetDataSourceOffFilterSuite extends ParquetFilterSuiteBase with Before
     withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED -> "true") {
       withTempPath { dir =>
         val path = s"${dir.getCanonicalPath}/part=1"
-        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").saveAsParquetFile(path)
+        (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path)
 
         // If the "part = 1" filter gets pushed down, this query will throw an exception since
         // "part" is not a valid column in the actual Parquet file

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index 008443d..dd48bb3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -114,24 +114,24 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     for ((precision, scale) <- Seq((5, 2), (1, 0), (1, 1), (18, 10), (18, 17))) {
       withTempPath { dir =>
         val data = makeDecimalRDD(DecimalType(precision, scale))
-        data.saveAsParquetFile(dir.getCanonicalPath)
-        checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+        data.write.parquet(dir.getCanonicalPath)
+        checkAnswer(read.parquet(dir.getCanonicalPath), data.collect().toSeq)
       }
     }
 
     // Decimals with precision above 18 are not yet supported
     intercept[Throwable] {
       withTempPath { dir =>
-        makeDecimalRDD(DecimalType(19, 10)).saveAsParquetFile(dir.getCanonicalPath)
-        parquetFile(dir.getCanonicalPath).collect()
+        makeDecimalRDD(DecimalType(19, 10)).write.parquet(dir.getCanonicalPath)
+        read.parquet(dir.getCanonicalPath).collect()
       }
     }
 
     // Unlimited-length decimals are not yet supported
     intercept[Throwable] {
       withTempPath { dir =>
-        makeDecimalRDD(DecimalType.Unlimited).saveAsParquetFile(dir.getCanonicalPath)
-        parquetFile(dir.getCanonicalPath).collect()
+        makeDecimalRDD(DecimalType.Unlimited).write.parquet(dir.getCanonicalPath)
+        read.parquet(dir.getCanonicalPath).collect()
       }
     }
   }
@@ -146,8 +146,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
 
     withTempPath { dir =>
       val data = makeDateRDD()
-      data.saveAsParquetFile(dir.getCanonicalPath)
-      checkAnswer(parquetFile(dir.getCanonicalPath), data.collect().toSeq)
+      data.write.parquet(dir.getCanonicalPath)
+      checkAnswer(read.parquet(dir.getCanonicalPath), data.collect().toSeq)
     }
   }
 
@@ -283,7 +283,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     withTempDir { dir =>
       val path = new Path(dir.toURI.toString, "part-r-0.parquet")
       makeRawParquetFile(path)
-      checkAnswer(parquetFile(path.toString), (0 until 10).map { i =>
+      checkAnswer(read.parquet(path.toString), (0 until 10).map { i =>
         Row(i % 2 == 0, i, i.toLong, i.toFloat, i.toDouble)
       })
     }
@@ -311,8 +311,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
   test("save - overwrite") {
     withParquetFile((1 to 10).map(i => (i, i.toString))) { file =>
       val newData = (11 to 20).map(i => (i, i.toString))
-      newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Overwrite, Map("path" -> file))
-      checkAnswer(parquetFile(file), newData.map(Row.fromTuple))
+      newData.toDF().write.format("parquet").mode(SaveMode.Overwrite).save(file)
+      checkAnswer(read.parquet(file), newData.map(Row.fromTuple))
     }
   }
 
@@ -320,8 +320,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     val data = (1 to 10).map(i => (i, i.toString))
     withParquetFile(data) { file =>
       val newData = (11 to 20).map(i => (i, i.toString))
-      newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Ignore, Map("path" -> file))
-      checkAnswer(parquetFile(file), data.map(Row.fromTuple))
+      newData.toDF().write.format("parquet").mode(SaveMode.Ignore).save(file)
+      checkAnswer(read.parquet(file), data.map(Row.fromTuple))
     }
   }
 
@@ -330,8 +330,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     withParquetFile(data) { file =>
       val newData = (11 to 20).map(i => (i, i.toString))
       val errorMessage = intercept[Throwable] {
-        newData.toDF().save(
-          "org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> file))
+        newData.toDF().write.format("parquet").mode(SaveMode.ErrorIfExists).save(file)
       }.getMessage
       assert(errorMessage.contains("already exists"))
     }
@@ -341,8 +340,8 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
     val data = (1 to 10).map(i => (i, i.toString))
     withParquetFile(data) { file =>
       val newData = (11 to 20).map(i => (i, i.toString))
-      newData.toDF().save("org.apache.spark.sql.parquet", SaveMode.Append, Map("path" -> file))
-      checkAnswer(parquetFile(file), (data ++ newData).map(Row.fromTuple))
+      newData.toDF().write.format("parquet").mode(SaveMode.Append).save(file)
+      checkAnswer(read.parquet(file), (data ++ newData).map(Row.fromTuple))
     }
   }
 
@@ -374,7 +373,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
         path,
         new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil)
 
-      assertResult(parquetFile(path.toString).schema) {
+      assertResult(read.parquet(path.toString).schema) {
         StructType(
           StructField("a", BooleanType, nullable = false) ::
           StructField("b", IntegerType, nullable = false) ::
@@ -392,7 +391,7 @@ class ParquetIOSuiteBase extends QueryTest with ParquetTest {
       sqlContext.udf.register("div0", (x: Int) => x / 0)
       withTempPath { dir =>
         intercept[org.apache.spark.SparkException] {
-          sqlContext.sql("select div0(1)").saveAsParquetFile(dir.getCanonicalPath)
+          sqlContext.sql("select div0(1)").write.parquet(dir.getCanonicalPath)
         }
         val path = new Path(dir.getCanonicalPath, "_temporary")
         val fs = path.getFileSystem(configuration)
@@ -421,10 +420,10 @@ class ParquetDataSourceOnIOSuite extends ParquetIOSuiteBase with BeforeAndAfterA
     // In 1.3.0, save to fs other than file: without configuring core-site.xml would get:
     // IllegalArgumentException: Wrong FS: hdfs://..., expected: file:///
     intercept[Throwable] {
-      sqlContext.parquetFile("file:///nonexistent")
+      sqlContext.read.parquet("file:///nonexistent")
     }
     val errorMessage = intercept[Throwable] {
-      sqlContext.parquetFile("hdfs://nonexistent")
+      sqlContext.read.parquet("hdfs://nonexistent")
     }.toString
     assert(errorMessage.contains("UnknownHostException"))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
index 138e197..8079c46 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetPartitionDiscoverySuite.scala
@@ -155,7 +155,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
-      parquetFile(base.getCanonicalPath).registerTempTable("t")
+      read.parquet(base.getCanonicalPath).registerTempTable("t")
 
       withTempTable("t") {
         checkAnswer(
@@ -202,7 +202,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
-      parquetFile(base.getCanonicalPath).registerTempTable("t")
+      read.parquet(base.getCanonicalPath).registerTempTable("t")
 
       withTempTable("t") {
         checkAnswer(
@@ -250,10 +250,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
-      val parquetRelation = load(
-        "org.apache.spark.sql.parquet",
-        Map("path" -> base.getCanonicalPath))
-
+      val parquetRelation = read.format("org.apache.spark.sql.parquet").load(base.getCanonicalPath)
       parquetRelation.registerTempTable("t")
 
       withTempTable("t") {
@@ -293,10 +290,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
           makePartitionDir(base, defaultPartitionName, "pi" -> pi, "ps" -> ps))
       }
 
-      val parquetRelation = load(
-        "org.apache.spark.sql.parquet",
-        Map("path" -> base.getCanonicalPath))
-
+      val parquetRelation = read.format("org.apache.spark.sql.parquet").load(base.getCanonicalPath)
       parquetRelation.registerTempTable("t")
 
       withTempTable("t") {
@@ -328,7 +322,7 @@ class ParquetPartitionDiscoverySuite extends QueryTest with ParquetTest {
         (1 to 10).map(i => (i, i.toString)).toDF("intField", "stringField"),
         makePartitionDir(base, defaultPartitionName, "pi" -> 2))
 
-      load(base.getCanonicalPath, "org.apache.spark.sql.parquet").registerTempTable("t")
+      read.format("org.apache.spark.sql.parquet").load(base.getCanonicalPath).registerTempTable("t")
 
       withTempTable("t") {
         checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index 4e54b2e..d2d1011 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -33,7 +33,7 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
   override def beforeAll(): Unit = {
     path = Utils.createTempDir()
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
+    read.json(rdd).registerTempTable("jt")
   }
 
   override def afterAll(): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index d1d427e..6f375ef 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -33,7 +33,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
   override def beforeAll: Unit = {
     path = Utils.createTempDir()
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    jsonRDD(rdd).registerTempTable("jt")
+    read.json(rdd).registerTempTable("jt")
     sql(
       s"""
         |CREATE TEMPORARY TABLE jsonTable (a int, b string)
@@ -109,7 +109,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
 
     // Writing the table to less part files.
     val rdd1 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 5)
-    jsonRDD(rdd1).registerTempTable("jt1")
+    read.json(rdd1).registerTempTable("jt1")
     sql(
       s"""
          |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt1
@@ -121,7 +121,7 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
 
     // Writing the table to more part files.
     val rdd2 = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""), 10)
-    jsonRDD(rdd2).registerTempTable("jt2")
+    read.json(rdd2).registerTempTable("jt2")
     sql(
       s"""
          |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt2
@@ -154,13 +154,13 @@ class InsertSuite extends DataSourceTest with BeforeAndAfterAll {
   }
 
   test("save directly to the path of a JSON table") {
-    table("jt").selectExpr("a * 5 as a", "b").save(path.toString, "json", SaveMode.Overwrite)
+    table("jt").selectExpr("a * 5 as a", "b").write.mode(SaveMode.Overwrite).json(path.toString)
     checkAnswer(
       sql("SELECT a, b FROM jsonTable"),
       (1 to 10).map(i => Row(i * 5, s"str$i"))
     )
 
-    table("jt").save(path.toString, "json", SaveMode.Overwrite)
+    table("jt").write.mode(SaveMode.Overwrite).json(path.toString)
     checkAnswer(
       sql("SELECT a, b FROM jsonTable"),
       (1 to 10).map(i => Row(i, s"str$i"))

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index 6567d1a..7a28e9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -42,7 +42,7 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
     path.delete()
 
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    df = jsonRDD(rdd)
+    df = read.json(rdd)
     df.registerTempTable("jsonTable")
   }
 
@@ -57,41 +57,41 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
 
   def checkLoad(): Unit = {
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
-    checkAnswer(load(path.toString), df.collect())
+    checkAnswer(read.load(path.toString), df.collect())
 
     // Test if we can pick up the data source name passed in load.
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
-    checkAnswer(load(path.toString, "org.apache.spark.sql.json"), df.collect())
-    checkAnswer(load("org.apache.spark.sql.json", Map("path" -> path.toString)), df.collect())
+    checkAnswer(read.format("json").load(path.toString), df.collect())
+    checkAnswer(read.format("json").load(path.toString), df.collect())
     val schema = StructType(StructField("b", StringType, true) :: Nil)
     checkAnswer(
-      load("org.apache.spark.sql.json", schema, Map("path" -> path.toString)),
+      read.format("json").schema(schema).load(path.toString),
       sql("SELECT b FROM jsonTable").collect())
   }
 
   test("save with path and load") {
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
-    df.save(path.toString)
+    df.write.save(path.toString)
     checkLoad()
   }
 
   test("save with path and datasource, and load") {
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
-    df.save(path.toString, "org.apache.spark.sql.json")
+    df.write.json(path.toString)
     checkLoad()
   }
 
   test("save with data source and options, and load") {
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
-    df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, Map("path" -> path.toString))
+    df.write.mode(SaveMode.ErrorIfExists).json(path.toString)
     checkLoad()
   }
 
   test("save and save again") {
-    df.save(path.toString, "org.apache.spark.sql.json")
+    df.write.json(path.toString)
 
     var message = intercept[RuntimeException] {
-      df.save(path.toString, "org.apache.spark.sql.json")
+      df.write.json(path.toString)
     }.getMessage
 
     assert(
@@ -100,14 +100,14 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
 
     if (path.exists()) Utils.deleteRecursively(path)
 
-    df.save(path.toString, "org.apache.spark.sql.json")
+    df.write.json(path.toString)
     checkLoad()
 
-    df.save("org.apache.spark.sql.json", SaveMode.Overwrite, Map("path" -> path.toString))
+    df.write.mode(SaveMode.Overwrite).json(path.toString)
     checkLoad()
 
     message = intercept[RuntimeException] {
-      df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
+      df.write.mode(SaveMode.Append).json(path.toString)
     }.getMessage
 
     assert(

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index d46a127..c6b6510 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -140,7 +140,7 @@ private[hive] trait HiveStrategies {
               PhysicalRDD(plan.output, sparkContext.emptyRDD[Row]) :: Nil
             } else {
               hiveContext
-                .parquetFile(partitionLocations: _*)
+                .read.parquet(partitionLocations: _*)
                 .addPartitioningAttributes(relation.partitionKeys)
                 .lowerCase
                 .where(unresolvedOtherPredicates)
@@ -152,7 +152,7 @@ private[hive] trait HiveStrategies {
 
           } else {
             hiveContext
-              .parquetFile(relation.hiveQlTable.getDataLocation.toString)
+              .read.parquet(relation.hiveQlTable.getDataLocation.toString)
               .lowerCase
               .where(unresolvedOtherPredicates)
               .select(unresolvedProjection: _*)

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
index 7ff5719..5a5ea10 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveParquetSuite.scala
@@ -55,8 +55,8 @@ class HiveParquetSuite extends QueryTest with ParquetTest {
 
     test(s"$prefix: Converting Hive to Parquet Table via saveAsParquetFile") {
       withTempPath { dir =>
-        sql("SELECT * FROM src").saveAsParquetFile(dir.getCanonicalPath)
-        parquetFile(dir.getCanonicalPath).registerTempTable("p")
+        sql("SELECT * FROM src").write.parquet(dir.getCanonicalPath)
+        read.parquet(dir.getCanonicalPath).registerTempTable("p")
         withTempTable("p") {
           checkAnswer(
             sql("SELECT * FROM src ORDER BY key"),
@@ -68,8 +68,8 @@ class HiveParquetSuite extends QueryTest with ParquetTest {
     test(s"$prefix: INSERT OVERWRITE TABLE Parquet table") {
       withParquetTable((1 to 10).map(i => (i, s"val_$i")), "t") {
         withTempPath { file =>
-          sql("SELECT * FROM t LIMIT 1").saveAsParquetFile(file.getCanonicalPath)
-          parquetFile(file.getCanonicalPath).registerTempTable("p")
+          sql("SELECT * FROM t LIMIT 1").write.parquet(file.getCanonicalPath)
+          read.parquet(file.getCanonicalPath).registerTempTable("p")
           withTempTable("p") {
             // let's do three overwrites for good measure
             sql("INSERT OVERWRITE TABLE p SELECT * FROM t")

http://git-wip-us.apache.org/repos/asf/spark/blob/578bfeef/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 1bf1c1b..58b0b80 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -60,7 +60,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
 
     checkAnswer(
       sql("SELECT * FROM jsonTable"),
-      jsonFile(filePath).collect().toSeq)
+      read.json(filePath).collect().toSeq)
   }
 
   test ("persistent JSON table with a user specified schema") {
@@ -77,7 +77,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
         |)
       """.stripMargin)
 
-    jsonFile(filePath).registerTempTable("expectedJsonTable")
+    read.json(filePath).registerTempTable("expectedJsonTable")
 
     checkAnswer(
       sql("SELECT a, b, `c_!@(3)`, `<d>`.`d!`, `<d>`.`=` FROM jsonTable"),
@@ -104,7 +104,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
 
     assert(expectedSchema === table("jsonTable").schema)
 
-    jsonFile(filePath).registerTempTable("expectedJsonTable")
+    read.json(filePath).registerTempTable("expectedJsonTable")
 
     checkAnswer(
       sql("SELECT b, `<d>`.`=` FROM jsonTable"),
@@ -123,7 +123,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
 
     checkAnswer(
       sql("SELECT * FROM jsonTable"),
-      jsonFile(filePath).collect().toSeq)
+      read.json(filePath).collect().toSeq)
   }
 
   test("drop table") {
@@ -138,7 +138,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
 
     checkAnswer(
       sql("SELECT * FROM jsonTable"),
-      jsonFile(filePath).collect().toSeq)
+      read.json(filePath).collect().toSeq)
 
     sql("DROP TABLE jsonTable")
 
@@ -241,7 +241,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
         |)
       """.stripMargin)
 
-    jsonFile(filePath).registerTempTable("expectedJsonTable")
+    read.json(filePath).registerTempTable("expectedJsonTable")
 
     checkAnswer(
       sql("SELECT * FROM jsonTable"),
@@ -474,7 +474,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     // Drop table will also delete the data.
     sql("DROP TABLE savedJsonTable")
     intercept[InvalidInputException] {
-      jsonFile(catalog.hiveDefaultTableFilePath("savedJsonTable"))
+      read.json(catalog.hiveDefaultTableFilePath("savedJsonTable"))
     }
 
     // Create an external table by specifying the path.
@@ -491,7 +491,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     // Data should not be deleted after we drop the table.
     sql("DROP TABLE savedJsonTable")
     checkAnswer(
-      jsonFile(tempPath.toString),
+      read.json(tempPath.toString),
       df.collect())
 
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
@@ -526,7 +526,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     // Data should not be deleted.
     sql("DROP TABLE createdJsonTable")
     checkAnswer(
-      jsonFile(tempPath.toString),
+      read.json(tempPath.toString),
       df.collect())
 
     // Try to specify the schema.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org