Posted to commits@spark.apache.org by ma...@apache.org on 2015/02/11 02:29:58 UTC

[2/2] spark git commit: [SPARK-5658][SQL] Finalize DDL and write support APIs

[SPARK-5658][SQL] Finalize DDL and write support APIs

https://issues.apache.org/jira/browse/SPARK-5658

Author: Yin Huai <yh...@databricks.com>

This patch had conflicts when merged, resolved by
Committer: Michael Armbrust <mi...@databricks.com>

Closes #4446 from yhuai/writeSupportFollowup and squashes the following commits:

f3a96f7 [Yin Huai] davies's comments.
225ff71 [Yin Huai] Use Scala TestHiveContext to initialize the Python HiveContext in Python tests.
2306f93 [Yin Huai] Style.
2091fcd [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
537e28f [Yin Huai] Correctly clean up temp data.
ae4649e [Yin Huai] Fix Python test.
609129c [Yin Huai] Doc format.
92b6659 [Yin Huai] Python doc and other minor updates.
cbc717f [Yin Huai] Rename dataSourceName to source.
d1c12d3 [Yin Huai] No need to delete the duplicate rule since it has been removed in master.
22cfa70 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
d91ecb8 [Yin Huai] Fix test.
4c76d78 [Yin Huai] Simplify APIs.
3abc215 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
0832ce4 [Yin Huai] Fix test.
98e7cdb [Yin Huai] Python style.
2bf44ef [Yin Huai] Python APIs.
c204967 [Yin Huai] Format
a10223d [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
9ff97d8 [Yin Huai] Add SaveMode to saveAsTable.
9b6e570 [Yin Huai] Update doc.
c2be775 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
99950a2 [Yin Huai] Use Java enum for SaveMode.
4679665 [Yin Huai] Remove duplicate rule.
77d89dc [Yin Huai] Update doc.
e04d908 [Yin Huai] Move import and add (Scala-specific) to scala APIs.
cf5703d [Yin Huai] Add checkAnswer to Java tests.
7db95ff [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
6dfd386 [Yin Huai] Add java test.
f2f33ef [Yin Huai] Fix test.
e702386 [Yin Huai] Apache header.
b1e9b1b [Yin Huai] Format.
ed4e1b4 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupportFollowup
af9e9b3 [Yin Huai] DDL and write support API followup.
2a6213a [Yin Huai] Update API names.
e6a0b77 [Yin Huai] Update test.
43bae01 [Yin Huai] Remove createTable from HiveContext.
5ffc372 [Yin Huai] Add more load APIs to SQLContext.
5390743 [Yin Huai] Add more save APIs to DataFrame.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aaf50d05
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aaf50d05
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aaf50d05

Branch: refs/heads/master
Commit: aaf50d05c7616e4f8f16654b642500ae06cdd774
Parents: ed167e7
Author: Yin Huai <yh...@databricks.com>
Authored: Tue Feb 10 17:29:52 2015 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Feb 10 17:29:52 2015 -0800

----------------------------------------------------------------------
 python/pyspark/sql/context.py                   |  68 +++
 python/pyspark/sql/dataframe.py                 |  72 ++-
 python/pyspark/sql/tests.py                     | 107 ++++-
 .../org/apache/spark/sql/sources/SaveMode.java  |  45 ++
 .../scala/org/apache/spark/sql/DataFrame.scala  | 160 ++++++-
 .../org/apache/spark/sql/DataFrameImpl.scala    |  61 +--
 .../apache/spark/sql/IncomputableColumn.scala   |  27 +-
 .../scala/org/apache/spark/sql/SQLConf.scala    |   2 +-
 .../scala/org/apache/spark/sql/SQLContext.scala | 164 ++++++-
 .../spark/sql/execution/SparkStrategies.scala   |  14 +-
 .../apache/spark/sql/json/JSONRelation.scala    |  30 +-
 .../apache/spark/sql/parquet/newParquet.scala   |  45 +-
 .../org/apache/spark/sql/sources/ddl.scala      |  40 +-
 .../apache/spark/sql/sources/interfaces.scala   |  19 +
 .../spark/sql/sources/JavaSaveLoadSuite.java    |  97 ++++
 .../scala/org/apache/spark/sql/QueryTest.scala  |  92 ++--
 .../sql/sources/CreateTableAsSelectSuite.scala  |  29 +-
 .../spark/sql/sources/SaveLoadSuite.scala       |  59 ++-
 .../org/apache/spark/sql/hive/HiveContext.scala |  76 ----
 .../apache/spark/sql/hive/HiveStrategies.scala  |  13 +-
 .../org/apache/spark/sql/hive/TestHive.scala    | 455 -------------------
 .../spark/sql/hive/execution/commands.scala     | 105 ++++-
 .../apache/spark/sql/hive/test/TestHive.scala   | 453 ++++++++++++++++++
 .../sql/hive/JavaMetastoreDataSourcesSuite.java | 147 ++++++
 .../scala/org/apache/spark/sql/QueryTest.scala  |  64 ++-
 .../sql/hive/InsertIntoHiveTableSuite.scala     |  33 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 118 ++++-
 27 files changed, 1801 insertions(+), 794 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/python/pyspark/sql/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 49f016a..882c0f9 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -21,6 +21,7 @@ from array import array
 from itertools import imap
 
 from py4j.protocol import Py4JError
+from py4j.java_collections import MapConverter
 
 from pyspark.rdd import _prepare_for_python_RDD
 from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
@@ -87,6 +88,18 @@ class SQLContext(object):
             self._scala_SQLContext = self._jvm.SQLContext(self._jsc.sc())
         return self._scala_SQLContext
 
+    def setConf(self, key, value):
+        """Sets the given Spark SQL configuration property.
+        """
+        self._ssql_ctx.setConf(key, value)
+
+    def getConf(self, key, defaultValue):
+        """Returns the value of Spark SQL configuration property for the given key.
+
+        If the key is not set, returns defaultValue.
+        """
+        return self._ssql_ctx.getConf(key, defaultValue)
+
     def registerFunction(self, name, f, returnType=StringType()):
         """Registers a lambda function as a UDF so it can be used in SQL statements.
 
@@ -455,6 +468,61 @@ class SQLContext(object):
             df = self._ssql_ctx.jsonRDD(jrdd.rdd(), scala_datatype)
         return DataFrame(df, self)
 
+    def load(self, path=None, source=None, schema=None, **options):
+        """Returns the dataset in a data source as a DataFrame.
+
+        The data source is specified by the `source` and a set of `options`.
+        If `source` is not specified, the default data source configured by
+        spark.sql.sources.default will be used.
+
+        Optionally, a schema can be provided as the schema of the returned DataFrame.
+        """
+        if path is not None:
+            options["path"] = path
+        if source is None:
+            source = self.getConf("spark.sql.sources.default",
+                                  "org.apache.spark.sql.parquet")
+        joptions = MapConverter().convert(options,
+                                          self._sc._gateway._gateway_client)
+        if schema is None:
+            df = self._ssql_ctx.load(source, joptions)
+        else:
+            if not isinstance(schema, StructType):
+                raise TypeError("schema should be StructType")
+            scala_datatype = self._ssql_ctx.parseDataType(schema.json())
+            df = self._ssql_ctx.load(source, scala_datatype, joptions)
+        return DataFrame(df, self)
+
+    def createExternalTable(self, tableName, path=None, source=None,
+                            schema=None, **options):
+        """Creates an external table based on the dataset in a data source.
+
+        It returns the DataFrame associated with the external table.
+
+        The data source is specified by the `source` and a set of `options`.
+        If `source` is not specified, the default data source configured by
+        spark.sql.sources.default will be used.
+
+        Optionally, a schema can be provided as the schema of the returned DataFrame and
+        created external table.
+        """
+        if path is not None:
+            options["path"] = path
+        if source is None:
+            source = self.getConf("spark.sql.sources.default",
+                                  "org.apache.spark.sql.parquet")
+        joptions = MapConverter().convert(options,
+                                          self._sc._gateway._gateway_client)
+        if schema is None:
+            df = self._ssql_ctx.createExternalTable(tableName, source, joptions)
+        else:
+            if not isinstance(schema, StructType):
+                raise TypeError("schema should be StructType")
+            scala_datatype = self._ssql_ctx.parseDataType(schema.json())
+            df = self._ssql_ctx.createExternalTable(tableName, source, scala_datatype,
+                                                    joptions)
+        return DataFrame(df, self)
+
     def sql(self, sqlQuery):
         """Return a L{DataFrame} representing the result of the given query.
 

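For quick reference, a minimal PySpark sketch of the SQLContext additions above (setConf/getConf and the load variants). The surrounding names are assumptions for illustration only: an existing SparkContext `sc` and a JSON file at /tmp/people.json.

    from pyspark.sql import SQLContext
    from pyspark.sql.types import StructType, StructField, StringType

    sqlCtx = SQLContext(sc)  # assumes an existing SparkContext `sc`

    # New configuration accessors.
    sqlCtx.setConf("spark.sql.sources.default", "org.apache.spark.sql.json")
    sqlCtx.getConf("spark.sql.sources.default", "org.apache.spark.sql.parquet")

    # Load using the default data source, an explicit source, or an explicit schema.
    df1 = sqlCtx.load(path="/tmp/people.json")
    df2 = sqlCtx.load("/tmp/people.json", "org.apache.spark.sql.json")
    schema = StructType([StructField("name", StringType(), True)])
    df3 = sqlCtx.load("/tmp/people.json", "org.apache.spark.sql.json", schema)
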
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/python/pyspark/sql/dataframe.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index 04be65f..3eef0cc 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -146,9 +146,75 @@ class DataFrame(object):
         """
         self._jdf.insertInto(tableName, overwrite)
 
-    def saveAsTable(self, tableName):
-        """Creates a new table with the contents of this DataFrame."""
-        self._jdf.saveAsTable(tableName)
+    def _java_save_mode(self, mode):
+        """Returns the Java save mode based on the Python save mode represented by a string.
+        """
+        jSaveMode = self._sc._jvm.org.apache.spark.sql.sources.SaveMode
+        jmode = jSaveMode.ErrorIfExists
+        mode = mode.lower()
+        if mode == "append":
+            jmode = jSaveMode.Append
+        elif mode == "overwrite":
+            jmode = jSaveMode.Overwrite
+        elif mode == "ignore":
+            jmode = jSaveMode.Ignore
+        elif mode == "error":
+            pass
+        else:
+            raise ValueError(
+                "Only 'append', 'overwrite', 'ignore', and 'error' are acceptable save modes.")
+        return jmode
+
+    def saveAsTable(self, tableName, source=None, mode="append", **options):
+        """Saves the contents of the DataFrame to a data source as a table.
+
+        The data source is specified by the `source` and a set of `options`.
+        If `source` is not specified, the default data source configured by
+        spark.sql.sources.default will be used.
+
+        Additionally, mode is used to specify the behavior of the saveAsTable operation when the
+        table already exists in the data source. There are four modes:
+
+        * append: Contents of this DataFrame are expected to be appended to existing table.
+        * overwrite: Data in the existing table is expected to be overwritten by the contents of \
+            this DataFrame.
+        * error: An exception is expected to be thrown.
+        * ignore: The save operation is expected to not save the contents of the DataFrame and \
+            to not change the existing table.
+        """
+        if source is None:
+            source = self.sql_ctx.getConf("spark.sql.sources.default",
+                                          "org.apache.spark.sql.parquet")
+        jmode = self._java_save_mode(mode)
+        joptions = MapConverter().convert(options,
+                                          self.sql_ctx._sc._gateway._gateway_client)
+        self._jdf.saveAsTable(tableName, source, jmode, joptions)
+
+    def save(self, path=None, source=None, mode="append", **options):
+        """Saves the contents of the DataFrame to a data source.
+
+        The data source is specified by the `source` and a set of `options`.
+        If `source` is not specified, the default data source configured by
+        spark.sql.sources.default will be used.
+
+        Additionally, mode is used to specify the behavior of the save operation when
+        data already exists in the data source. There are four modes:
+
+        * append: Contents of this DataFrame are expected to be appended to existing data.
+        * overwrite: Existing data is expected to be overwritten by the contents of this DataFrame.
+        * error: An exception is expected to be thrown.
+        * ignore: The save operation is expected to not save the contents of the DataFrame and \
+            to not change the existing data.
+        """
+        if path is not None:
+            options["path"] = path
+        if source is None:
+            source = self.sql_ctx.getConf("spark.sql.sources.default",
+                                          "org.apache.spark.sql.parquet")
+        jmode = self._java_save_mode(mode)
+        joptions = MapConverter().convert(options,
+                                          self._sc._gateway._gateway_client)
+        self._jdf.save(source, jmode, joptions)
 
     def schema(self):
         """Returns the schema of this DataFrame (represented by

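A short sketch of the new DataFrame write methods above, under assumed names (`df` is any DataFrame, paths and table names are made up; saveAsTable needs a HiveContext-backed SQLContext for a persistent catalog):

    # Explicit source and mode; "error" fails if the path already exists.
    df.save("/tmp/people_json", "org.apache.spark.sql.json", "error")

    # Keyword form; extra keyword arguments are passed through as data source options.
    df.save(path="/tmp/people_json", source="org.apache.spark.sql.json",
            mode="overwrite")

    # No source given: falls back to spark.sql.sources.default (parquet by default).
    df.save("/tmp/people_parquet", mode="error")

    # Persist as a table in the catalog; "ignore" leaves an existing table untouched.
    df.saveAsTable("people", "org.apache.spark.sql.json", "ignore",
                   path="/tmp/people_table")
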
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index d25c636..bc94509 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -34,10 +34,9 @@ if sys.version_info[:2] <= (2, 6):
 else:
     import unittest
 
-
-from pyspark.sql import SQLContext, Column
+from pyspark.sql import SQLContext, HiveContext, Column
 from pyspark.sql.types import IntegerType, Row, ArrayType, StructType, StructField, \
-    UserDefinedType, DoubleType, LongType
+    UserDefinedType, DoubleType, LongType, StringType
 from pyspark.tests import ReusedPySparkTestCase
 
 
@@ -286,6 +285,37 @@ class SQLTests(ReusedPySparkTestCase):
         self.assertTrue(95 < g.agg(Dsl.approxCountDistinct(df.key)).first()[0])
         self.assertEqual(100, g.agg(Dsl.countDistinct(df.value)).first()[0])
 
+    def test_save_and_load(self):
+        df = self.df
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        df.save(tmpPath, "org.apache.spark.sql.json", "error")
+        actual = self.sqlCtx.load(tmpPath, "org.apache.spark.sql.json")
+        self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+
+        schema = StructType([StructField("value", StringType(), True)])
+        actual = self.sqlCtx.load(tmpPath, "org.apache.spark.sql.json", schema)
+        self.assertTrue(sorted(df.select("value").collect()) == sorted(actual.collect()))
+
+        df.save(tmpPath, "org.apache.spark.sql.json", "overwrite")
+        actual = self.sqlCtx.load(tmpPath, "org.apache.spark.sql.json")
+        self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+
+        df.save(source="org.apache.spark.sql.json", mode="overwrite", path=tmpPath,
+                noUse="this option will not be used in save.")
+        actual = self.sqlCtx.load(source="org.apache.spark.sql.json", path=tmpPath,
+                                  noUse="this option will not be used in load.")
+        self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+
+        defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
+                                                    "org.apache.spark.sql.parquet")
+        self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
+        actual = self.sqlCtx.load(path=tmpPath)
+        self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+        self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
+
+        shutil.rmtree(tmpPath)
+
     def test_help_command(self):
         # Regression test for SPARK-5464
         rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
@@ -296,5 +326,76 @@ class SQLTests(ReusedPySparkTestCase):
         pydoc.render_doc(df.take(1))
 
 
+class HiveContextSQLTests(ReusedPySparkTestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        ReusedPySparkTestCase.setUpClass()
+        cls.tempdir = tempfile.NamedTemporaryFile(delete=False)
+        os.unlink(cls.tempdir.name)
+        print "type", type(cls.sc)
+        print "type", type(cls.sc._jsc)
+        _scala_HiveContext =\
+            cls.sc._jvm.org.apache.spark.sql.hive.test.TestHiveContext(cls.sc._jsc.sc())
+        cls.sqlCtx = HiveContext(cls.sc, _scala_HiveContext)
+        cls.testData = [Row(key=i, value=str(i)) for i in range(100)]
+        rdd = cls.sc.parallelize(cls.testData)
+        cls.df = cls.sqlCtx.inferSchema(rdd)
+
+    @classmethod
+    def tearDownClass(cls):
+        ReusedPySparkTestCase.tearDownClass()
+        shutil.rmtree(cls.tempdir.name, ignore_errors=True)
+
+    def test_save_and_load_table(self):
+        df = self.df
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "append", path=tmpPath)
+        actual = self.sqlCtx.createExternalTable("externalJsonTable", tmpPath,
+                                                 "org.apache.spark.sql.json")
+        self.assertTrue(
+            sorted(df.collect()) ==
+            sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect()))
+        self.assertTrue(
+            sorted(df.collect()) ==
+            sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect()))
+        self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+        self.sqlCtx.sql("DROP TABLE externalJsonTable")
+
+        df.saveAsTable("savedJsonTable", "org.apache.spark.sql.json", "overwrite", path=tmpPath)
+        schema = StructType([StructField("value", StringType(), True)])
+        actual = self.sqlCtx.createExternalTable("externalJsonTable",
+                                                 source="org.apache.spark.sql.json",
+                                                 schema=schema, path=tmpPath,
+                                                 noUse="this option will not be used")
+        self.assertTrue(
+            sorted(df.collect()) ==
+            sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect()))
+        self.assertTrue(
+            sorted(df.select("value").collect()) ==
+            sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect()))
+        self.assertTrue(sorted(df.select("value").collect()) == sorted(actual.collect()))
+        self.sqlCtx.sql("DROP TABLE savedJsonTable")
+        self.sqlCtx.sql("DROP TABLE externalJsonTable")
+
+        defaultDataSourceName = self.sqlCtx.getConf("spark.sql.sources.default",
+                                                    "org.apache.spark.sql.parquet")
+        self.sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
+        df.saveAsTable("savedJsonTable", path=tmpPath, mode="overwrite")
+        actual = self.sqlCtx.createExternalTable("externalJsonTable", path=tmpPath)
+        self.assertTrue(
+            sorted(df.collect()) ==
+            sorted(self.sqlCtx.sql("SELECT * FROM savedJsonTable").collect()))
+        self.assertTrue(
+            sorted(df.collect()) ==
+            sorted(self.sqlCtx.sql("SELECT * FROM externalJsonTable").collect()))
+        self.assertTrue(sorted(df.collect()) == sorted(actual.collect()))
+        self.sqlCtx.sql("DROP TABLE savedJsonTable")
+        self.sqlCtx.sql("DROP TABLE externalJsonTable")
+        self.sqlCtx.sql("SET spark.sql.sources.default=" + defaultDataSourceName)
+
+        shutil.rmtree(tmpPath)
+
 if __name__ == "__main__":
     unittest.main()

http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/java/org/apache/spark/sql/sources/SaveMode.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/sources/SaveMode.java b/sql/core/src/main/java/org/apache/spark/sql/sources/SaveMode.java
new file mode 100644
index 0000000..3109f57
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/sources/SaveMode.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources;
+
+/**
+ * SaveMode is used to specify the expected behavior of saving a DataFrame to a data source.
+ */
+public enum SaveMode {
+  /**
+   * Append mode means that when saving a DataFrame to a data source, if data/table already exists,
+   * contents of the DataFrame are expected to be appended to existing data.
+   */
+  Append,
+  /**
+   * Overwrite mode means that when saving a DataFrame to a data source,
+   * if data/table already exists, existing data is expected to be overwritten by the contents of
+   * the DataFrame.
+   */
+  Overwrite,
+  /**
+   * ErrorIfExists mode means that when saving a DataFrame to a data source, if data already exists,
+   * an exception is expected to be thrown.
+   */
+  ErrorIfExists,
+  /**
+   * Ignore mode means that when saving a DataFrame to a data source, if data already exists,
+   * the save operation is expected to not save the contents of the DataFrame and to not
+   * change the existing data.
+   */
+  Ignore
+}

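The Python mode strings accepted by save/saveAsTable map onto these constants through the _java_save_mode helper added in dataframe.py above; a hypothetical illustration (assuming a DataFrame `df` and an output path that already contains data):

    # Python mode strings resolve to the enum via DataFrame._java_save_mode:
    #   "append" -> Append, "overwrite" -> Overwrite,
    #   "error" -> ErrorIfExists, "ignore" -> Ignore
    df.save("/tmp/out", "org.apache.spark.sql.json", "ignore")  # SaveMode.Ignore: existing data is left as is
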
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 04e0d09..ca8d552 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -17,19 +17,19 @@
 
 package org.apache.spark.sql
 
+import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.sources.SaveMode
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
 
-import scala.util.control.NonFatal
-
-
 private[sql] object DataFrame {
   def apply(sqlContext: SQLContext, logicalPlan: LogicalPlan): DataFrame = {
     new DataFrameImpl(sqlContext, logicalPlan)
@@ -574,8 +574,64 @@ trait DataFrame extends RDDApi[Row] {
 
   /**
    * :: Experimental ::
-   * Creates a table from the the contents of this DataFrame.  This will fail if the table already
-   * exists.
+   * Creates a table from the contents of this DataFrame.
+   * It will use the default data source configured by spark.sql.sources.default.
+   * This will fail if the table already exists.
+   *
+   * Note that this currently only works with DataFrames that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
+   * be the target of an `insertInto`.
+   */
+  @Experimental
+  def saveAsTable(tableName: String): Unit = {
+    saveAsTable(tableName, SaveMode.ErrorIfExists)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the contents of this DataFrame, using the default data source
+   * configured by spark.sql.sources.default and [[SaveMode.ErrorIfExists]] as the save mode.
+   *
+   * Note that this currently only works with DataFrames that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
+   * be the target of an `insertInto`.
+   */
+  @Experimental
+  def saveAsTable(tableName: String, mode: SaveMode): Unit = {
+    if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
+      // If table already exists and the save mode is Append,
+      // we will just call insertInto to append the contents of this DataFrame.
+      insertInto(tableName, overwrite = false)
+    } else {
+      val dataSourceName = sqlContext.conf.defaultDataSourceName
+      saveAsTable(tableName, dataSourceName, mode)
+    }
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table at the given path from the contents of this DataFrame
+   * based on a given data source and a set of options,
+   * using [[SaveMode.ErrorIfExists]] as the save mode.
+   *
+   * Note that this currently only works with DataFrames that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
+   * be the target of an `insertInto`.
+   */
+  @Experimental
+  def saveAsTable(
+      tableName: String,
+      source: String): Unit = {
+    saveAsTable(tableName, source, SaveMode.ErrorIfExists)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates a table at the given path from the contents of this DataFrame
+   * based on a given data source, [[SaveMode]] specified by mode, and a set of options.
    *
    * Note that this currently only works with DataFrames that are created from a HiveContext as
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
@@ -583,12 +639,17 @@ trait DataFrame extends RDDApi[Row] {
    * be the target of an `insertInto`.
    */
   @Experimental
-  def saveAsTable(tableName: String): Unit
+  def saveAsTable(
+      tableName: String,
+      source: String,
+      mode: SaveMode): Unit = {
+    saveAsTable(tableName, source, mode, Map.empty[String, String])
+  }
 
   /**
    * :: Experimental ::
-   * Creates a table from the the contents of this DataFrame based on a given data source and
-   * a set of options. This will fail if the table already exists.
+   * Creates a table at the given path from the contents of this DataFrame
+   * based on a given data source, [[SaveMode]] specified by mode, and a set of options.
    *
    * Note that this currently only works with DataFrames that are created from a HiveContext as
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
@@ -598,14 +659,17 @@ trait DataFrame extends RDDApi[Row] {
   @Experimental
   def saveAsTable(
       tableName: String,
-      dataSourceName: String,
-      option: (String, String),
-      options: (String, String)*): Unit
+      source: String,
+      mode: SaveMode,
+      options: java.util.Map[String, String]): Unit = {
+    saveAsTable(tableName, source, mode, options.toMap)
+  }
 
   /**
    * :: Experimental ::
-   * Creates a table from the the contents of this DataFrame based on a given data source and
-   * a set of options. This will fail if the table already exists.
+   * (Scala-specific)
+   * Creates a table from the contents of this DataFrame based on a given data source,
+   * [[SaveMode]] specified by mode, and a set of options.
    *
    * Note that this currently only works with DataFrames that are created from a HiveContext as
    * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
@@ -615,22 +679,76 @@ trait DataFrame extends RDDApi[Row] {
   @Experimental
   def saveAsTable(
       tableName: String,
-      dataSourceName: String,
-      options: java.util.Map[String, String]): Unit
+      source: String,
+      mode: SaveMode,
+      options: Map[String, String]): Unit
+
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame to the given path,
+   * using the default data source configured by spark.sql.sources.default and
+   * [[SaveMode.ErrorIfExists]] as the save mode.
+   */
+  @Experimental
+  def save(path: String): Unit = {
+    save(path, SaveMode.ErrorIfExists)
+  }
+
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame to the given path and [[SaveMode]] specified by mode,
+   * using the default data source configured by spark.sql.sources.default.
+   */
+  @Experimental
+  def save(path: String, mode: SaveMode): Unit = {
+    val dataSourceName = sqlContext.conf.defaultDataSourceName
+    save(path, dataSourceName, mode)
+  }
 
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame to the given path based on the given data source,
+   * using [[SaveMode.ErrorIfExists]] as the save mode.
+   */
+  @Experimental
+  def save(path: String, source: String): Unit = {
+    save(source, SaveMode.ErrorIfExists, Map("path" -> path))
+  }
+
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame to the given path based on the given data source and
+   * [[SaveMode]] specified by mode.
+   */
   @Experimental
-  def save(path: String): Unit
+  def save(path: String, source: String, mode: SaveMode): Unit = {
+    save(source, mode, Map("path" -> path))
+  }
 
+  /**
+   * :: Experimental ::
+   * Saves the contents of this DataFrame based on the given data source,
+   * [[SaveMode]] specified by mode, and a set of options.
+   */
   @Experimental
   def save(
-      dataSourceName: String,
-      option: (String, String),
-      options: (String, String)*): Unit
+      source: String,
+      mode: SaveMode,
+      options: java.util.Map[String, String]): Unit = {
+    save(source, mode, options.toMap)
+  }
 
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Saves the contents of this DataFrame based on the given data source,
+   * [[SaveMode]] specified by mode, and a set of options.
+   */
   @Experimental
   def save(
-      dataSourceName: String,
-      options: java.util.Map[String, String]): Unit
+      source: String,
+      mode: SaveMode,
+      options: Map[String, String]): Unit
 
   /**
    * :: Experimental ::

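As the docs above describe, the table variants fall back to spark.sql.sources.default when no source is given; the Python tests in this patch exercise that path roughly as sketched below (HiveContext-backed sqlCtx, made-up path):

    sqlCtx.sql("SET spark.sql.sources.default=org.apache.spark.sql.json")
    df.saveAsTable("savedJsonTable", path="/tmp/saved_json", mode="overwrite")
    sqlCtx.sql("SELECT * FROM savedJsonTable").collect()
    sqlCtx.sql("DROP TABLE savedJsonTable")
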
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index 1ee16ad..11f9334 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -28,13 +28,14 @@ import org.apache.spark.api.python.SerDeUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.sql.catalyst.{SqlParser, ScalaReflection}
-import org.apache.spark.sql.catalyst.analysis.{ResolvedStar, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.analysis.{EliminateAnalysisOperators, ResolvedStar, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util.sideBySide
 import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
 import org.apache.spark.sql.json.JsonRDD
-import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan}
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.{NumericType, StructType}
 
 
@@ -341,68 +342,34 @@ private[sql] class DataFrameImpl protected[sql](
 
   override def saveAsParquetFile(path: String): Unit = {
     if (sqlContext.conf.parquetUseDataSourceApi) {
-      save("org.apache.spark.sql.parquet", "path" -> path)
+      save("org.apache.spark.sql.parquet", SaveMode.ErrorIfExists, Map("path" -> path))
     } else {
       sqlContext.executePlan(WriteToFile(path, logicalPlan)).toRdd
     }
   }
 
-  override def saveAsTable(tableName: String): Unit = {
-    val dataSourceName = sqlContext.conf.defaultDataSourceName
-    val cmd =
-      CreateTableUsingAsLogicalPlan(
-        tableName,
-        dataSourceName,
-        temporary = false,
-        Map.empty,
-        allowExisting = false,
-        logicalPlan)
-
-    sqlContext.executePlan(cmd).toRdd
-  }
-
   override def saveAsTable(
       tableName: String,
-      dataSourceName: String,
-      option: (String, String),
-      options: (String, String)*): Unit = {
+      source: String,
+      mode: SaveMode,
+      options: Map[String, String]): Unit = {
     val cmd =
       CreateTableUsingAsLogicalPlan(
         tableName,
-        dataSourceName,
+        source,
         temporary = false,
-        (option +: options).toMap,
-        allowExisting = false,
+        mode,
+        options,
         logicalPlan)
 
     sqlContext.executePlan(cmd).toRdd
   }
 
-  override def saveAsTable(
-      tableName: String,
-      dataSourceName: String,
-      options: java.util.Map[String, String]): Unit = {
-    val opts = options.toSeq
-    saveAsTable(tableName, dataSourceName, opts.head, opts.tail:_*)
-  }
-
-  override def save(path: String): Unit = {
-    val dataSourceName = sqlContext.conf.defaultDataSourceName
-    save(dataSourceName, "path" -> path)
-  }
-
-  override def save(
-      dataSourceName: String,
-      option: (String, String),
-      options: (String, String)*): Unit = {
-    ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this)
-  }
-
   override def save(
-      dataSourceName: String,
-      options: java.util.Map[String, String]): Unit = {
-    val opts = options.toSeq
-    save(dataSourceName, opts.head, opts.tail:_*)
+      source: String,
+      mode: SaveMode,
+      options: Map[String, String]): Unit = {
+    ResolvedDataSource(sqlContext, source, mode, options, this)
   }
 
   override def insertInto(tableName: String, overwrite: Boolean): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index ce0557b..494e49c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -25,9 +25,9 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedSt
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.storage.StorageLevel
+import org.apache.spark.sql.sources.SaveMode
 import org.apache.spark.sql.types.StructType
 
-
 private[sql] class IncomputableColumn(protected[sql] val expr: Expression) extends Column {
 
   def this(name: String) = this(name match {
@@ -156,29 +156,16 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   override def saveAsParquetFile(path: String): Unit = err()
 
-  override def saveAsTable(tableName: String): Unit = err()
-
-  override def saveAsTable(
-      tableName: String,
-      dataSourceName: String,
-      option: (String, String),
-      options: (String, String)*): Unit = err()
-
   override def saveAsTable(
       tableName: String,
-      dataSourceName: String,
-      options: java.util.Map[String, String]): Unit = err()
-
-  override def save(path: String): Unit = err()
-
-  override def save(
-      dataSourceName: String,
-      option: (String, String),
-      options: (String, String)*): Unit = err()
+      source: String,
+      mode: SaveMode,
+      options: Map[String, String]): Unit = err()
 
   override def save(
-      dataSourceName: String,
-      options: java.util.Map[String, String]): Unit = err()
+      source: String,
+      mode: SaveMode,
+      options: Map[String, String]): Unit = err()
 
   override def insertInto(tableName: String, overwrite: Boolean): Unit = err()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 180f5e7..39f6c2f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -50,7 +50,7 @@ private[spark] object SQLConf {
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
 
   // This is used to set the default data source
-  val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource"
+  val DEFAULT_DATA_SOURCE_NAME = "spark.sql.sources.default"
 
   // Whether to perform eager analysis on a DataFrame.
   val DATAFRAME_EAGER_ANALYSIS = "spark.sql.dataframe.eagerAnalysis"

http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 97e3777..801505b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -401,27 +401,173 @@ class SQLContext(@transient val sparkContext: SparkContext)
     jsonRDD(json.rdd, samplingRatio);
   }
 
+  /**
+   * :: Experimental ::
+   * Returns the dataset stored at path as a DataFrame,
+   * using the default data source configured by spark.sql.sources.default.
+   */
   @Experimental
   def load(path: String): DataFrame = {
     val dataSourceName = conf.defaultDataSourceName
-    load(dataSourceName, ("path", path))
+    load(path, dataSourceName)
   }
 
+  /**
+   * :: Experimental ::
+   * Returns the dataset stored at path as a DataFrame,
+   * using the given data source.
+   */
   @Experimental
-  def load(
-      dataSourceName: String,
-      option: (String, String),
-      options: (String, String)*): DataFrame = {
-    val resolved = ResolvedDataSource(this, None, dataSourceName, (option +: options).toMap)
+  def load(path: String, source: String): DataFrame = {
+    load(source, Map("path" -> path))
+  }
+
+  /**
+   * :: Experimental ::
+   * Returns the dataset specified by the given data source and a set of options as a DataFrame.
+   */
+  @Experimental
+  def load(source: String, options: java.util.Map[String, String]): DataFrame = {
+    load(source, options.toMap)
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Returns the dataset specified by the given data source and a set of options as a DataFrame.
+   */
+  @Experimental
+  def load(source: String, options: Map[String, String]): DataFrame = {
+    val resolved = ResolvedDataSource(this, None, source, options)
     DataFrame(this, LogicalRelation(resolved.relation))
   }
 
+  /**
+   * :: Experimental ::
+   * Returns the dataset specified by the given data source and a set of options as a DataFrame,
+   * using the given schema as the schema of the DataFrame.
+   */
   @Experimental
   def load(
-      dataSourceName: String,
+      source: String,
+      schema: StructType,
       options: java.util.Map[String, String]): DataFrame = {
-    val opts = options.toSeq
-    load(dataSourceName, opts.head, opts.tail:_*)
+    load(source, schema, options.toMap)
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Returns the dataset specified by the given data source and a set of options as a DataFrame,
+   * using the given schema as the schema of the DataFrame.
+   */
+  @Experimental
+  def load(
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame = {
+    val resolved = ResolvedDataSource(this, Some(schema), source, options)
+    DataFrame(this, LogicalRelation(resolved.relation))
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path and returns the corresponding DataFrame.
+   * It will use the default data source configured by spark.sql.sources.default.
+   */
+  @Experimental
+  def createExternalTable(tableName: String, path: String): DataFrame = {
+    val dataSourceName = conf.defaultDataSourceName
+    createExternalTable(tableName, path, dataSourceName)
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source
+   * and returns the corresponding DataFrame.
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      path: String,
+      source: String): DataFrame = {
+    createExternalTable(tableName, source, Map("path" -> path))
+  }
+
+  /**
+   * :: Experimental ::
+   * Creates an external table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    createExternalTable(tableName, source, options.toMap)
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Creates an external table from the given path based on a data source and a set of options.
+   * Then, returns the corresponding DataFrame.
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      options: Map[String, String]): DataFrame = {
+    val cmd =
+      CreateTableUsing(
+        tableName,
+        userSpecifiedSchema = None,
+        source,
+        temporary = false,
+        options,
+        allowExisting = false,
+        managedIfNoPath = false)
+    executePlan(cmd).toRdd
+    table(tableName)
+  }
+
+  /**
+   * :: Experimental ::
+   * Create an external table from the given path based on a data source, a schema and
+   * a set of options. Then, returns the corresponding DataFrame.
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: java.util.Map[String, String]): DataFrame = {
+    createExternalTable(tableName, source, schema, options.toMap)
+  }
+
+  /**
+   * :: Experimental ::
+   * (Scala-specific)
+   * Create an external table from the given path based on a data source, a schema and
+   * a set of options. Then, returns the corresponding DataFrame.
+   */
+  @Experimental
+  def createExternalTable(
+      tableName: String,
+      source: String,
+      schema: StructType,
+      options: Map[String, String]): DataFrame = {
+    val cmd =
+      CreateTableUsing(
+        tableName,
+        userSpecifiedSchema = Some(schema),
+        source,
+        temporary = false,
+        options,
+        allowExisting = false,
+        managedIfNoPath = false)
+    executePlan(cmd).toRdd
+    table(tableName)
   }
 
   /**

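createExternalTable, as documented above, registers an existing dataset in the catalog and returns the corresponding DataFrame; a hypothetical PySpark sketch mirroring the tests in this patch (HiveContext-backed sqlCtx, made-up path and table name):

    from pyspark.sql.types import StructType, StructField, StringType

    schema = StructType([StructField("value", StringType(), True)])
    external = sqlCtx.createExternalTable("externalJsonTable",
                                          source="org.apache.spark.sql.json",
                                          schema=schema, path="/tmp/saved_json")
    sqlCtx.sql("SELECT * FROM externalJsonTable").collect()
    sqlCtx.sql("DROP TABLE externalJsonTable")
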
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index edf8a5b..e915e0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -309,7 +309,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false) =>
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false, _) =>
         ExecutedCommand(
           CreateTempTableUsing(
             tableName, userSpecifiedSchema, provider, opts)) :: Nil
@@ -318,24 +318,20 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       case c: CreateTableUsing if c.temporary && c.allowExisting =>
         sys.error("allowExisting should be set to false when creating a temporary table.")
 
-      case CreateTableUsingAsSelect(tableName, provider, true, opts, false, query) =>
+      case CreateTableUsingAsSelect(tableName, provider, true, mode, opts, query) =>
         val logicalPlan = sqlContext.parseSql(query)
         val cmd =
-          CreateTempTableUsingAsSelect(tableName, provider, opts, logicalPlan)
+          CreateTempTableUsingAsSelect(tableName, provider, mode, opts, logicalPlan)
         ExecutedCommand(cmd) :: Nil
       case c: CreateTableUsingAsSelect if !c.temporary =>
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
-      case c: CreateTableUsingAsSelect if c.temporary && c.allowExisting =>
-        sys.error("allowExisting should be set to false when creating a temporary table.")
 
-      case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, false, query) =>
+      case CreateTableUsingAsLogicalPlan(tableName, provider, true, mode, opts, query) =>
         val cmd =
-          CreateTempTableUsingAsSelect(tableName, provider, opts, query)
+          CreateTempTableUsingAsSelect(tableName, provider, mode, opts, query)
         ExecutedCommand(cmd) :: Nil
       case c: CreateTableUsingAsLogicalPlan if !c.temporary =>
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
-      case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting =>
-        sys.error("allowExisting should be set to false when creating a temporary table.")
 
       case LogicalDescribeCommand(table, isExtended) =>
         val resultPlan = self.sqlContext.executePlan(table).executedPlan

http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index c4e14c6..f828bcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -20,8 +20,7 @@ package org.apache.spark.sql.json
 import java.io.IOException
 
 import org.apache.hadoop.fs.Path
-import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 
@@ -29,6 +28,10 @@ import org.apache.spark.sql.types.StructType
 private[sql] class DefaultSource
   extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
 
+  private def checkPath(parameters: Map[String, String]): String = {
+    parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
+  }
+
   /** Returns a new base relation with the parameters. */
   override def createRelation(
       sqlContext: SQLContext,
@@ -52,15 +55,30 @@ private[sql] class DefaultSource
 
   override def createRelation(
       sqlContext: SQLContext,
+      mode: SaveMode,
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
-    val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
+    val path = checkPath(parameters)
     val filesystemPath = new Path(path)
     val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
-    if (fs.exists(filesystemPath)) {
-      sys.error(s"path $path already exists.")
+    val doSave = if (fs.exists(filesystemPath)) {
+      mode match {
+        case SaveMode.Append =>
+          sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
+        case SaveMode.Overwrite =>
+          fs.delete(filesystemPath, true)
+          true
+        case SaveMode.ErrorIfExists =>
+          sys.error(s"path $path already exists.")
+        case SaveMode.Ignore => false
+      }
+    } else {
+      true
+    }
+    if (doSave) {
+      // Only save data when the save mode is not ignore.
+      data.toJSON.saveAsTextFile(path)
     }
-    data.toJSON.saveAsTextFile(path)
 
     createRelation(sqlContext, parameters, data.schema)
   }

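In user-visible terms, the mode handling above for the built-in JSON source behaves roughly as follows (hypothetical sketch; assumes /tmp/out already holds a previous save and `df` is a DataFrame):

    json_source = "org.apache.spark.sql.json"
    df.save("/tmp/out", json_source, "ignore")     # path exists: nothing is written
    df.save("/tmp/out", json_source, "overwrite")  # existing directory is deleted, then rewritten
    try:
        df.save("/tmp/out", json_source, "error")  # raises: "path /tmp/out already exists."
    except Exception:
        pass
    try:
        df.save("/tmp/out", json_source, "append") # raises: Append is not supported by this source
    except Exception:
        pass
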
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 04804f7..aef9c10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -80,18 +80,45 @@ class DefaultSource
 
   override def createRelation(
       sqlContext: SQLContext,
+      mode: SaveMode,
       parameters: Map[String, String],
       data: DataFrame): BaseRelation = {
     val path = checkPath(parameters)
-    ParquetRelation.createEmpty(
-      path,
-      data.schema.toAttributes,
-      false,
-      sqlContext.sparkContext.hadoopConfiguration,
-      sqlContext)
-
-    val relation = createRelation(sqlContext, parameters, data.schema)
-    relation.asInstanceOf[ParquetRelation2].insert(data, true)
+    val filesystemPath = new Path(path)
+    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+    val doSave = if (fs.exists(filesystemPath)) {
+      mode match {
+        case SaveMode.Append =>
+          sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
+        case SaveMode.Overwrite =>
+          fs.delete(filesystemPath, true)
+          true
+        case SaveMode.ErrorIfExists =>
+          sys.error(s"path $path already exists.")
+        case SaveMode.Ignore => false
+      }
+    } else {
+      true
+    }
+
+    val relation = if (doSave) {
+      // Only save data when the save mode is not ignore.
+      ParquetRelation.createEmpty(
+        path,
+        data.schema.toAttributes,
+        false,
+        sqlContext.sparkContext.hadoopConfiguration,
+        sqlContext)
+
+      val createdRelation = createRelation(sqlContext, parameters, data.schema)
+      createdRelation.asInstanceOf[ParquetRelation2].insert(data, true)
+
+      createdRelation
+    } else {
+      // If the save mode is Ignore, we will just create the relation based on existing data.
+      createRelation(sqlContext, parameters)
+    }
+
     relation
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 9f64f76..6487c14 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -119,11 +119,20 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
             throw new DDLException(
               "a CREATE TABLE AS SELECT statement does not allow column definitions.")
           }
+          // When the IF NOT EXISTS clause appears in the query, the save mode will be Ignore.
+          val mode = if (allowExisting.isDefined) {
+            SaveMode.Ignore
+          } else if (temp.isDefined) {
+            SaveMode.Overwrite
+          } else {
+            SaveMode.ErrorIfExists
+          }
+
           CreateTableUsingAsSelect(tableName,
             provider,
             temp.isDefined,
+            mode,
             options,
-            allowExisting.isDefined,
             query.get)
         } else {
           val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
@@ -133,7 +142,8 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
             provider,
             temp.isDefined,
             options,
-            allowExisting.isDefined)
+            allowExisting.isDefined,
+            managedIfNoPath = false)
         }
       }
   )
@@ -264,6 +274,7 @@ object ResolvedDataSource {
   def apply(
       sqlContext: SQLContext,
       provider: String,
+      mode: SaveMode,
       options: Map[String, String],
       data: DataFrame): ResolvedDataSource = {
     val loader = Utils.getContextOrSparkClassLoader
@@ -277,7 +288,7 @@ object ResolvedDataSource {
 
     val relation = clazz.newInstance match {
       case dataSource: CreatableRelationProvider =>
-        dataSource.createRelation(sqlContext, options, data)
+        dataSource.createRelation(sqlContext, mode, options, data)
       case _ =>
         sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
     }
@@ -307,28 +318,40 @@ private[sql] case class DescribeCommand(
       new MetadataBuilder().putString("comment", "comment of the column").build())())
 }
 
+/**
+  * Used to represent the operation of creating a table using a data source.
+  * @param tableName
+  * @param userSpecifiedSchema
+  * @param provider
+  * @param temporary
+  * @param options
+  * @param allowExisting If it is true, we will do nothing when the table already exists.
+  *                      If it is false, an exception will be thrown.
+  * @param managedIfNoPath
+  */
 private[sql] case class CreateTableUsing(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
     provider: String,
     temporary: Boolean,
     options: Map[String, String],
-    allowExisting: Boolean) extends Command
+    allowExisting: Boolean,
+    managedIfNoPath: Boolean) extends Command
 
 private[sql] case class CreateTableUsingAsSelect(
     tableName: String,
     provider: String,
     temporary: Boolean,
+    mode: SaveMode,
     options: Map[String, String],
-    allowExisting: Boolean,
     query: String) extends Command
 
 private[sql] case class CreateTableUsingAsLogicalPlan(
     tableName: String,
     provider: String,
     temporary: Boolean,
+    mode: SaveMode,
     options: Map[String, String],
-    allowExisting: Boolean,
     query: LogicalPlan) extends Command
 
 private [sql] case class CreateTempTableUsing(
@@ -348,12 +371,13 @@ private [sql] case class CreateTempTableUsing(
 private [sql] case class CreateTempTableUsingAsSelect(
     tableName: String,
     provider: String,
+    mode: SaveMode,
     options: Map[String, String],
     query: LogicalPlan) extends RunnableCommand {
 
   def run(sqlContext: SQLContext) = {
     val df = DataFrame(sqlContext, query)
-    val resolved = ResolvedDataSource(sqlContext, provider, options, df)
+    val resolved = ResolvedDataSource(sqlContext, provider, mode, options, df)
     sqlContext.registerRDDAsTable(
       DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
 
@@ -364,7 +388,7 @@ private [sql] case class CreateTempTableUsingAsSelect(
 /**
  * Builds a map in which keys are case insensitive
  */
-protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
+protected[sql] class CaseInsensitiveMap(map: Map[String, String]) extends Map[String, String]
   with Serializable {
 
   val baseMap = map.map(kv => kv.copy(_1 = kv._1.toLowerCase))

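For reference, a minimal sketch (not part of this diff) of the user-facing statement that now flows through CreateTableUsingAsSelect / CreateTempTableUsingAsSelect, which carry a SaveMode instead of an allowExisting flag; the table, column, and path names are illustrative. The updated CreateTableAsSelectSuite further down exercises the same statement and shows that re-issuing it replaces the previous temporary table.

import org.apache.spark.sql.SQLContext

def createJsonTempTableAsSelect(sqlContext: SQLContext, outputPath: String): Unit = {
  // Parsed by DDLParser into a CREATE TEMPORARY TABLE ... AS SELECT command.
  sqlContext.sql(
    s"""
      |CREATE TEMPORARY TABLE jsonTable
      |USING org.apache.spark.sql.json.DefaultSource
      |OPTIONS (
      |  path '$outputPath'
      |) AS
      |SELECT a, b FROM jt
    """.stripMargin)
}
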
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index 5eecc30..37fda7b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -79,8 +79,27 @@ trait SchemaRelationProvider {
 
 @DeveloperApi
 trait CreatableRelationProvider {
+  /**
+   * Creates a relation with the given parameters based on the contents of the given
+   * DataFrame. The mode specifies the expected behavior of createRelation when data
+   * already exists. Right now, there are three modes:
+   *  - Append: when saving a DataFrame to a data source, if data already exists, the contents
+   *    of the DataFrame are expected to be appended to the existing data.
+   *  - Overwrite: if data already exists, the existing data is expected to be overwritten by
+   *    the contents of the DataFrame.
+   *  - ErrorIfExists: if data already exists, an exception is expected to be thrown.
+   *
+   * @param sqlContext the active SQLContext.
+   * @param mode the expected behavior when data already exists at the save location.
+   * @param parameters the options passed to this data source.
+   * @param data the DataFrame whose contents will be used to create the relation.
+   * @return the created relation.
+   */
   def createRelation(
       sqlContext: SQLContext,
+      mode: SaveMode,
       parameters: Map[String, String],
       data: DataFrame): BaseRelation
 }

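A minimal sketch (not part of this diff) of a third-party source implementing the new createRelation signature. It assumes the class lives in, or imports, org.apache.spark.sql.sources; the 'path' option handling, the text-based write, and the schema-only relation are illustrative assumptions rather than the behavior of any built-in source.

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.StructType

class ExampleSource extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    val path = new Path(parameters.getOrElse("path", sys.error("'path' must be specified.")))
    val fs = path.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
    val exists = fs.exists(path)

    val doSave = if (exists) {
      mode match {
        case SaveMode.ErrorIfExists => sys.error(s"path $path already exists.")
        case SaveMode.Overwrite => fs.delete(path, true); true
        case SaveMode.Append => sys.error("Append mode is not supported by this sketch.")
        case _ => false  // leave the existing data untouched
      }
    } else {
      true
    }
    if (doSave) {
      // Illustrative only: a real source would write its own format here.
      data.rdd.map(_.toString).saveAsTextFile(path.toString)
    }

    val outerSqlContext = sqlContext
    // A relation that only reports its schema; a real source would return a scannable relation.
    new BaseRelation {
      override def sqlContext: SQLContext = outerSqlContext
      override def schema: StructType = data.schema
    }
  }
}
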
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/test/java/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/java/org/apache/spark/sql/sources/JavaSaveLoadSuite.java b/sql/core/src/test/java/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
new file mode 100644
index 0000000..852baf0
--- /dev/null
+++ b/sql/core/src/test/java/org/apache/spark/sql/sources/JavaSaveLoadSuite.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.test.TestSQLContext$;
+import org.apache.spark.sql.*;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.util.Utils;
+
+public class JavaSaveLoadSuite {
+
+  private transient JavaSparkContext sc;
+  private transient SQLContext sqlContext;
+
+  String originalDefaultSource;
+  File path;
+  DataFrame df;
+
+  private void checkAnswer(DataFrame actual, List<Row> expected) {
+    String errorMessage = QueryTest$.MODULE$.checkAnswer(actual, expected);
+    if (errorMessage != null) {
+      Assert.fail(errorMessage);
+    }
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    sqlContext = TestSQLContext$.MODULE$;
+    sc = new JavaSparkContext(sqlContext.sparkContext());
+
+    originalDefaultSource = sqlContext.conf().defaultDataSourceName();
+    path =
+      Utils.createTempDir(System.getProperty("java.io.tmpdir"), "datasource").getCanonicalFile();
+    if (path.exists()) {
+      path.delete();
+    }
+
+    List<String> jsonObjects = new ArrayList<String>(10);
+    for (int i = 0; i < 10; i++) {
+      jsonObjects.add("{\"a\":" + i + ", \"b\":\"str" + i + "\"}");
+    }
+    JavaRDD<String> rdd = sc.parallelize(jsonObjects);
+    df = sqlContext.jsonRDD(rdd);
+    df.registerTempTable("jsonTable");
+  }
+
+  @Test
+  public void saveAndLoad() {
+    Map<String, String> options = new HashMap<String, String>();
+    options.put("path", path.toString());
+    df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);
+
+    DataFrame loadedDF = sqlContext.load("org.apache.spark.sql.json", options);
+
+    checkAnswer(loadedDF, df.collectAsList());
+  }
+
+  @Test
+  public void saveAndLoadWithSchema() {
+    Map<String, String> options = new HashMap<String, String>();
+    options.put("path", path.toString());
+    df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, options);
+
+    List<StructField> fields = new ArrayList<>();
+    fields.add(DataTypes.createStructField("b", DataTypes.StringType, true));
+    StructType schema = DataTypes.createStructType(fields);
+    DataFrame loadedDF = sqlContext.load("org.apache.spark.sql.json", schema, options);
+
+    checkAnswer(loadedDF, sqlContext.sql("SELECT b FROM jsonTable").collectAsList());
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index f9ddd2c..dfb6858 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql
 
 import java.util.{Locale, TimeZone}
 
+import scala.collection.JavaConversions._
+
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.columnar.InMemoryRelation
@@ -52,9 +54,51 @@ class QueryTest extends PlanTest {
   /**
    * Runs the plan and makes sure the answer matches the expected result.
    * @param rdd the [[DataFrame]] to be executed
-   * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
+   * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
    */
   protected def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Unit = {
+    QueryTest.checkAnswer(rdd, expectedAnswer) match {
+      case Some(errorMessage) => fail(errorMessage)
+      case None =>
+    }
+  }
+
+  protected def checkAnswer(rdd: DataFrame, expectedAnswer: Row): Unit = {
+    checkAnswer(rdd, Seq(expectedAnswer))
+  }
+
+  def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = {
+    test(sqlString) {
+      checkAnswer(sqlContext.sql(sqlString), expectedAnswer)
+    }
+  }
+
+  /**
+   * Asserts that a given [[DataFrame]] will be executed using the given number of cached results.
+   */
+  def assertCached(query: DataFrame, numCachedTables: Int = 1): Unit = {
+    val planWithCaching = query.queryExecution.withCachedData
+    val cachedData = planWithCaching collect {
+      case cached: InMemoryRelation => cached
+    }
+
+    assert(
+      cachedData.size == numCachedTables,
+      s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
+        planWithCaching)
+  }
+}
+
+object QueryTest {
+  /**
+   * Runs the plan and makes sure the answer matches the expected result.
+   * If there was an exception during the execution or the contents of the DataFrame do not
+   * match the expected result, an error message will be returned. Otherwise, [[None]] will
+   * be returned.
+   * @param rdd the [[DataFrame]] to be executed
+   * @param expectedAnswer the expected result in a [[Seq]] of [[Row]]s.
+   */
+  def checkAnswer(rdd: DataFrame, expectedAnswer: Seq[Row]): Option[String] = {
     val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s }.nonEmpty
     def prepareAnswer(answer: Seq[Row]): Seq[Row] = {
       // Converts data to types that we can do equality comparison using Scala collections.
@@ -70,18 +114,20 @@ class QueryTest extends PlanTest {
     }
     val sparkAnswer = try rdd.collect().toSeq catch {
       case e: Exception =>
-        fail(
+        val errorMessage =
           s"""
             |Exception thrown while executing query:
             |${rdd.queryExecution}
             |== Exception ==
             |$e
             |${org.apache.spark.sql.catalyst.util.stackTraceToString(e)}
-          """.stripMargin)
+          """.stripMargin
+        return Some(errorMessage)
     }
 
     if (prepareAnswer(expectedAnswer) != prepareAnswer(sparkAnswer)) {
-      fail(s"""
+      val errorMessage =
+        s"""
         |Results do not match for query:
         |${rdd.logicalPlan}
         |== Analyzed Plan ==
@@ -90,37 +136,21 @@ class QueryTest extends PlanTest {
         |${rdd.queryExecution.executedPlan}
         |== Results ==
         |${sideBySide(
-        s"== Correct Answer - ${expectedAnswer.size} ==" +:
-          prepareAnswer(expectedAnswer).map(_.toString),
-        s"== Spark Answer - ${sparkAnswer.size} ==" +:
-          prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
-      """.stripMargin)
+          s"== Correct Answer - ${expectedAnswer.size} ==" +:
+            prepareAnswer(expectedAnswer).map(_.toString),
+          s"== Spark Answer - ${sparkAnswer.size} ==" +:
+            prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+      """.stripMargin
+      return Some(errorMessage)
     }
-  }
 
-  protected def checkAnswer(rdd: DataFrame, expectedAnswer: Row): Unit = {
-    checkAnswer(rdd, Seq(expectedAnswer))
-  }
-
-  def sqlTest(sqlString: String, expectedAnswer: Seq[Row])(implicit sqlContext: SQLContext): Unit = {
-    test(sqlString) {
-      checkAnswer(sqlContext.sql(sqlString), expectedAnswer)
-    }
+    return None
   }
 
-  /**
-   * Asserts that a given [[DataFrame]] will be executed using the given number of cached results.
-   */
-  def assertCached(query: DataFrame, numCachedTables: Int = 1): Unit = {
-    val planWithCaching = query.queryExecution.withCachedData
-    val cachedData = planWithCaching collect {
-      case cached: InMemoryRelation => cached
+  def checkAnswer(rdd: DataFrame, expectedAnswer: java.util.List[Row]): String = {
+    checkAnswer(rdd, expectedAnswer.toSeq) match {
+      case Some(errorMessage) => errorMessage
+      case None => null
     }
-
-    assert(
-      cachedData.size == numCachedTables,
-      s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
-        planWithCaching)
   }
-
 }

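For Scala callers, the relocated helper is used roughly as in the sketch below (not part of this diff); Java callers such as JavaSaveLoadSuite above go through the java.util.List overload, which returns the error message, or null on success.

import org.apache.spark.sql.{DataFrame, QueryTest, Row}

def assertSameResult(actual: DataFrame, expected: Seq[Row]): Unit = {
  QueryTest.checkAnswer(actual, expected) match {
    case Some(errorMessage) => throw new AssertionError(errorMessage)
    case None => // results matched
  }
}
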
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
index b023899..29caed9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -77,12 +77,10 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
       sql("SELECT a, b FROM jsonTable"),
       sql("SELECT a, b FROM jt").collect())
 
-    dropTempTable("jsonTable")
-
-    val message = intercept[RuntimeException]{
+    val message = intercept[DDLException]{
       sql(
         s"""
-        |CREATE TEMPORARY TABLE jsonTable
+        |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
         |USING org.apache.spark.sql.json.DefaultSource
         |OPTIONS (
         |  path '${path.toString}'
@@ -91,10 +89,25 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
       """.stripMargin)
     }.getMessage
     assert(
-      message.contains(s"path ${path.toString} already exists."),
+      message.contains(s"a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause."),
       "CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.")
 
-    // Explicitly delete it.
+    // Overwrite the temporary table.
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |) AS
+        |SELECT a * 4 FROM jt
+      """.stripMargin)
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      sql("SELECT a * 4 FROM jt").collect())
+
+    dropTempTable("jsonTable")
+    // Explicitly delete the data.
     if (path.exists()) Utils.deleteRecursively(path)
 
     sql(
@@ -104,12 +117,12 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
         |OPTIONS (
         |  path '${path.toString}'
         |) AS
-        |SELECT a * 4 FROM jt
+        |SELECT b FROM jt
       """.stripMargin)
 
     checkAnswer(
       sql("SELECT * FROM jsonTable"),
-      sql("SELECT a * 4 FROM jt").collect())
+      sql("SELECT b FROM jt").collect())
 
     dropTempTable("jsonTable")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
index fe2f76c..a510045 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -21,10 +21,10 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.util.Utils
-
 import org.apache.spark.sql.catalyst.util
+import org.apache.spark.sql.{SQLConf, DataFrame}
+import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
 
@@ -38,42 +38,60 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
 
   override def beforeAll(): Unit = {
     originalDefaultSource = conf.defaultDataSourceName
-    conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
 
     path = util.getTempFilePath("datasource").getCanonicalFile
 
     val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
     df = jsonRDD(rdd)
+    df.registerTempTable("jsonTable")
   }
 
   override def afterAll(): Unit = {
-    conf.setConf("spark.sql.default.datasource", originalDefaultSource)
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
   }
 
   after {
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
     if (path.exists()) Utils.deleteRecursively(path)
   }
 
   def checkLoad(): Unit = {
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
     checkAnswer(load(path.toString), df.collect())
-    checkAnswer(load("org.apache.spark.sql.json", ("path", path.toString)), df.collect())
+
+    // Test if we can pick up the data source name passed in load.
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+    checkAnswer(load(path.toString, "org.apache.spark.sql.json"), df.collect())
+    checkAnswer(load("org.apache.spark.sql.json", Map("path" -> path.toString)), df.collect())
+    val schema = StructType(StructField("b", StringType, true) :: Nil)
+    checkAnswer(
+      load("org.apache.spark.sql.json", schema, Map("path" -> path.toString)),
+      sql("SELECT b FROM jsonTable").collect())
   }
 
-  test("save with overwrite and load") {
+  test("save with path and load") {
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
     df.save(path.toString)
-    checkLoad
+    checkLoad()
+  }
+
+  test("save with path and datasource, and load") {
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+    df.save(path.toString, "org.apache.spark.sql.json")
+    checkLoad()
   }
 
   test("save with data source and options, and load") {
-    df.save("org.apache.spark.sql.json", ("path", path.toString))
-    checkLoad
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "not a source name")
+    df.save("org.apache.spark.sql.json", SaveMode.ErrorIfExists, Map("path" -> path.toString))
+    checkLoad()
   }
 
   test("save and save again") {
-    df.save(path.toString)
+    df.save(path.toString, "org.apache.spark.sql.json")
 
-    val message = intercept[RuntimeException] {
-      df.save(path.toString)
+    var message = intercept[RuntimeException] {
+      df.save(path.toString, "org.apache.spark.sql.json")
     }.getMessage
 
     assert(
@@ -82,7 +100,18 @@ class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
 
     if (path.exists()) Utils.deleteRecursively(path)
 
-    df.save(path.toString)
-    checkLoad
+    df.save(path.toString, "org.apache.spark.sql.json")
+    checkLoad()
+
+    df.save("org.apache.spark.sql.json", SaveMode.Overwrite, Map("path" -> path.toString))
+    checkLoad()
+
+    message = intercept[RuntimeException] {
+      df.save("org.apache.spark.sql.json", SaveMode.Append, Map("path" -> path.toString))
+    }.getMessage
+
+    assert(
+      message.contains("Append mode is not supported"),
+      "We should complain that 'Append mode is not supported' for JSON source.")
   }
 }
\ No newline at end of file

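The suite above covers the expanded save/load surface. Condensed into a spark-shell style sketch (not part of this diff), with df and sqlContext taken as given, an illustrative path, and the assumption that the default data source has been set to the JSON source as the suite does, it looks like this:

import org.apache.spark.sql.sources.SaveMode
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val source = "org.apache.spark.sql.json"
val options = Map("path" -> "/tmp/spark-save-load-example")

// Each save call is an alternative entry point; re-saving to an existing path needs Overwrite.
df.save(source, SaveMode.Overwrite, options)          // explicit source, mode, and options
// df.save("/tmp/spark-save-load-example")            // default source, default mode
// df.save("/tmp/spark-save-load-example", source)    // explicit source, default mode

val loadedWithDefaults = sqlContext.load("/tmp/spark-save-load-example")        // default source
val loadedWithSource = sqlContext.load("/tmp/spark-save-load-example", source)
val schema = StructType(StructField("b", StringType, nullable = true) :: Nil)
val loadedWithSchema = sqlContext.load(source, schema, options)
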
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 2c00659..7ae6ed6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -80,18 +80,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   }
 
   /**
-   * Creates a table using the schema of the given class.
-   *
-   * @param tableName The name of the table to create.
-   * @param allowExisting When false, an exception will be thrown if the table already exists.
-   * @tparam A A case class that is used to describe the schema of the table to be created.
-   */
-  @Deprecated
-  def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) {
-    catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
-  }
-
-  /**
    * Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
    * Spark SQL or the external data source library it uses might cache certain metadata about a
    * table, such as the location of blocks. When those change outside of Spark SQL, users should
@@ -107,70 +95,6 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     catalog.invalidateTable("default", tableName)
   }
 
-  @Experimental
-  def createTable(tableName: String, path: String, allowExisting: Boolean): Unit = {
-    val dataSourceName = conf.defaultDataSourceName
-    createTable(tableName, dataSourceName, allowExisting, ("path", path))
-  }
-
-  @Experimental
-  def createTable(
-      tableName: String,
-      dataSourceName: String,
-      allowExisting: Boolean,
-      option: (String, String),
-      options: (String, String)*): Unit = {
-    val cmd =
-      CreateTableUsing(
-        tableName,
-        userSpecifiedSchema = None,
-        dataSourceName,
-        temporary = false,
-        (option +: options).toMap,
-        allowExisting)
-    executePlan(cmd).toRdd
-  }
-
-  @Experimental
-  def createTable(
-      tableName: String,
-      dataSourceName: String,
-      schema: StructType,
-      allowExisting: Boolean,
-      option: (String, String),
-      options: (String, String)*): Unit = {
-    val cmd =
-      CreateTableUsing(
-        tableName,
-        userSpecifiedSchema = Some(schema),
-        dataSourceName,
-        temporary = false,
-        (option +: options).toMap,
-        allowExisting)
-    executePlan(cmd).toRdd
-  }
-
-  @Experimental
-  def createTable(
-      tableName: String,
-      dataSourceName: String,
-      allowExisting: Boolean,
-      options: java.util.Map[String, String]): Unit = {
-    val opts = options.toSeq
-    createTable(tableName, dataSourceName, allowExisting, opts.head, opts.tail:_*)
-  }
-
-  @Experimental
-  def createTable(
-      tableName: String,
-      dataSourceName: String,
-      schema: StructType,
-      allowExisting: Boolean,
-      options: java.util.Map[String, String]): Unit = {
-    val opts = options.toSeq
-    createTable(tableName, dataSourceName, schema, allowExisting, opts.head, opts.tail:_*)
-  }
-
   /**
    * Analyzes the given table in the current database to generate statistics, which will be
    * used in query optimizations.

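A sketch (not part of this diff) of what replaces the removed HiveContext.createTable overloads: persistent data source tables are now created either through the DDL handled by HiveDDLStrategy below, or by saving a DataFrame with saveAsTable, which this patch extends with a SaveMode. The exact saveAsTable overload shown is an assumption based on the commit message, and all table names and paths are illustrative.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.sources.SaveMode

def registerJsonTable(hiveContext: HiveContext, path: String): Unit = {
  // DDL path: planned as CreateTableUsing and executed as CreateMetastoreDataSource.
  hiveContext.sql(
    s"""
      |CREATE TABLE json_backed_table
      |USING org.apache.spark.sql.json
      |OPTIONS (
      |  path '$path'
      |)
    """.stripMargin)
}

def saveAsMetastoreTable(hiveContext: HiveContext, path: String): Unit = {
  val df = hiveContext.jsonFile(path)
  // Assumed overload: saveAsTable(tableName, source, mode, options).
  df.saveAsTable(
    "json_backed_table_copy",
    "org.apache.spark.sql.json",
    SaveMode.ErrorIfExists,
    Map("path" -> s"$path-copy"))
}
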
http://git-wip-us.apache.org/repos/asf/spark/blob/aaf50d05/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 95abc36..cb138be 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -216,20 +216,21 @@ private[hive] trait HiveStrategies {
 
   object HiveDDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, opts, allowExisting) =>
+      case CreateTableUsing(
+      tableName, userSpecifiedSchema, provider, false, opts, allowExisting, managedIfNoPath) =>
         ExecutedCommand(
           CreateMetastoreDataSource(
-            tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil
+            tableName, userSpecifiedSchema, provider, opts, allowExisting, managedIfNoPath)) :: Nil
 
-      case CreateTableUsingAsSelect(tableName, provider, false, opts, allowExisting, query) =>
+      case CreateTableUsingAsSelect(tableName, provider, false, mode, opts, query) =>
         val logicalPlan = hiveContext.parseSql(query)
         val cmd =
-          CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, logicalPlan)
+          CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, logicalPlan)
         ExecutedCommand(cmd) :: Nil
 
-      case CreateTableUsingAsLogicalPlan(tableName, provider, false, opts, allowExisting, query) =>
+      case CreateTableUsingAsLogicalPlan(tableName, provider, false, mode, opts, query) =>
         val cmd =
-          CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, query)
+          CreateMetastoreDataSourceAsSelect(tableName, provider, mode, opts, query)
         ExecutedCommand(cmd) :: Nil
 
       case _ => Nil

