Posted to commits@spark.apache.org by rx...@apache.org on 2015/05/17 07:01:57 UTC

[2/2] spark git commit: [SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.

[SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.

Also moved all the deprecated functions into one place for SQLContext and DataFrame, and updated tests to use the new API.
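
For reference, a minimal usage sketch (not part of the patch) of the relocated JDBC entry points, assuming a SQLContext named sqlContext is in scope; the URL, table names, and credentials below are placeholders:

    import java.util.Properties

    import org.apache.spark.sql.SaveMode

    val url = "jdbc:postgresql://localhost/test"  // placeholder JDBC URL
    val props = new Properties()
    props.setProperty("user", "spark")
    props.setProperty("password", "secret")

    // Previously: sqlContext.jdbc(url, "people"), df.createJDBCTable(...), df.insertIntoJDBC(...)
    // Now everything goes through the reader/writer interface:
    val people = sqlContext.read.jdbc(url, "people", props)
    people.write.mode(SaveMode.Append).jdbc(url, "people_copy", props)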

Author: Reynold Xin <rx...@databricks.com>

Closes #6210 from rxin/df-writer-reader-jdbc and squashes the following commits:

7465c2c [Reynold Xin] Fixed unit test.
118e609 [Reynold Xin] Updated tests.
3441b57 [Reynold Xin] Updated javadoc.
13cdd1c [Reynold Xin] [SPARK-7654][SQL] Move JDBC into DataFrame's reader/writer interface.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/517eb37a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/517eb37a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/517eb37a

Branch: refs/heads/master
Commit: 517eb37a85e0a28820bcfd5d98c50d02df6521c6
Parents: 3b6ef2c
Author: Reynold Xin <rx...@databricks.com>
Authored: Sat May 16 22:01:53 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat May 16 22:01:53 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/examples/sql/JavaSparkSQL.java |   4 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  | 284 +++-----
 .../org/apache/spark/sql/DataFrameReader.scala  |  89 ++-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  53 +-
 .../scala/org/apache/spark/sql/SQLContext.scala | 682 ++++++++-----------
 .../org/apache/spark/sql/jdbc/JDBCRDD.scala     |  30 +-
 .../apache/spark/sql/jdbc/JDBCRelation.scala    |  16 +-
 .../org/apache/spark/sql/jdbc/JdbcUtils.scala   |  52 ++
 .../scala/org/apache/spark/sql/jdbc/jdbc.scala  |   6 +-
 .../apache/spark/sql/JavaApplySchemaSuite.java  |   4 +-
 .../spark/sql/sources/JavaSaveLoadSuite.java    |  10 +-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  31 +-
 .../apache/spark/sql/jdbc/JDBCWriteSuite.scala  |  54 +-
 .../sql/hive/JavaMetastoreDataSourcesSuite.java |  20 +-
 .../spark/sql/hive/CachedTableSuite.scala       |   4 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  73 +-
 .../hive/execution/HiveResolutionSuite.scala    |   6 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |   8 +-
 .../apache/spark/sql/hive/parquetSuites.scala   |  14 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |  68 +-
 20 files changed, 747 insertions(+), 761 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
index 173633c..afee279 100644
--- a/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
+++ b/examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java
@@ -94,7 +94,7 @@ public class JavaSparkSQL {
 
     System.out.println("=== Data source: Parquet File ===");
     // DataFrames can be saved as parquet files, maintaining the schema information.
-    schemaPeople.saveAsParquetFile("people.parquet");
+    schemaPeople.write().parquet("people.parquet");
 
     // Read in the parquet file created above.
     // Parquet files are self-describing so the schema is preserved.
@@ -151,7 +151,7 @@ public class JavaSparkSQL {
     List<String> jsonData = Arrays.asList(
           "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
     JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
-    DataFrame peopleFromJsonRDD = sqlContext.jsonRDD(anotherPeopleRDD.rdd());
+    DataFrame peopleFromJsonRDD = sqlContext.read().json(anotherPeopleRDD.rdd());
 
     // Take a look at the schema of this new DataFrame.
     peopleFromJsonRDD.printSchema();

http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 55ef357..27e9af4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql
 
 import java.io.CharArrayWriter
-import java.sql.DriverManager
 import java.util.Properties
 
 import scala.collection.JavaConversions._
@@ -40,9 +39,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{Filter, _}
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.{expressions, CatalystTypeConverters, ScalaReflection, SqlParser}
 import org.apache.spark.sql.execution.{EvaluatePython, ExplainCommand, LogicalRDD}
-import org.apache.spark.sql.jdbc.JDBCWriteDetails
 import org.apache.spark.sql.json.JacksonGenerator
-import org.apache.spark.sql.sources.{CreateTableUsingAsSelect, ResolvedDataSource}
+import org.apache.spark.sql.sources.CreateTableUsingAsSelect
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.Utils
@@ -227,10 +225,6 @@ class DataFrame private[sql](
     }
   }
 
-  /** Left here for backward compatibility. */
-  @deprecated("1.3.0", "use toDF")
-  def toSchemaRDD: DataFrame = this
-
   /**
    * Returns the object itself.
    * @group basic
@@ -1300,11 +1294,118 @@ class DataFrame private[sql](
   def write: DataFrameWriter = new DataFrameWriter(this)
 
   /**
+   * :: Experimental ::
+   * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
+   * @group output
+   * @since 1.3.0
+   */
+  @Experimental
+  def insertInto(tableName: String, overwrite: Boolean): Unit = {
+    sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
+      Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd
+  }
+
+  /**
+   * :: Experimental ::
+   * Adds the rows from this RDD to the specified table.
+   * Throws an exception if the table already exists.
+   * @group output
+   * @since 1.3.0
+   */
+  @Experimental
+  def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
+
+  /**
+   * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
+   * @group rdd
+   * @since 1.3.0
+   */
+  def toJSON: RDD[String] = {
+    val rowSchema = this.schema
+    this.mapPartitions { iter =>
+      val writer = new CharArrayWriter()
+      // create the Generator without separator inserted between 2 records
+      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
+
+      new Iterator[String] {
+        override def hasNext: Boolean = iter.hasNext
+        override def next(): String = {
+          JacksonGenerator(rowSchema, gen)(iter.next())
+          gen.flush()
+
+          val json = writer.toString
+          if (hasNext) {
+            writer.reset()
+          } else {
+            gen.close()
+          }
+
+          json
+        }
+      }
+    }
+  }
+
+  ////////////////////////////////////////////////////////////////////////////
+  // for Python API
+  ////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Converts a JavaRDD to a PythonRDD.
+   */
+  protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
+    val fieldTypes = schema.fields.map(_.dataType)
+    val jrdd = rdd.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
+    SerDeUtil.javaToPython(jrdd)
+  }
+
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+  // Deprecated methods
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+
+  /** Left here for backward compatibility. */
+  @deprecated("use toDF", "1.3.0")
+  def toSchemaRDD: DataFrame = this
+
+  /**
+   * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
+   * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
+   * If you pass `true` for `allowExisting`, it will drop any table with the
+   * given name; if you pass `false`, it will throw if the table already
+   * exists.
+   * @group output
+   */
+  @deprecated("Use write.jdbc()", "1.4.0")
+  def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = {
+    val w = if (allowExisting) write.mode(SaveMode.Overwrite) else write
+    w.jdbc(url, table, new Properties)
+  }
+
+  /**
+   * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
+   * Assumes the table already exists and has a compatible schema.  If you
+   * pass `true` for `overwrite`, it will `TRUNCATE` the table before
+   * performing the `INSERT`s.
+   *
+   * The table must already exist on the database.  It must have a schema
+   * that is compatible with the schema of this RDD; inserting the rows of
+   * the RDD in order via the simple statement
+   * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
+   * @group output
+   */
+  @deprecated("Use write.jdbc()", "1.4.0")
+  def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
+    val w = if (overwrite) write.mode(SaveMode.Overwrite) else write
+    w.jdbc(url, table, new Properties)
+  }
+
+  /**
    * Saves the contents of this [[DataFrame]] as a parquet file, preserving the schema.
    * Files that are written out using this method can be read back in as a [[DataFrame]]
    * using the `parquetFile` function in [[SQLContext]].
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.parquet(path)", "1.4.0")
   def saveAsParquetFile(path: String): Unit = {
@@ -1328,7 +1429,6 @@ class DataFrame private[sql](
    * Also note that while this function can persist the table metadata into Hive's metastore,
    * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.saveAsTable(tableName)", "1.4.0")
   def saveAsTable(tableName: String): Unit = {
@@ -1347,7 +1447,6 @@ class DataFrame private[sql](
    * Also note that while this function can persist the table metadata into Hive's metastore,
    * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0")
   def saveAsTable(tableName: String, mode: SaveMode): Unit = {
@@ -1373,7 +1472,6 @@ class DataFrame private[sql](
    * Also note that while this function can persist the table metadata into Hive's metastore,
    * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.format(source).saveAsTable(tableName)", "1.4.0")
   def saveAsTable(tableName: String, source: String): Unit = {
@@ -1393,7 +1491,6 @@ class DataFrame private[sql](
    * Also note that while this function can persist the table metadata into Hive's metastore,
    * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.format(source).mode(mode).saveAsTable(tableName)", "1.4.0")
   def saveAsTable(tableName: String, source: String, mode: SaveMode): Unit = {
@@ -1412,7 +1509,6 @@ class DataFrame private[sql](
    * Also note that while this function can persist the table metadata into Hive's metastore,
    * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
     "1.4.0")
@@ -1437,7 +1533,6 @@ class DataFrame private[sql](
    * Also note that while this function can persist the table metadata into Hive's metastore,
    * the table will NOT be accessible from Hive, until SPARK-7550 is resolved.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.format(source).mode(mode).options(options).saveAsTable(tableName)",
     "1.4.0")
@@ -1454,7 +1549,6 @@ class DataFrame private[sql](
    * using the default data source configured by spark.sql.sources.default and
    * [[SaveMode.ErrorIfExists]] as the save mode.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.save(path)", "1.4.0")
   def save(path: String): Unit = {
@@ -1465,7 +1559,6 @@ class DataFrame private[sql](
    * Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
    * using the default data source configured by spark.sql.sources.default.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.mode(mode).save(path)", "1.4.0")
   def save(path: String, mode: SaveMode): Unit = {
@@ -1476,7 +1569,6 @@ class DataFrame private[sql](
    * Saves the contents of this DataFrame to the given path based on the given data source,
    * using [[SaveMode.ErrorIfExists]] as the save mode.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.format(source).save(path)", "1.4.0")
   def save(path: String, source: String): Unit = {
@@ -1487,7 +1579,6 @@ class DataFrame private[sql](
    * Saves the contents of this DataFrame to the given path based on the given data source and
    * [[SaveMode]] specified by mode.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.format(source).mode(mode).save(path)", "1.4.0")
   def save(path: String, source: String, mode: SaveMode): Unit = {
@@ -1498,7 +1589,6 @@ class DataFrame private[sql](
    * Saves the contents of this DataFrame based on the given data source,
    * [[SaveMode]] specified by mode, and a set of options.
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
   def save(
@@ -1513,7 +1603,6 @@ class DataFrame private[sql](
    * Saves the contents of this DataFrame based on the given data source,
    * [[SaveMode]] specified by mode, and a set of options
    * @group output
-   * @since 1.3.0
    */
   @deprecated("Use write.format(source).mode(mode).options(options).save()", "1.4.0")
   def save(
@@ -1523,163 +1612,10 @@ class DataFrame private[sql](
     write.format(source).mode(mode).options(options).save()
   }
 
-  /**
-   * :: Experimental ::
-   * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
-   * @group output
-   * @since 1.3.0
-   */
-  @Experimental
-  def insertInto(tableName: String, overwrite: Boolean): Unit = {
-    sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
-      Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd
-  }
-
-  /**
-   * :: Experimental ::
-   * Adds the rows from this RDD to the specified table.
-   * Throws an exception if the table already exists.
-   * @group output
-   * @since 1.3.0
-   */
-  @Experimental
-  def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
-
-  /**
-   * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
-   * @group rdd
-   * @since 1.3.0
-   */
-  def toJSON: RDD[String] = {
-    val rowSchema = this.schema
-    this.mapPartitions { iter =>
-      val writer = new CharArrayWriter()
-      // create the Generator without separator inserted between 2 records
-      val gen = new JsonFactory().createGenerator(writer).setRootValueSeparator(null)
-
-      new Iterator[String] {
-        override def hasNext: Boolean = iter.hasNext
-        override def next(): String = {
-          JacksonGenerator(rowSchema, gen)(iter.next())
-          gen.flush()
-
-          val json = writer.toString
-          if (hasNext) {
-            writer.reset()
-          } else {
-            gen.close()
-          }
-
-          json
-        }
-      }
-    }
-  }
-
   ////////////////////////////////////////////////////////////////////////////
-  // JDBC Write Support
   ////////////////////////////////////////////////////////////////////////////
-
-  /**
-   * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
-   * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
-   * If you pass `true` for `allowExisting`, it will drop any table with the
-   * given name; if you pass `false`, it will throw if the table already
-   * exists.
-   * @group output
-   * @since 1.3.0
-   */
-  def createJDBCTable(url: String, table: String, allowExisting: Boolean): Unit = {
-    createJDBCTable(url, table, allowExisting, new Properties())
-  }
-    
-  /**
-   * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`
-   * using connection properties defined in `properties`.
-   * This will run a `CREATE TABLE` and a bunch of `INSERT INTO` statements.
-   * If you pass `true` for `allowExisting`, it will drop any table with the
-   * given name; if you pass `false`, it will throw if the table already
-   * exists.
-   * @group output
-   * @since 1.4.0
-   */
-  def createJDBCTable(
-      url: String,
-      table: String,
-      allowExisting: Boolean,
-      properties: Properties): Unit = {
-    val conn = DriverManager.getConnection(url, properties)
-    try {
-      if (allowExisting) {
-        val sql = s"DROP TABLE IF EXISTS $table"
-        conn.prepareStatement(sql).executeUpdate()
-      }
-      val schema = JDBCWriteDetails.schemaString(this, url)
-      val sql = s"CREATE TABLE $table ($schema)"
-      conn.prepareStatement(sql).executeUpdate()
-    } finally {
-      conn.close()
-    }
-    JDBCWriteDetails.saveTable(this, url, table, properties)
-  }
-
-  /**
-   * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`.
-   * Assumes the table already exists and has a compatible schema.  If you
-   * pass `true` for `overwrite`, it will `TRUNCATE` the table before
-   * performing the `INSERT`s.
-   *
-   * The table must already exist on the database.  It must have a schema
-   * that is compatible with the schema of this RDD; inserting the rows of
-   * the RDD in order via the simple statement
-   * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
-   * @group output
-   * @since 1.3.0
-   */
-  def insertIntoJDBC(url: String, table: String, overwrite: Boolean): Unit = {
-    insertIntoJDBC(url, table, overwrite, new Properties())
-  }
-
-  /**
-   * Save this [[DataFrame]] to a JDBC database at `url` under the table name `table`
-   * using connection properties defined in `properties`.
-   * Assumes the table already exists and has a compatible schema.  If you
-   * pass `true` for `overwrite`, it will `TRUNCATE` the table before
-   * performing the `INSERT`s.
-   *
-   * The table must already exist on the database.  It must have a schema
-   * that is compatible with the schema of this RDD; inserting the rows of
-   * the RDD in order via the simple statement
-   * `INSERT INTO table VALUES (?, ?, ..., ?)` should not fail.
-   * @group output
-   * @since 1.4.0
-   */
-  def insertIntoJDBC(
-      url: String,
-      table: String,
-      overwrite: Boolean,
-      properties: Properties): Unit = {
-    if (overwrite) {
-      val conn = DriverManager.getConnection(url, properties)
-      try {
-        val sql = s"TRUNCATE TABLE $table"
-        conn.prepareStatement(sql).executeUpdate()
-      } finally {
-        conn.close()
-      }
-    }
-    JDBCWriteDetails.saveTable(this, url, table, properties)
-  }
+  // End of deprecated methods
   ////////////////////////////////////////////////////////////////////////////
-  // for Python API
   ////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * Converts a JavaRDD to a PythonRDD.
-   */
-  protected[sql] def javaToPython: JavaRDD[Array[Byte]] = {
-    val fieldTypes = schema.fields.map(_.dataType)
-    val jrdd = rdd.map(EvaluatePython.rowToArray(_, fieldTypes)).toJavaRDD()
-    SerDeUtil.javaToPython(jrdd)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 4d63faa..381c10f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -17,12 +17,16 @@
 
 package org.apache.spark.sql
 
+import java.util.Properties
+
 import org.apache.hadoop.fs.Path
+import org.apache.spark.Partition
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
@@ -31,7 +35,7 @@ import org.apache.spark.sql.types.StructType
 /**
  * :: Experimental ::
  * Interface used to load a [[DataFrame]] from external storage systems (e.g. file systems,
- * key-value stores, etc).
+ * key-value stores, etc). Use [[SQLContext.read]] to access this.
  *
  * @since 1.4.0
  */
@@ -94,6 +98,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
    * Specifies the input partitioning. If specified, the underlying data source does not need to
    * discover the data partitioning scheme, and thus can speed up very large inputs.
    *
+   * This is only applicable for Parquet at the moment.
+   *
    * @since 1.4.0
    */
   @scala.annotation.varargs
@@ -129,6 +135,87 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
   }
 
   /**
+   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+   * url named table and connection properties.
+   *
+   * @since 1.4.0
+   */
+  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
+    jdbc(url, table, JDBCRelation.columnPartition(null), properties)
+  }
+
+  /**
+   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+   * url named table. Partitions of the table will be retrieved in parallel based on the parameters
+   * passed to this function.
+   *
+   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+   * your external database systems.
+   *
+   * @param url JDBC database url of the form `jdbc:subprotocol:subname`
+   * @param table Name of the table in the external database.
+   * @param columnName the name of a column of integral type that will be used for partitioning.
+   * @param lowerBound the minimum value of `columnName` used to decide partition stride
+   * @param upperBound the maximum value of `columnName` used to decide partition stride
+   * @param numPartitions the number of partitions.  the range `minValue`-`maxValue` will be split
+   *                      evenly into this many partitions
+   * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
+   *                             tag/value. Normally at least a "user" and "password" property
+   *                             should be included.
+   *
+   * @since 1.4.0
+   */
+  def jdbc(
+      url: String,
+      table: String,
+      columnName: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int,
+      connectionProperties: Properties): DataFrame = {
+    val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
+    val parts = JDBCRelation.columnPartition(partitioning)
+    jdbc(url, table, parts, connectionProperties)
+  }
+
+  /**
+   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+   * url named table using connection properties. The `predicates` parameter gives a list of
+   * expressions suitable for inclusion in WHERE clauses; each one defines one partition
+   * of the [[DataFrame]].
+   *
+   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+   * your external database systems.
+   *
+   * @param url JDBC database url of the form `jdbc:subprotocol:subname`
+   * @param table Name of the table in the external database.
+   * @param predicates Condition in the where clause for each partition.
+   * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
+   *                             tag/value. Normally at least a "user" and "password" property
+   *                             should be included.
+   * @since 1.4.0
+   */
+  def jdbc(
+      url: String,
+      table: String,
+      predicates: Array[String],
+      connectionProperties: Properties): DataFrame = {
+    val parts: Array[Partition] = predicates.zipWithIndex.map { case (part, i) =>
+      JDBCPartition(part, i) : Partition
+    }
+    jdbc(url, table, parts, connectionProperties)
+  }
+
+  private def jdbc(
+      url: String,
+      table: String,
+      parts: Array[Partition],
+      connectionProperties: Properties): DataFrame = {
+    val relation = JDBCRelation(url, table, parts, connectionProperties)(sqlContext)
+    sqlContext.baseRelationToDataFrame(relation)
+  }
+
+  /**
    * Loads a JSON file (one object per line) and returns the result as a [[DataFrame]].
    *
    * This function goes through the input once to determine the input schema. If you know the

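The partitioned read variants added above can be used as in the following sketch; the table name, column, bounds, and predicates are invented for illustration, and sqlContext is assumed to be in scope:

    import java.util.Properties

    val connProps = new Properties()
    connProps.setProperty("user", "spark")
    connProps.setProperty("password", "secret")

    // Stride partitioning over an integral column: 10 partitions covering ids 0 to 1,000,000.
    val byId = sqlContext.read.jdbc(
      "jdbc:postgresql://localhost/test", "events",
      columnName = "id", lowerBound = 0L, upperBound = 1000000L, numPartitions = 10,
      connectionProperties = connProps)

    // Explicit partitioning: one WHERE-clause predicate per partition.
    val byRegion = sqlContext.read.jdbc(
      "jdbc:postgresql://localhost/test", "events",
      Array("region = 'us-east'", "region = 'eu-west'"), connProps)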
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9f42f0f..f2e721d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -17,14 +17,17 @@
 
 package org.apache.spark.sql
 
+import java.util.Properties
+
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 
 
 /**
  * :: Experimental ::
  * Interface used to write a [[DataFrame]] to external storage systems (e.g. file systems,
- * key-value stores, etc).
+ * key-value stores, etc). Use [[DataFrame.write]] to access this.
  *
  * @since 1.4.0
  */
@@ -110,6 +113,8 @@ final class DataFrameWriter private[sql](df: DataFrame) {
    * Partitions the output by the given columns on the file system. If specified, the output is
    * laid out on the file system similar to Hive's partitioning scheme.
    *
+   * This is only applicable for Parquet at the moment.
+   *
    * @since 1.4.0
    */
   @scala.annotation.varargs
@@ -162,6 +167,52 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
+   * Saves the content of the [[DataFrame]] to an external database table via JDBC. In the case the
+   * table already exists in the external database, behavior of this function depends on the
+   * save mode, specified by the `mode` function (defaulting to throwing an exception).
+   *
+   * Don't create too many partitions in parallel on a large cluster; otherwise Spark might crash
+   * your external database systems.
+   *
+   * @param url JDBC database url of the form `jdbc:subprotocol:subname`
+   * @param table Name of the table in the external database.
+   * @param connectionProperties JDBC database connection arguments, a list of arbitrary string
+   *                             tag/value. Normally at least a "user" and "password" property
+   *                             should be included.
+   */
+  def jdbc(url: String, table: String, connectionProperties: Properties): Unit = {
+    val conn = JdbcUtils.createConnection(url, connectionProperties)
+
+    try {
+      var tableExists = JdbcUtils.tableExists(conn, table)
+
+      if (mode == SaveMode.Ignore && tableExists) {
+        return
+      }
+
+      if (mode == SaveMode.ErrorIfExists && tableExists) {
+        sys.error(s"Table $table already exists.")
+      }
+
+      if (mode == SaveMode.Overwrite && tableExists) {
+        JdbcUtils.dropTable(conn, table)
+        tableExists = false
+      }
+
+      // Create the table if the table didn't exist.
+      if (!tableExists) {
+        val schema = JDBCWriteDetails.schemaString(df, url)
+        val sql = s"CREATE TABLE $table ($schema)"
+        conn.prepareStatement(sql).executeUpdate()
+      }
+    } finally {
+      conn.close()
+    }
+
+    JDBCWriteDetails.saveTable(df, url, table, connectionProperties)
+  }
+
+  /**
    * Saves the content of the [[DataFrame]] in JSON format at the specified path.
    * This is equivalent to:
    * {{{

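A brief sketch of how the writer's save mode interacts with the new jdbc method, mirroring the logic above (df, url, connProps, and the SaveMode import are assumed from the earlier sketches; the table name is a placeholder):

    // ErrorIfExists (the default) fails if the table exists; Ignore is a no-op;
    // Overwrite drops and recreates the table; Append inserts into the existing table.
    df.write.mode(SaveMode.Overwrite).jdbc(url, "events_copy", connProps)
    df.write.mode(SaveMode.Append).jdbc(url, "events_copy", connProps)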
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 34a50e5..ac1a800 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -28,6 +28,7 @@ import scala.util.control.NonFatal
 
 import com.google.common.reflect.TypeToken
 
+import org.apache.spark.SparkContext
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.rdd.RDD
@@ -40,11 +41,9 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.ParserDialect
 import org.apache.spark.sql.execution.{Filter, _}
-import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
-import org.apache.spark.{Partition, SparkContext}
 
 /**
  * The entry point for working with structured data (rows and columns) in Spark.  Allows the
@@ -532,67 +531,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   /**
-   * :: DeveloperApi ::
-   * Creates a [[DataFrame]] from an [[RDD]] containing [[Row]]s by applying a schema to this RDD.
-   * It is important to make sure that the structure of every [[Row]] of the provided RDD matches
-   * the provided schema. Otherwise, there will be runtime exception.
-   * Example:
-   * {{{
-   *  import org.apache.spark.sql._
-   *  import org.apache.spark.sql.types._
-   *  val sqlContext = new org.apache.spark.sql.SQLContext(sc)
-   *
-   *  val schema =
-   *    StructType(
-   *      StructField("name", StringType, false) ::
-   *      StructField("age", IntegerType, true) :: Nil)
-   *
-   *  val people =
-   *    sc.textFile("examples/src/main/resources/people.txt").map(
-   *      _.split(",")).map(p => Row(p(0), p(1).trim.toInt))
-   *  val dataFrame = sqlContext. applySchema(people, schema)
-   *  dataFrame.printSchema
-   *  // root
-   *  // |-- name: string (nullable = false)
-   *  // |-- age: integer (nullable = true)
-   *
-   *  dataFrame.registerTempTable("people")
-   *  sqlContext.sql("select name from people").collect.foreach(println)
-   * }}}
-   */
-  @deprecated("use createDataFrame", "1.3.0")
-  def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
-    createDataFrame(rowRDD, schema)
-  }
-
-  @deprecated("use createDataFrame", "1.3.0")
-  def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
-    createDataFrame(rowRDD, schema)
-  }
-
-  /**
-   * Applies a schema to an RDD of Java Beans.
-   *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
-   *          SELECT * queries will return the columns in an undefined order.
-   */
-  @deprecated("use createDataFrame", "1.3.0")
-  def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
-    createDataFrame(rdd, beanClass)
-  }
-
-  /**
-   * Applies a schema to an RDD of Java Beans.
-   *
-   * WARNING: Since there is no guaranteed ordering for fields in a Java Bean,
-   *          SELECT * queries will return the columns in an undefined order.
-   */
-  @deprecated("use createDataFrame", "1.3.0")
-  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
-    createDataFrame(rdd, beanClass)
-  }
-
-  /**
    * :: Experimental ::
    * Returns a [[DataFrameReader]] that can be used to read data in as a [[DataFrame]].
    * {{{
@@ -607,205 +545,6 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def read: DataFrameReader = new DataFrameReader(this)
 
   /**
-   * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
-   * [[DataFrame]] if no paths are passed in.
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.parquet()", "1.4.0")
-  @scala.annotation.varargs
-  def parquetFile(paths: String*): DataFrame = {
-    if (paths.isEmpty) {
-      emptyDataFrame
-    } else if (conf.parquetUseDataSourceApi) {
-      read.parquet(paths : _*)
-    } else {
-      DataFrame(this, parquet.ParquetRelation(
-        paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
-    }
-  }
-
-  /**
-   * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
-   * It goes through the entire dataset once to determine the schema.
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.json()", "1.4.0")
-  def jsonFile(path: String): DataFrame = {
-    read.json(path)
-  }
-
-  /**
-   * Loads a JSON file (one object per line) and applies the given schema,
-   * returning the result as a [[DataFrame]].
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.json()", "1.4.0")
-  def jsonFile(path: String, schema: StructType): DataFrame = {
-    read.schema(schema).json(path)
-  }
-
-  /**
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.json()", "1.4.0")
-  def jsonFile(path: String, samplingRatio: Double): DataFrame = {
-    read.option("samplingRatio", samplingRatio.toString).json(path)
-  }
-
-  /**
-   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
-   * [[DataFrame]].
-   * It goes through the entire dataset once to determine the schema.
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.json()", "1.4.0")
-  def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
-
-  /**
-   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
-   * [[DataFrame]].
-   * It goes through the entire dataset once to determine the schema.
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.json()", "1.4.0")
-  def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
-
-  /**
-   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
-   * returning the result as a [[DataFrame]].
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.json()", "1.4.0")
-  def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
-    read.schema(schema).json(json)
-  }
-
-  /**
-   * Loads an JavaRDD<String> storing JSON objects (one object per record) and applies the given
-   * schema, returning the result as a [[DataFrame]].
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.json()", "1.4.0")
-  def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
-    read.schema(schema).json(json)
-  }
-
-  /**
-   * Loads an RDD[String] storing JSON objects (one object per record) inferring the
-   * schema, returning the result as a [[DataFrame]].
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.json()", "1.4.0")
-  def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
-    read.option("samplingRatio", samplingRatio.toString).json(json)
-  }
-
-  /**
-   * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
-   * schema, returning the result as a [[DataFrame]].
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.json()", "1.4.0")
-  def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
-    read.option("samplingRatio", samplingRatio.toString).json(json)
-  }
-
-  /**
-   * Returns the dataset stored at path as a DataFrame,
-   * using the default data source configured by spark.sql.sources.default.
-   *
-   * @group genericdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.load(path)", "1.4.0")
-  def load(path: String): DataFrame = {
-    read.load(path)
-  }
-
-  /**
-   * Returns the dataset stored at path as a DataFrame, using the given data source.
-   *
-   * @group genericdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.format(source).load(path)", "1.4.0")
-  def load(path: String, source: String): DataFrame = {
-    read.format(source).load(path)
-  }
-
-  /**
-   * (Java-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame.
-   *
-   * @group genericdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.format(source).options(options).load()", "1.4.0")
-  def load(source: String, options: java.util.Map[String, String]): DataFrame = {
-    read.options(options).format(source).load()
-  }
-
-  /**
-   * (Scala-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame.
-   *
-   * @group genericdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.format(source).options(options).load()", "1.4.0")
-  def load(source: String, options: Map[String, String]): DataFrame = {
-    read.options(options).format(source).load()
-  }
-
-  /**
-   * (Java-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
-   *
-   * @group genericdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
-  def load(
-      source: String,
-      schema: StructType,
-      options: java.util.Map[String, String]): DataFrame = {
-    read.format(source).schema(schema).options(options).load()
-  }
-
-  /**
-   * (Scala-specific) Returns the dataset specified by the given data source and
-   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
-   * @group genericdata
-   * @since 1.3.0
-   */
-  @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
-  def load(
-      source: String,
-      schema: StructType,
-      options: Map[String, String]): DataFrame = {
-    read.format(source).schema(schema).options(options).load()
-  }
-
-  /**
    * :: Experimental ::
    * Creates an external table from the given path and returns the corresponding DataFrame.
    * It will use the default data source configured by spark.sql.sources.default.
@@ -903,150 +642,24 @@ class SQLContext(@transient val sparkContext: SparkContext)
    *
    * @group ddl_ops
    * @since 1.3.0
-   */
-  @Experimental
-  def createExternalTable(
-      tableName: String,
-      source: String,
-      schema: StructType,
-      options: Map[String, String]): DataFrame = {
-    val cmd =
-      CreateTableUsing(
-        tableName,
-        userSpecifiedSchema = Some(schema),
-        source,
-        temporary = false,
-        options,
-        allowExisting = false,
-        managedIfNoPath = false)
-    executePlan(cmd).toRdd
-    table(tableName)
-  }
-
-  /**
-   * :: Experimental ::
-   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
-   * url named table.
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @Experimental
-  def jdbc(url: String, table: String): DataFrame = {
-    jdbc(url, table, JDBCRelation.columnPartition(null), new Properties())
-  }
-
-  /**
-   * :: Experimental ::
-   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
-   * url named table and connection properties.
-   *
-   * @group specificdata
-   * @since 1.4.0
-   */
-  @Experimental
-  def jdbc(url: String, table: String, properties: Properties): DataFrame = {
-    jdbc(url, table, JDBCRelation.columnPartition(null), properties)
-  }
-
-  /**
-   * :: Experimental ::
-   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
-   * url named table.  Partitions of the table will be retrieved in parallel based on the parameters
-   * passed to this function.
-   *
-   * @param columnName the name of a column of integral type that will be used for partitioning.
-   * @param lowerBound the minimum value of `columnName` used to decide partition stride
-   * @param upperBound the maximum value of `columnName` used to decide partition stride
-   * @param numPartitions the number of partitions.  the range `minValue`-`maxValue` will be split
-   *                      evenly into this many partitions
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @Experimental
-  def jdbc(
-      url: String,
-      table: String,
-      columnName: String,
-      lowerBound: Long,
-      upperBound: Long,
-      numPartitions: Int): DataFrame = {
-    jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties())
-  }
-
-  /**
-   * :: Experimental ::
-   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
-   * url named table.  Partitions of the table will be retrieved in parallel based on the parameters
-   * passed to this function.
-   *
-   * @param columnName the name of a column of integral type that will be used for partitioning.
-   * @param lowerBound the minimum value of `columnName` used to decide partition stride
-   * @param upperBound the maximum value of `columnName` used to decide partition stride
-   * @param numPartitions the number of partitions.  the range `minValue`-`maxValue` will be split
-   *                      evenly into this many partitions
-   * @param properties connection properties
-   * @group specificdata
-   * @since 1.4.0
-   */
-  @Experimental
-  def jdbc(
-      url: String,
-      table: String,
-      columnName: String,
-      lowerBound: Long,
-      upperBound: Long,
-      numPartitions: Int,
-      properties: Properties): DataFrame = {
-    val partitioning = JDBCPartitioningInfo(columnName, lowerBound, upperBound, numPartitions)
-    val parts = JDBCRelation.columnPartition(partitioning)
-    jdbc(url, table, parts, properties)
-  }
-
-  /**
-   * :: Experimental ::
-   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
-   * url named table. The theParts parameter gives a list expressions
-   * suitable for inclusion in WHERE clauses; each one defines one partition
-   * of the [[DataFrame]].
-   *
-   * @group specificdata
-   * @since 1.3.0
-   */
-  @Experimental
-  def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
-    jdbc(url, table, theParts, new Properties())
-  }
-
-  /**
-   * :: Experimental ::
-   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
-   * url named table using connection properties. The theParts parameter gives a list expressions
-   * suitable for inclusion in WHERE clauses; each one defines one partition
-   * of the [[DataFrame]].
-   *
-   * @group specificdata
-   * @since 1.4.0
-   */
-  @Experimental
-  def jdbc(
-      url: String,
-      table: String,
-      theParts: Array[String],
-      properties: Properties): DataFrame = {
-    val parts: Array[Partition] = theParts.zipWithIndex.map { case (part, i) =>
-      JDBCPartition(part, i) : Partition
-    }
-    jdbc(url, table, parts, properties)
-  }
-
-  private def jdbc(
-      url: String,
-      table: String,
-      parts: Array[Partition],
-      properties: Properties): DataFrame = {
-    val relation = JDBCRelation(url, table, parts, properties)(this)
-    baseRelationToDataFrame(relation)
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame = {
+    val cmd =
+      CreateTableUsing(
+        tableName,
+        userSpecifiedSchema = Some(schema),
+        source,
+        temporary = false,
+        options,
+        allowExisting = false,
+        managedIfNoPath = false)
+    executePlan(cmd).toRdd
+    table(tableName)
   }
 
   /**
@@ -1372,6 +985,263 @@ class SQLContext(@transient val sparkContext: SparkContext)
     }
   }
 
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+  // Deprecated methods
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+
+  @deprecated("use createDataFrame", "1.3.0")
+  def applySchema(rowRDD: RDD[Row], schema: StructType): DataFrame = {
+    createDataFrame(rowRDD, schema)
+  }
+
+  @deprecated("use createDataFrame", "1.3.0")
+  def applySchema(rowRDD: JavaRDD[Row], schema: StructType): DataFrame = {
+    createDataFrame(rowRDD, schema)
+  }
+
+  @deprecated("use createDataFrame", "1.3.0")
+  def applySchema(rdd: RDD[_], beanClass: Class[_]): DataFrame = {
+    createDataFrame(rdd, beanClass)
+  }
+
+  @deprecated("use createDataFrame", "1.3.0")
+  def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): DataFrame = {
+    createDataFrame(rdd, beanClass)
+  }
+
+  /**
+   * Loads a Parquet file, returning the result as a [[DataFrame]]. This function returns an empty
+   * [[DataFrame]] if no paths are passed in.
+   *
+   * @group specificdata
+   */
+  @deprecated("Use read.parquet()", "1.4.0")
+  @scala.annotation.varargs
+  def parquetFile(paths: String*): DataFrame = {
+    if (paths.isEmpty) {
+      emptyDataFrame
+    } else if (conf.parquetUseDataSourceApi) {
+      read.parquet(paths : _*)
+    } else {
+      DataFrame(this, parquet.ParquetRelation(
+        paths.mkString(","), Some(sparkContext.hadoopConfiguration), this))
+    }
+  }
+
+  /**
+   * Loads a JSON file (one object per line), returning the result as a [[DataFrame]].
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   */
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonFile(path: String): DataFrame = {
+    read.json(path)
+  }
+
+  /**
+   * Loads a JSON file (one object per line) and applies the given schema,
+   * returning the result as a [[DataFrame]].
+   *
+   * @group specificdata
+   */
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonFile(path: String, schema: StructType): DataFrame = {
+    read.schema(schema).json(path)
+  }
+
+  /**
+   * @group specificdata
+   */
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonFile(path: String, samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(path)
+  }
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+   * [[DataFrame]].
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   */
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonRDD(json: RDD[String]): DataFrame = read.json(json)
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record), returning the result as a
+   * [[DataFrame]].
+   * It goes through the entire dataset once to determine the schema.
+   *
+   * @group specificdata
+   */
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonRDD(json: JavaRDD[String]): DataFrame = read.json(json)
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record) and applies the given schema,
+   * returning the result as a [[DataFrame]].
+   *
+   * @group specificdata
+   */
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonRDD(json: RDD[String], schema: StructType): DataFrame = {
+    read.schema(schema).json(json)
+  }
+
+  /**
+   * Loads a JavaRDD<String> storing JSON objects (one object per record) and applies the given
+   * schema, returning the result as a [[DataFrame]].
+   *
+   * @group specificdata
+   */
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonRDD(json: JavaRDD[String], schema: StructType): DataFrame = {
+    read.schema(schema).json(json)
+  }
+
+  /**
+   * Loads an RDD[String] storing JSON objects (one object per record) inferring the
+   * schema, returning the result as a [[DataFrame]].
+   *
+   * @group specificdata
+   */
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonRDD(json: RDD[String], samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(json)
+  }
+
+  /**
+   * Loads a JavaRDD[String] storing JSON objects (one object per record) inferring the
+   * schema, returning the result as a [[DataFrame]].
+   *
+   * @group specificdata
+   */
+  @deprecated("Use read.json()", "1.4.0")
+  def jsonRDD(json: JavaRDD[String], samplingRatio: Double): DataFrame = {
+    read.option("samplingRatio", samplingRatio.toString).json(json)
+  }
+
+  /**
+   * Returns the dataset stored at path as a DataFrame,
+   * using the default data source configured by spark.sql.sources.default.
+   *
+   * @group genericdata
+   */
+  @deprecated("Use read.load(path)", "1.4.0")
+  def load(path: String): DataFrame = {
+    read.load(path)
+  }
+
+  /**
+   * Returns the dataset stored at path as a DataFrame, using the given data source.
+   *
+   * @group genericdata
+   */
+  @deprecated("Use read.format(source).load(path)", "1.4.0")
+  def load(path: String, source: String): DataFrame = {
+    read.format(source).load(path)
+  }
+
+  /**
+   * (Java-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame.
+   *
+   * @group genericdata
+   */
+  @deprecated("Use read.format(source).options(options).load()", "1.4.0")
+  def load(source: String, options: java.util.Map[String, String]): DataFrame = {
+    read.options(options).format(source).load()
+  }
+
+  /**
+   * (Scala-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame.
+   *
+   * @group genericdata
+   */
+  @deprecated("Use read.format(source).options(options).load()", "1.4.0")
+  def load(source: String, options: Map[String, String]): DataFrame = {
+    read.options(options).format(source).load()
+  }
+
+  /**
+   * (Java-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+   *
+   * @group genericdata
+   */
+  @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
+  def load(source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame =
+  {
+    read.format(source).schema(schema).options(options).load()
+  }
+
+  /**
+   * (Scala-specific) Returns the dataset specified by the given data source and
+   * a set of options as a DataFrame, using the given schema as the schema of the DataFrame.
+   *
+   * @group genericdata
+   */
+  @deprecated("Use read.format(source).schema(schema).options(options).load()", "1.4.0")
+  def load(source: String, schema: StructType, options: Map[String, String]): DataFrame = {
+    read.format(source).schema(schema).options(options).load()
+  }
+
+  /**
+   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+   * url named table.
+   *
+   * @group specificdata
+   */
+  @deprecated("use read.jdbc()", "1.4.0")
+  def jdbc(url: String, table: String): DataFrame = {
+    read.jdbc(url, table, new Properties)
+  }
+
+  /**
+   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+   * url named table.  Partitions of the table will be retrieved in parallel based on the parameters
+   * passed to this function.
+   *
+   * @param columnName the name of a column of integral type that will be used for partitioning.
+   * @param lowerBound the minimum value of `columnName` used to decide partition stride
+   * @param upperBound the maximum value of `columnName` used to decide partition stride
+   * @param numPartitions the number of partitions.  the range `minValue`-`maxValue` will be split
+   *                      evenly into this many partitions
+   * @group specificdata
+   */
+  @deprecated("use read.jdbc()", "1.4.0")
+  def jdbc(
+      url: String,
+      table: String,
+      columnName: String,
+      lowerBound: Long,
+      upperBound: Long,
+      numPartitions: Int): DataFrame = {
+    read.jdbc(url, table, columnName, lowerBound, upperBound, numPartitions, new Properties)
+  }
+
+  /**
+   * Construct a [[DataFrame]] representing the database table accessible via JDBC URL
+   * url named table. The theParts parameter gives a list of expressions
+   * suitable for inclusion in WHERE clauses; each one defines one partition
+   * of the [[DataFrame]].
+   *
+   * @group specificdata
+   */
+  @deprecated("use read.jdbc()", "1.4.0")
+  def jdbc(url: String, table: String, theParts: Array[String]): DataFrame = {
+    read.jdbc(url, table, theParts, new Properties)
+  }
+
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
+  // End of deprecated methods
+  ////////////////////////////////////////////////////////////////////////////
+  ////////////////////////////////////////////////////////////////////////////
 }
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
index 40483d3..95935ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRDD.scala
@@ -29,7 +29,16 @@ import org.apache.spark.sql.catalyst.util.DateUtils
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.sources._
 
+/**
+ * Data corresponding to one partition of a JDBCRDD.
+ */
+private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition {
+  override def index: Int = idx
+}
+
+
 private[sql] object JDBCRDD extends Logging {
+
   /**
    * Maps a JDBC type to a Catalyst type.  This function is called only when
    * the DriverQuirks class corresponding to your database driver returns null.
@@ -168,6 +177,7 @@ private[sql] object JDBCRDD extends Logging {
       DriverManager.getConnection(url, properties)
     }
   }
+
   /**
    * Build and return JDBCRDD from the given information.
    *
@@ -193,18 +203,14 @@ private[sql] object JDBCRDD extends Logging {
       requiredColumns: Array[String],
       filters: Array[Filter],
       parts: Array[Partition]): RDD[Row] = {
-
-    val prunedSchema = pruneSchema(schema, requiredColumns)
-
-    return new
-        JDBCRDD(
-          sc,
-          getConnector(driver, url, properties),
-          prunedSchema,
-          fqTable,
-          requiredColumns,
-          filters,
-          parts)
+    new JDBCRDD(
+      sc,
+      getConnector(driver, url, properties),
+      pruneSchema(schema, requiredColumns),
+      fqTable,
+      requiredColumns,
+      filters,
+      parts)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
index 93e8254..09d6865 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JDBCRelation.scala
@@ -17,26 +17,16 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.DriverManager
 import java.util.Properties
 
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.Partition
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.Row
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.Utils
-
-/**
- * Data corresponding to one partition of a JDBCRDD.
- */
-private[sql] case class JDBCPartition(whereClause: String, idx: Int) extends Partition {
-  override def index: Int = idx
-}
 
 /**
  * Instructions on how to partition the table among workers.
@@ -152,6 +142,8 @@ private[sql] case class JDBCRelation(
   }
   
   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
-    data.insertIntoJDBC(url, table, overwrite, properties)
+    data.write
+      .mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append)
+      .jdbc(url, table, properties)
   }  
 }

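The rewritten insert() is a thin mapping from the data source API's overwrite flag onto a SaveMode before handing off to the writer. A sketch of that mapping in isolation (url, table, and props are placeholders):

    import java.util.Properties

    import org.apache.spark.sql.{DataFrame, SaveMode}

    val url = "jdbc:h2:mem:testdb"     // placeholder
    val table = "TEST.PEOPLE"          // placeholder
    val props = new Properties()

    def insert(data: DataFrame, overwrite: Boolean): Unit = {
      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
      data.write.mode(mode).jdbc(url, table, props)
    }
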
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala
new file mode 100644
index 0000000..cc918c2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcUtils.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.jdbc
+
+import java.sql.{Connection, DriverManager}
+import java.util.Properties
+
+import scala.util.Try
+
+/**
+ * Util functions for JDBC tables.
+ */
+private[sql] object JdbcUtils {
+
+  /**
+   * Establishes a JDBC connection.
+   */
+  def createConnection(url: String, connectionProperties: Properties): Connection = {
+    DriverManager.getConnection(url, connectionProperties)
+  }
+
+  /**
+   * Returns true if the table already exists in the JDBC database.
+   */
+  def tableExists(conn: Connection, table: String): Boolean = {
+    // Somewhat hacky, but there isn't a good way to identify whether a table exists for all
+    // SQL database systems, considering "table" could also include the database name.
+    Try(conn.prepareStatement(s"SELECT 1 FROM $table LIMIT 1").executeQuery().next()).isSuccess
+  }
+
+  /**
+   * Drops a table from the JDBC database.
+   */
+  def dropTable(conn: Connection, table: String): Unit = {
+    conn.prepareStatement(s"DROP TABLE $table").executeUpdate()
+  }
+}

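A sketch of how these helpers compose (JdbcUtils is private[sql], so this is callable only from within Spark SQL; the URL and table name are placeholders):

    import java.sql.Connection
    import java.util.Properties

    import org.apache.spark.sql.jdbc.JdbcUtils

    val url = "jdbc:h2:mem:testdb"     // placeholder
    val props = new Properties()

    val conn: Connection = JdbcUtils.createConnection(url, props)
    try {
      // Drop-and-recreate style overwrite, as the write path does for SaveMode.Overwrite.
      if (JdbcUtils.tableExists(conn, "TEST.PEOPLE")) {
        JdbcUtils.dropTable(conn, "TEST.PEOPLE")
      }
    } finally {
      conn.close()
    }
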
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
index c099881..a61790b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/jdbc.scala
@@ -163,8 +163,8 @@ package object jdbc {
         table: String,
         properties: Properties = new Properties()) {
       val quirks = DriverQuirks.get(url)
-      var nullTypes: Array[Int] = df.schema.fields.map(field => {
-        var nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2
+      val nullTypes: Array[Int] = df.schema.fields.map { field =>
+        val nullType: Option[Int] = quirks.getJDBCType(field.dataType)._2
         if (nullType.isEmpty) {
           field.dataType match {
             case IntegerType => java.sql.Types.INTEGER
@@ -183,7 +183,7 @@ package object jdbc {
               s"Can't translate null value for field $field")
           }
         } else nullType.get
-      }).toArray
+      }
 
       val rddSchema = df.schema
       df.foreachPartition { iterator =>

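The nullTypes change above only tightens the Scala (a val and a map block instead of var/toArray); the underlying idea is one JDBC type code per schema field, used as a fallback when the driver quirks return none. A loose sketch with only a handful of mappings (the IntegerType case is visible in the patch context; the others are illustrative assumptions):

    import java.sql.Types

    import org.apache.spark.sql.types._

    // Illustrative fallback from Catalyst types to java.sql.Types codes.
    def fallbackNullType(dt: DataType): Int = dt match {
      case IntegerType => Types.INTEGER   // shown in the patch context above
      case LongType    => Types.BIGINT    // assumed for illustration
      case DoubleType  => Types.DOUBLE    // assumed for illustration
      case BooleanType => Types.BIT       // assumed for illustration
      case other => throw new IllegalArgumentException(s"Can't translate null value for $other")
    }

    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("score", DoubleType)))
    val nullTypes: Array[Int] = schema.fields.map(f => fallbackNullType(f.dataType))
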
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
index c344a9b..fcb8f54 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaApplySchemaSuite.java
@@ -187,14 +187,14 @@ public class JavaApplySchemaSuite implements Serializable {
         null,
         "this is another simple string."));
 
-    DataFrame df1 = sqlContext.jsonRDD(jsonRDD);
+    DataFrame df1 = sqlContext.read().json(jsonRDD);
     StructType actualSchema1 = df1.schema();
     Assert.assertEquals(expectedSchema, actualSchema1);
     df1.registerTempTable("jsonTable1");
     List<Row> actual1 = sqlContext.sql("select * from jsonTable1").collectAsList();
     Assert.assertEquals(expectedResult, actual1);
 
-    DataFrame df2 = sqlContext.jsonRDD(jsonRDD, expectedSchema);
+    DataFrame df2 = sqlContext.read().schema(expectedSchema).json(jsonRDD);
     StructType actualSchema2 = df2.schema();
     Assert.assertEquals(expectedSchema, actualSchema2);
     df2.registerTempTable("jsonTable2");

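The Java changes above swap sqlContext.jsonRDD(...) for the reader API; the same pattern in Scala, as a small self-contained sketch (app name, master, and sample records are placeholders):

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.sql.types._

    val sc = new SparkContext(new SparkConf().setAppName("json-reader-sketch").setMaster("local[2]"))
    val sqlContext = new SQLContext(sc)

    val jsonRDD = sc.parallelize(Seq("""{"a":1, "b":"str1"}""", """{"a":2, "b":"str2"}"""))

    // Schema inferred from the data (was: sqlContext.jsonRDD(jsonRDD)).
    val df1 = sqlContext.read.json(jsonRDD)

    // Schema supplied up front (was: sqlContext.jsonRDD(jsonRDD, schema)).
    val schema = StructType(Seq(
      StructField("a", LongType, nullable = true),
      StructField("b", StringType, nullable = true)))
    val df2 = sqlContext.read.schema(schema).json(jsonRDD)
    df2.show()
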
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
index 6a0bcef..2706e01 100644
--- a/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
+++ b/sql/core/src/test/java/test/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
@@ -67,7 +67,7 @@ public class JavaSaveLoadSuite {
       jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
     }
     JavaRDD<String> rdd = sc.parallelize(jsonObjects);
-    df = sqlContext.jsonRDD(rdd);
+    df = sqlContext.read().json(rdd);
     df.registerTempTable("jsonTable");
   }
 
@@ -75,10 +75,8 @@ public class JavaSaveLoadSuite {
   public void saveAndLoad() {
     Map<String, String> options = new HashMap<String, String>();
     options.put("path", path.toString());
-    df.save("json", SaveMode.ErrorIfExists, options);
-
+    df.write().mode(SaveMode.ErrorIfExists).format("json").options(options).save();
     DataFrame loadedDF = sqlContext.read().format("json").options(options).load();
-
     checkAnswer(loadedDF, df.collectAsList());
   }
 
@@ -86,12 +84,12 @@ public class JavaSaveLoadSuite {
   public void saveAndLoadWithSchema() {
     Map<String, String> options = new HashMap<String, String>();
     options.put("path", path.toString());
-    df.save("json", SaveMode.ErrorIfExists, options);
+    df.write().format("json").mode(SaveMode.ErrorIfExists).options(options).save();
 
     List<StructField> fields = new ArrayList<StructField>();
     fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
     StructType schema = DataTypes.createStructType(fields);
-    DataFrame loadedDF = sqlContext.load("json", schema, options);
+    DataFrame loadedDF = sqlContext.read().format("json").schema(schema).options(options).load();
 
     checkAnswer(loadedDF, sqlContext.sql("SELECT b FROM jsonTable").collectAsList());
   }

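Likewise for save/load: the builder chain replaces the old df.save(source, mode, options) and sqlContext.load(source, schema, options) calls. A Scala sketch of the round trip (sqlContext, df, and the path are placeholders):

    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
    import org.apache.spark.sql.types._

    val sqlContext: SQLContext = ???                 // hypothetical context
    val df: DataFrame = ???                          // hypothetical DataFrame with columns a, b
    val path = "/tmp/json-save-load-sketch"          // placeholder path

    df.write.mode(SaveMode.ErrorIfExists).format("json").option("path", path).save()

    val schema = StructType(Seq(StructField("b", StringType, nullable = true)))
    val loaded = sqlContext.read.format("json").schema(schema).option("path", path).load()
    loaded.show()
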
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 2abfe7f..5a7b6f0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -221,22 +221,25 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("Basic API") {
-    assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE").collect().size === 3)
+    assert(TestSQLContext.read.jdbc(
+      urlWithUserAndPass, "TEST.PEOPLE", new Properties).collect().length === 3)
   }
 
   test("Partitioning via JDBCPartitioningInfo API") {
-    assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3)
-      .collect.size === 3)
+    assert(
+      TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", "THEID", 0, 4, 3, new Properties)
+      .collect().length === 3)
   }
 
   test("Partitioning via list-of-where-clauses API") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
-    assert(TestSQLContext.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts).collect().size === 3)
+    assert(TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, new Properties)
+      .collect().length === 3)
   }
 
   test("H2 integral types") {
     val rows = sql("SELECT * FROM inttypes WHERE A IS NOT NULL").collect()
-    assert(rows.size === 1)
+    assert(rows.length === 1)
     assert(rows(0).getInt(0) === 1)
     assert(rows(0).getBoolean(1) === false)
     assert(rows(0).getInt(2) === 3)
@@ -246,7 +249,7 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
 
   test("H2 null entries") {
     val rows = sql("SELECT * FROM inttypes WHERE A IS NULL").collect()
-    assert(rows.size === 1)
+    assert(rows.length === 1)
     assert(rows(0).isNullAt(0))
     assert(rows(0).isNullAt(1))
     assert(rows(0).isNullAt(2))
@@ -286,24 +289,28 @@ class JDBCSuite extends FunSuite with BeforeAndAfter {
   }
 
   test("test DATE types") {
-    val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").collect()
-    val cachedRows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").cache().collect()
+    val rows = TestSQLContext.read.jdbc(
+      urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
+    val cachedRows = TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+      .cache().collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
     assert(rows(1).getAs[java.sql.Date](1) === null)
     assert(cachedRows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
   }
 
   test("test DATE types in cache") {
-    val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.TIMETYPES").collect()
-    TestSQLContext
-      .jdbc(urlWithUserAndPass, "TEST.TIMETYPES").cache().registerTempTable("mycached_date")
+    val rows =
+      TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties).collect()
+    TestSQLContext.read.jdbc(urlWithUserAndPass, "TEST.TIMETYPES", new Properties)
+      .cache().registerTempTable("mycached_date")
     val cachedRows = sql("select * from mycached_date").collect()
     assert(rows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
     assert(cachedRows(0).getAs[java.sql.Date](1) === java.sql.Date.valueOf("1996-01-01"))
   }
 
   test("test types for null value") {
-    val rows = TestSQLContext.jdbc(urlWithUserAndPass, "TEST.NULLTYPES").collect()
+    val rows = TestSQLContext.read.jdbc(
+      urlWithUserAndPass, "TEST.NULLTYPES", new Properties).collect()
     assert((0 to 14).forall(i => rows(0).isNullAt(i)))
   }
 

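One thing the updated tests make explicit is the Properties argument on every read.jdbc call; credentials and other driver settings travel there. A sketch (the context, URL, and credential values are placeholders):

    import java.util.Properties

    import org.apache.spark.sql.SQLContext

    val sqlContext: SQLContext = ???                  // hypothetical context
    val props = new Properties()
    props.setProperty("user", "testUser")             // standard JDBC connection keys
    props.setProperty("password", "testPass")

    val people = sqlContext.read.jdbc("jdbc:h2:mem:testdb", "TEST.PEOPLE", props)
    println(people.count())
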
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
index 0800ede..2e4c12f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCWriteSuite.scala
@@ -22,7 +22,7 @@ import java.util.Properties
 
 import org.scalatest.{BeforeAndAfter, FunSuite}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{SaveMode, Row}
 import org.apache.spark.sql.test._
 import org.apache.spark.sql.types._
 
@@ -90,64 +90,66 @@ class JDBCWriteSuite extends FunSuite with BeforeAndAfter {
   test("Basic CREATE") {
     val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
 
-    df.createJDBCTable(url, "TEST.BASICCREATETEST", false)
-    assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").count)
-    assert(2 == TestSQLContext.jdbc(url, "TEST.BASICCREATETEST").collect()(0).length)
+    df.write.jdbc(url, "TEST.BASICCREATETEST", new Properties)
+    assert(2 == TestSQLContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).count)
+    assert(2 ==
+      TestSQLContext.read.jdbc(url, "TEST.BASICCREATETEST", new Properties).collect()(0).length)
   }
 
   test("CREATE with overwrite") {
     val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
     val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
 
-    df.createJDBCTable(url1, "TEST.DROPTEST", false, properties)
-    assert(2 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).count)
-    assert(3 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+    df.write.jdbc(url1, "TEST.DROPTEST", properties)
+    assert(2 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+    assert(3 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
 
-    df2.createJDBCTable(url1, "TEST.DROPTEST", true, properties)
-    assert(1 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).count)
-    assert(2 == TestSQLContext.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
+    df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.DROPTEST", properties)
+    assert(1 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).count)
+    assert(2 == TestSQLContext.read.jdbc(url1, "TEST.DROPTEST", properties).collect()(0).length)
   }
 
   test("CREATE then INSERT to append") {
     val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
     val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
 
-    df.createJDBCTable(url, "TEST.APPENDTEST", false)
-    df2.insertIntoJDBC(url, "TEST.APPENDTEST", false)
-    assert(3 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").count)
-    assert(2 == TestSQLContext.jdbc(url, "TEST.APPENDTEST").collect()(0).length)
+    df.write.jdbc(url, "TEST.APPENDTEST", new Properties)
+    df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", new Properties)
+    assert(3 == TestSQLContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).count)
+    assert(2 ==
+      TestSQLContext.read.jdbc(url, "TEST.APPENDTEST", new Properties).collect()(0).length)
   }
 
   test("CREATE then INSERT to truncate") {
     val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
     val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr1x2), schema2)
 
-    df.createJDBCTable(url1, "TEST.TRUNCATETEST", false, properties)
-    df2.insertIntoJDBC(url1, "TEST.TRUNCATETEST", true, properties)
-    assert(1 == TestSQLContext.jdbc(url1, "TEST.TRUNCATETEST", properties).count)
-    assert(2 == TestSQLContext.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
+    df.write.jdbc(url1, "TEST.TRUNCATETEST", properties)
+    df2.write.mode(SaveMode.Overwrite).jdbc(url1, "TEST.TRUNCATETEST", properties)
+    assert(1 == TestSQLContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).count)
+    assert(2 == TestSQLContext.read.jdbc(url1, "TEST.TRUNCATETEST", properties).collect()(0).length)
   }
 
   test("Incompatible INSERT to append") {
     val df = TestSQLContext.createDataFrame(sc.parallelize(arr2x2), schema2)
     val df2 = TestSQLContext.createDataFrame(sc.parallelize(arr2x3), schema3)
 
-    df.createJDBCTable(url, "TEST.INCOMPATIBLETEST", false)
+    df.write.jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
     intercept[org.apache.spark.SparkException] {
-      df2.insertIntoJDBC(url, "TEST.INCOMPATIBLETEST", true)
+      df2.write.mode(SaveMode.Append).jdbc(url, "TEST.INCOMPATIBLETEST", new Properties)
     }
   }
-  
+
   test("INSERT to JDBC Datasource") {
     TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
-    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
-    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+    assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
   }
-  
+
   test("INSERT to JDBC Datasource with overwrite") {
     TestSQLContext.sql("INSERT INTO TABLE PEOPLE1 SELECT * FROM PEOPLE")
     TestSQLContext.sql("INSERT OVERWRITE TABLE PEOPLE1 SELECT * FROM PEOPLE")
-    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).count)
-    assert(2 == TestSQLContext.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
+    assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).count)
+    assert(2 == TestSQLContext.read.jdbc(url1, "TEST.PEOPLE1", properties).collect()(0).length)
   } 
 }

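Summarizing the write-side migration this suite exercises: the default mode creates the table, SaveMode.Append adds rows, and SaveMode.Overwrite drops and recreates it (which is why the old truncate test now uses Overwrite). A sketch with placeholder DataFrames and URL:

    import java.util.Properties

    import org.apache.spark.sql.{DataFrame, SaveMode}

    val df: DataFrame = ???            // hypothetical
    val df2: DataFrame = ???           // hypothetical
    val url = "jdbc:h2:mem:testdb"     // placeholder
    val props = new Properties()

    df.write.jdbc(url, "TEST.APPENDTEST", props)                            // creates the table
    df2.write.mode(SaveMode.Append).jdbc(url, "TEST.APPENDTEST", props)     // appends rows
    df2.write.mode(SaveMode.Overwrite).jdbc(url, "TEST.APPENDTEST", props)  // drops and recreates
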
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
index 53ddecf..58fe96a 100644
--- a/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
+++ b/sql/hive/src/test/java/org/apache/spark/sql/hive/JavaMetastoreDataSourcesSuite.java
@@ -81,7 +81,7 @@ public class JavaMetastoreDataSourcesSuite {
       jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
     }
     JavaRDD<String> rdd = sc.parallelize(jsonObjects);
-    df = sqlContext.jsonRDD(rdd);
+    df = sqlContext.read().json(rdd);
     df.registerTempTable("jsonTable");
   }
 
@@ -96,7 +96,11 @@ public class JavaMetastoreDataSourcesSuite {
   public void saveExternalTableAndQueryIt() {
     Map<String, String> options = new HashMap<String, String>();
     options.put("path", path.toString());
-    df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+    df.write()
+      .format("org.apache.spark.sql.json")
+      .mode(SaveMode.Append)
+      .options(options)
+      .saveAsTable("javaSavedTable");
 
     checkAnswer(
       sqlContext.sql("SELECT * FROM javaSavedTable"),
@@ -115,7 +119,11 @@ public class JavaMetastoreDataSourcesSuite {
   public void saveExternalTableWithSchemaAndQueryIt() {
     Map<String, String> options = new HashMap<String, String>();
     options.put("path", path.toString());
-    df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+    df.write()
+      .format("org.apache.spark.sql.json")
+      .mode(SaveMode.Append)
+      .options(options)
+      .saveAsTable("javaSavedTable");
 
     checkAnswer(
       sqlContext.sql("SELECT * FROM javaSavedTable"),
@@ -138,7 +146,11 @@ public class JavaMetastoreDataSourcesSuite {
   @Test
   public void saveTableAndQueryIt() {
     Map<String, String> options = new HashMap<String, String>();
-    df.saveAsTable("javaSavedTable", "org.apache.spark.sql.json", SaveMode.Append, options);
+    df.write()
+      .format("org.apache.spark.sql.json")
+      .mode(SaveMode.Append)
+      .options(options)
+      .saveAsTable("javaSavedTable");
 
     checkAnswer(
       sqlContext.sql("SELECT * FROM javaSavedTable"),

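The same builder chain in Scala, for comparison with the Java calls above (the table name and path are placeholders, and the DataFrame is assumed to come from a HiveContext, as in the suite):

    import org.apache.spark.sql.{DataFrame, SaveMode}

    val df: DataFrame = ???                          // hypothetical HiveContext-backed DataFrame
    df.write
      .format("org.apache.spark.sql.json")
      .mode(SaveMode.Append)
      .option("path", "/tmp/javaSavedTable")         // placeholder path
      .saveAsTable("savedTable")
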
http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
index fc6c3c3..945596d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/CachedTableSuite.scala
@@ -162,7 +162,7 @@ class CachedTableSuite extends QueryTest {
   test("REFRESH TABLE also needs to recache the data (data source tables)") {
     val tempPath: File = Utils.createTempDir()
     tempPath.delete()
-    table("src").save(tempPath.toString, "parquet", SaveMode.Overwrite)
+    table("src").write.mode(SaveMode.Overwrite).parquet(tempPath.toString)
     sql("DROP TABLE IF EXISTS refreshTable")
     createExternalTable("refreshTable", tempPath.toString, "parquet")
     checkAnswer(
@@ -172,7 +172,7 @@ class CachedTableSuite extends QueryTest {
     sql("CACHE TABLE refreshTable")
     assertCached(table("refreshTable"))
     // Append new data.
-    table("src").save(tempPath.toString, "parquet", SaveMode.Append)
+    table("src").write.mode(SaveMode.Append).parquet(tempPath.toString)
     // We are still using the old data.
     assertCached(table("refreshTable"))
     checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/517eb37a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 58b0b80..30db976 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -409,11 +409,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     val originalDefaultSource = conf.defaultDataSourceName
 
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    val df = jsonRDD(rdd)
+    val df = read.json(rdd)
 
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
     // Save the df as a managed table (by not specifying the path).
-    df.saveAsTable("savedJsonTable")
+    df.write.saveAsTable("savedJsonTable")
 
     checkAnswer(
       sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
@@ -443,11 +443,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     val originalDefaultSource = conf.defaultDataSourceName
 
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    val df = jsonRDD(rdd)
+    val df = read.json(rdd)
 
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
     // Save the df as a managed table (by not specifying the path).
-    df.saveAsTable("savedJsonTable")
+    df.write.saveAsTable("savedJsonTable")
 
     checkAnswer(
       sql("SELECT * FROM savedJsonTable"),
@@ -455,17 +455,17 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
 
     // Right now, we cannot append to an existing JSON table.
     intercept[RuntimeException] {
-      df.saveAsTable("savedJsonTable", SaveMode.Append)
+      df.write.mode(SaveMode.Append).saveAsTable("savedJsonTable")
     }
 
     // We can overwrite it.
-    df.saveAsTable("savedJsonTable", SaveMode.Overwrite)
+    df.write.mode(SaveMode.Overwrite).saveAsTable("savedJsonTable")
     checkAnswer(
       sql("SELECT * FROM savedJsonTable"),
       df.collect())
 
     // When the save mode is Ignore, we will do nothing when the table already exists.
-    df.select("b").saveAsTable("savedJsonTable", SaveMode.Ignore)
+    df.select("b").write.mode(SaveMode.Ignore).saveAsTable("savedJsonTable")
     assert(df.schema === table("savedJsonTable").schema)
     checkAnswer(
       sql("SELECT * FROM savedJsonTable"),
@@ -479,11 +479,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
 
     // Create an external table by specifying the path.
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
-    df.saveAsTable(
-      "savedJsonTable",
-      "org.apache.spark.sql.json",
-      SaveMode.Append,
-      Map("path" -> tempPath.toString))
+    df.write
+      .format("org.apache.spark.sql.json")
+      .mode(SaveMode.Append)
+      .option("path", tempPath.toString)
+      .saveAsTable("savedJsonTable")
     checkAnswer(
       sql("SELECT * FROM savedJsonTable"),
       df.collect())
@@ -501,14 +501,13 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     val originalDefaultSource = conf.defaultDataSourceName
 
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-    val df = jsonRDD(rdd)
+    val df = read.json(rdd)
 
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
-    df.saveAsTable(
-      "savedJsonTable",
-      "org.apache.spark.sql.json",
-      SaveMode.Append,
-      Map("path" -> tempPath.toString))
+    df.write.format("org.apache.spark.sql.json")
+      .mode(SaveMode.Append)
+      .option("path", tempPath.toString)
+      .saveAsTable("savedJsonTable")
 
     conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
     createExternalTable("createdJsonTable", tempPath.toString)
@@ -566,7 +565,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       setConf(SQLConf.PARQUET_USE_DATA_SOURCE_API, "true")
 
       val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
-      jsonRDD(rdd).registerTempTable("jt")
+      read.json(rdd).registerTempTable("jt")
       sql(
         """
           |create table test_parquet_ctas STORED AS parquET
@@ -601,7 +600,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       StructType(
         StructField("a", ArrayType(IntegerType, containsNull = true), nullable = true) :: Nil)
     assert(df1.schema === expectedSchema1)
-    df1.saveAsTable("arrayInParquet", "parquet", SaveMode.Overwrite)
+    df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("arrayInParquet")
 
     val df2 =
       createDataFrame(Tuple1(Seq(2, 3)) :: Nil).toDF("a")
@@ -610,10 +609,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
         StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
     assert(df2.schema === expectedSchema2)
     df2.insertInto("arrayInParquet", overwrite = false)
-    createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a")
-      .saveAsTable("arrayInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
-    createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a")
-      .saveAsTable("arrayInParquet", "parquet", SaveMode.Append)
+    createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
+      .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
+    createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
+      .mode(SaveMode.Append).saveAsTable("arrayInParquet")
     refreshTable("arrayInParquet")
 
     checkAnswer(
@@ -634,7 +633,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       StructType(
         StructField("a", mapType1, nullable = true) :: Nil)
     assert(df1.schema === expectedSchema1)
-    df1.saveAsTable("mapInParquet", "parquet", SaveMode.Overwrite)
+    df1.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("mapInParquet")
 
     val df2 =
       createDataFrame(Tuple1(Map(2 -> 3)) :: Nil).toDF("a")
@@ -644,10 +643,10 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
         StructField("a", mapType2, nullable = true) :: Nil)
     assert(df2.schema === expectedSchema2)
     df2.insertInto("mapInParquet", overwrite = false)
-    createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a")
-      .saveAsTable("mapInParquet", SaveMode.Append) // This one internally calls df2.insertInto.
-    createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a")
-      .saveAsTable("mapInParquet", "parquet", SaveMode.Append)
+    createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
+      .saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
+    createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
+      .format("parquet").mode(SaveMode.Append).saveAsTable("mapInParquet")
     refreshTable("mapInParquet")
 
     checkAnswer(
@@ -711,30 +710,30 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     def createDF(from: Int, to: Int): DataFrame =
       createDataFrame((from to to).map(i => Tuple2(i, s"str$i"))).toDF("c1", "c2")
 
-    createDF(0, 9).saveAsTable("insertParquet", "parquet")
+    createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
       (6 to 9).map(i => Row(i, s"str$i")))
 
     intercept[AnalysisException] {
-      createDF(10, 19).saveAsTable("insertParquet", "parquet")
+      createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")
     }
 
-    createDF(10, 19).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+    createDF(10, 19).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, p.c2 FROM insertParquet p WHERE p.c1 > 5"),
       (6 to 19).map(i => Row(i, s"str$i")))
 
-    createDF(20, 29).saveAsTable("insertParquet", "parquet", SaveMode.Append)
+    createDF(20, 29).write.mode(SaveMode.Append).format("parquet").saveAsTable("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 25"),
       (6 to 24).map(i => Row(i, s"str$i")))
 
     intercept[AnalysisException] {
-      createDF(30, 39).saveAsTable("insertParquet")
+      createDF(30, 39).write.saveAsTable("insertParquet")
     }
 
-    createDF(30, 39).saveAsTable("insertParquet", SaveMode.Append)
+    createDF(30, 39).write.mode(SaveMode.Append).saveAsTable("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
       (6 to 34).map(i => Row(i, s"str$i")))
@@ -744,11 +743,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
       (6 to 44).map(i => Row(i, s"str$i")))
 
-    createDF(50, 59).saveAsTable("insertParquet", SaveMode.Overwrite)
+    createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 51 AND p.c1 < 55"),
       (52 to 54).map(i => Row(i, s"str$i")))
-    createDF(60, 69).saveAsTable("insertParquet", SaveMode.Ignore)
+    createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, c2 FROM insertParquet p"),
       (50 to 59).map(i => Row(i, s"str$i")))


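The final hunk walks one table through all four SaveMode values; as a compact recap, a sketch using the suite's createDF helper shape (sqlContext here stands in for the suite's HiveContext):

    import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}

    val sqlContext: SQLContext = ???                      // the suite uses a HiveContext
    def createDF(from: Int, to: Int): DataFrame =
      sqlContext.createDataFrame((from to to).map(i => (i, s"str$i"))).toDF("c1", "c2")

    createDF(0, 9).write.format("parquet").saveAsTable("insertParquet")        // default (ErrorIfExists): creates
    // createDF(10, 19).write.format("parquet").saveAsTable("insertParquet")   // would throw: table exists
    createDF(10, 19).write.mode(SaveMode.Append).saveAsTable("insertParquet")     // adds rows
    createDF(50, 59).write.mode(SaveMode.Overwrite).saveAsTable("insertParquet")  // replaces contents
    createDF(60, 69).write.mode(SaveMode.Ignore).saveAsTable("insertParquet")     // no-op: table exists
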