You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/02/03 08:30:47 UTC

spark git commit: [SPARK-5501][SPARK-5420][SQL] Write support for the data source API

Repository: spark
Updated Branches:
  refs/heads/master 50a1a874e -> 13531dd97


[SPARK-5501][SPARK-5420][SQL] Write support for the data source API

This PR aims to support `INSERT INTO/OVERWRITE TABLE tableName` and `CREATE TABLE tableName AS SELECT` for the data source API (partitioned tables are not supported).

In this PR, I am also adding the support of `IF NOT EXISTS` for our ddl parser. The current semantic of `IF NOT EXISTS` is explained as follows.
* For a `CREATE TEMPORARY TABLE` statement, it does not `IF NOT EXISTS` for now.
* For a `CREATE TABLE` statement (we are creating a metastore table), if there is an existing table having the same name ...
  * when `IF NOT EXISTS` clause is used, we will do nothing.
  * when `IF NOT EXISTS` clause is not used, the user will see an exception saying the table already exists.

TODOs:
- [x] CTAS support
- [x] Programmatic APIs
- [ ] Python API (another PR)
- [x] More unit tests
- [ ] Documents (another PR)

marmbrus liancheng rxin

Author: Yin Huai <yh...@databricks.com>

Closes #4294 from yhuai/writeSupport and squashes the following commits:

3db1539 [Yin Huai] save does not take overwrite.
1c98881 [Yin Huai] Fix test.
142372a [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
34e1bfb [Yin Huai] Address comments.
1682ca6 [Yin Huai] Better support for CTAS statements.
e789d64 [Yin Huai] For the Scala API, let users to use tuples to provide options.
0128065 [Yin Huai] Short hand versions of save and load.
66ebd74 [Yin Huai] Formatting.
9203ec2 [Yin Huai] Merge remote-tracking branch 'upstream/master' into writeSupport
e5d29f2 [Yin Huai] Programmatic APIs.
1a719a5 [Yin Huai] CREATE TEMPORARY TABLE with IF NOT EXISTS is not allowed for now.
909924f [Yin Huai] Add saveAsTable for the data source API to DataFrame.
95a7c71 [Yin Huai] Fix bug when handling IF NOT EXISTS clause in a CREATE TEMPORARY TABLE statement.
d37b19c [Yin Huai] Cheng's comments.
fd6758c [Yin Huai] Use BeforeAndAfterAll.
7880891 [Yin Huai] Support CREATE TABLE AS SELECT STATEMENT and the IF NOT EXISTS clause.
cb85b05 [Yin Huai] Initial write support.
2f91354 [Yin Huai] Make INSERT OVERWRITE/INTO statements consistent between HiveQL and SqlParser.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/13531dd9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/13531dd9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/13531dd9

Branch: refs/heads/master
Commit: 13531dd97c08563e53dacdaeaf1102bdd13ef825
Parents: 50a1a87
Author: Yin Huai <yh...@databricks.com>
Authored: Mon Feb 2 23:30:44 2015 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Mon Feb 2 23:30:44 2015 -0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/SqlParser.scala   |   4 +-
 .../scala/org/apache/spark/sql/DataFrame.scala  |  49 ++++-
 .../org/apache/spark/sql/DataFrameImpl.scala    |  58 +++++-
 .../apache/spark/sql/IncomputableColumn.scala   |  22 +++
 .../scala/org/apache/spark/sql/SQLConf.scala    |   6 +
 .../scala/org/apache/spark/sql/SQLContext.scala |  26 ++-
 .../main/scala/org/apache/spark/sql/api.scala   |  27 +++
 .../spark/sql/execution/SparkStrategies.scala   |  31 +++-
 .../apache/spark/sql/json/JSONRelation.scala    |  59 +++++-
 .../spark/sql/sources/DataSourceStrategy.scala  |   9 +-
 .../org/apache/spark/sql/sources/commands.scala |  35 ++++
 .../org/apache/spark/sql/sources/ddl.scala      | 126 +++++++++++--
 .../apache/spark/sql/sources/interfaces.scala   |  23 ++-
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  12 +-
 .../sql/sources/CreateTableAsSelectSuite.scala  | 147 +++++++++++++++
 .../spark/sql/sources/InsertIntoSuite.scala     |  96 ++++++++++
 .../spark/sql/sources/SaveLoadSuite.scala       |  88 +++++++++
 .../org/apache/spark/sql/hive/HiveContext.scala |  67 ++++++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  35 +++-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  18 +-
 .../spark/sql/hive/execution/commands.scala     |  76 +++++++-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 185 ++++++++++++++++++-
 .../sql/hive/execution/SQLQuerySuite.scala      |   2 +-
 23 files changed, 1141 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
index 594a423..25e639d 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/SqlParser.scala
@@ -147,8 +147,8 @@ class SqlParser extends AbstractSparkSQLParser {
       }
 
   protected lazy val insert: Parser[LogicalPlan] =
-    INSERT ~> OVERWRITE.? ~ (INTO ~> relation) ~ select ^^ {
-      case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o.isDefined)
+    INSERT ~> (OVERWRITE ^^^ true | INTO ^^^ false) ~ (TABLE ~> relation) ~ select ^^ {
+      case o ~ r ~ s => InsertIntoTable(r, Map.empty[String, Option[String]], s, o)
     }
 
   protected lazy val projection: Parser[Expression] =

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 385e1ec..4cbfb6a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql
 
-import java.util.{List => JList}
-
 import scala.reflect.ClassTag
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -487,6 +485,53 @@ trait DataFrame extends DataFrameSpecificApi with RDDApi[Row] {
 
   /**
    * :: Experimental ::
+   * Creates a table from the the contents of this DataFrame based on a given data source and
+   * a set of options. This will fail if the table already exists.
+   *
+   * Note that this currently only works with DataFrames that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
+   * be the target of an `insertInto`.
+   */
+  @Experimental
+  override def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      option: (String, String),
+      options: (String, String)*): Unit
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the the contents of this DataFrame based on a given data source and
+   * a set of options. This will fail if the table already exists.
+   *
+   * Note that this currently only works with DataFrames that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
+   * be the target of an `insertInto`.
+   */
+  @Experimental
+  override def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      options: java.util.Map[String, String]): Unit
+
+  @Experimental
+  override def save(path: String): Unit
+
+  @Experimental
+  override def save(
+      dataSourceName: String,
+      option: (String, String),
+      options: (String, String)*): Unit
+
+  @Experimental
+  override def save(
+      dataSourceName: String,
+      options: java.util.Map[String, String]): Unit
+
+  /**
+   * :: Experimental ::
    * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
    */
   @Experimental

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
index f8fcc25..f84dbf3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameImpl.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.plans.{JoinType, Inner}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{LogicalRDD, EvaluatePython}
 import org.apache.spark.sql.json.JsonRDD
+import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsLogicalPlan}
 import org.apache.spark.sql.types.{NumericType, StructType}
 import org.apache.spark.util.Utils
 
@@ -303,8 +304,61 @@ private[sql] class DataFrameImpl protected[sql](
   }
 
   override def saveAsTable(tableName: String): Unit = {
-    sqlContext.executePlan(
-      CreateTableAsSelect(None, tableName, logicalPlan, allowExisting = false)).toRdd
+    val dataSourceName = sqlContext.conf.defaultDataSourceName
+    val cmd =
+      CreateTableUsingAsLogicalPlan(
+        tableName,
+        dataSourceName,
+        temporary = false,
+        Map.empty,
+        allowExisting = false,
+        logicalPlan)
+
+    sqlContext.executePlan(cmd).toRdd
+  }
+
+  override def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      option: (String, String),
+      options: (String, String)*): Unit = {
+    val cmd =
+      CreateTableUsingAsLogicalPlan(
+        tableName,
+        dataSourceName,
+        temporary = false,
+        (option +: options).toMap,
+        allowExisting = false,
+        logicalPlan)
+
+    sqlContext.executePlan(cmd).toRdd
+  }
+
+  override def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      options: java.util.Map[String, String]): Unit = {
+    val opts = options.toSeq
+    saveAsTable(tableName, dataSourceName, opts.head, opts.tail:_*)
+  }
+
+  override def save(path: String): Unit = {
+    val dataSourceName = sqlContext.conf.defaultDataSourceName
+    save(dataSourceName, ("path" -> path))
+  }
+
+  override def save(
+      dataSourceName: String,
+      option: (String, String),
+      options: (String, String)*): Unit = {
+    ResolvedDataSource(sqlContext, dataSourceName, (option +: options).toMap, this)
+  }
+
+  override def save(
+      dataSourceName: String,
+      options: java.util.Map[String, String]): Unit = {
+    val opts = options.toSeq
+    save(dataSourceName, opts.head, opts.tail:_*)
   }
 
   override def insertInto(tableName: String, overwrite: Boolean): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
index 2f8c695..9b051de 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/IncomputableColumn.scala
@@ -152,6 +152,28 @@ private[sql] class IncomputableColumn(protected[sql] val expr: Expression) exten
 
   override def saveAsTable(tableName: String): Unit = err()
 
+  override def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      option: (String, String),
+      options: (String, String)*): Unit = err()
+
+  override def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      options: java.util.Map[String, String]): Unit = err()
+
+  override def save(path: String): Unit = err()
+
+  override def save(
+      dataSourceName: String,
+      option: (String, String),
+      options: (String, String)*): Unit = err()
+
+  override def save(
+      dataSourceName: String,
+      options: java.util.Map[String, String]): Unit = err()
+
   override def insertInto(tableName: String, overwrite: Boolean): Unit = err()
 
   override def toJSON: RDD[String] = err()

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index 243dc99..561a91d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -47,6 +47,9 @@ private[spark] object SQLConf {
   // This is only used for the thriftserver
   val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool"
 
+  // This is used to set the default data source
+  val DEFAULT_DATA_SOURCE_NAME = "spark.sql.default.datasource"
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -155,6 +158,9 @@ private[sql] class SQLConf extends Serializable {
   private[spark] def broadcastTimeout: Int =
     getConf(BROADCAST_TIMEOUT, (5 * 60).toString).toInt
 
+  private[spark] def defaultDataSourceName: String =
+    getConf(DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.parquet")
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index f4692b3..a741d00 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -21,6 +21,7 @@ import java.beans.Introspector
 import java.util.Properties
 
 import scala.collection.immutable
+import scala.collection.JavaConversions._
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
@@ -37,7 +38,7 @@ import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.json._
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.sources.{LogicalRelation, BaseRelation, DDLParser, DataSourceStrategy}
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
@@ -335,6 +336,29 @@ class SQLContext(@transient val sparkContext: SparkContext)
     applySchema(rowRDD, appliedSchema)
   }
 
+  @Experimental
+  def load(path: String): DataFrame = {
+    val dataSourceName = conf.defaultDataSourceName
+    load(dataSourceName, ("path", path))
+  }
+
+  @Experimental
+  def load(
+      dataSourceName: String,
+      option: (String, String),
+      options: (String, String)*): DataFrame = {
+    val resolved = ResolvedDataSource(this, None, dataSourceName, (option +: options).toMap)
+    DataFrame(this, LogicalRelation(resolved.relation))
+  }
+
+  @Experimental
+  def load(
+      dataSourceName: String,
+      options: java.util.Map[String, String]): DataFrame = {
+    val opts = options.toSeq
+    load(dataSourceName, opts.head, opts.tail:_*)
+  }
+
   /**
    * :: Experimental ::
    * Construct an RDD representing the database table accessible via JDBC URL

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/api.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
index eb0eb3f..c4a00cd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api.scala
@@ -171,6 +171,33 @@ private[sql] trait DataFrameSpecificApi {
   def saveAsTable(tableName: String): Unit
 
   @Experimental
+  def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      option: (String, String),
+      options: (String, String)*): Unit
+
+  @Experimental
+  def saveAsTable(
+      tableName: String,
+      dataSourceName: String,
+      options: java.util.Map[String, String]): Unit
+
+  @Experimental
+  def save(path: String): Unit
+
+  @Experimental
+  def save(
+      dataSourceName: String,
+      option: (String, String),
+      options: (String, String)*): Unit
+
+  @Experimental
+  def save(
+      dataSourceName: String,
+      options: java.util.Map[String, String]): Unit
+
+  @Experimental
   def insertInto(tableName: String, overwrite: Boolean): Unit
 
   @Experimental

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 0cc9d04..ff0609d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation}
 import org.apache.spark.sql.parquet._
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.sources.{CreateTempTableUsing, CreateTableUsing}
+import org.apache.spark.sql.sources._
 
 
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
@@ -314,12 +314,33 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, options) =>
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, true, opts, false) =>
         ExecutedCommand(
-          CreateTempTableUsing(tableName, userSpecifiedSchema, provider, options)) :: Nil
-
-      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+          CreateTempTableUsing(
+            tableName, userSpecifiedSchema, provider, opts)) :: Nil
+      case c: CreateTableUsing if !c.temporary =>
+        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+      case c: CreateTableUsing if c.temporary && c.allowExisting =>
+        sys.error("allowExisting should be set to false when creating a temporary table.")
+
+      case CreateTableUsingAsSelect(tableName, provider, true, opts, false, query) =>
+        val logicalPlan = sqlContext.parseSql(query)
+        val cmd =
+          CreateTempTableUsingAsSelect(tableName, provider, opts, logicalPlan)
+        ExecutedCommand(cmd) :: Nil
+      case c: CreateTableUsingAsSelect if !c.temporary =>
+        sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+      case c: CreateTableUsingAsSelect if c.temporary && c.allowExisting =>
+        sys.error("allowExisting should be set to false when creating a temporary table.")
+
+      case CreateTableUsingAsLogicalPlan(tableName, provider, true, opts, false, query) =>
+        val cmd =
+          CreateTempTableUsingAsSelect(tableName, provider, opts, query)
+        ExecutedCommand(cmd) :: Nil
+      case c: CreateTableUsingAsLogicalPlan if !c.temporary =>
         sys.error("Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.")
+      case c: CreateTableUsingAsLogicalPlan if c.temporary && c.allowExisting =>
+        sys.error("allowExisting should be set to false when creating a temporary table.")
 
       case _ => Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
index 1af96c2..8372dec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala
@@ -17,21 +17,26 @@
 
 package org.apache.spark.sql.json
 
-import org.apache.spark.sql.SQLContext
+import java.io.IOException
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 
 
-private[sql] class DefaultSource extends RelationProvider with SchemaRelationProvider {
+private[sql] class DefaultSource
+  extends RelationProvider with SchemaRelationProvider with CreateableRelationProvider {
 
   /** Returns a new base relation with the parameters. */
   override def createRelation(
       sqlContext: SQLContext,
       parameters: Map[String, String]): BaseRelation = {
-    val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
+    val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
 
-    JSONRelation(fileName, samplingRatio, None)(sqlContext)
+    JSONRelation(path, samplingRatio, None)(sqlContext)
   }
 
   /** Returns a new base relation with the given schema and parameters. */
@@ -39,21 +44,37 @@ private[sql] class DefaultSource extends RelationProvider with SchemaRelationPro
       sqlContext: SQLContext,
       parameters: Map[String, String],
       schema: StructType): BaseRelation = {
-    val fileName = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
+    val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
     val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
 
-    JSONRelation(fileName, samplingRatio, Some(schema))(sqlContext)
+    JSONRelation(path, samplingRatio, Some(schema))(sqlContext)
+  }
+
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation = {
+    val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
+    val filesystemPath = new Path(path)
+    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+    if (fs.exists(filesystemPath)) {
+      sys.error(s"path $path already exists.")
+    }
+    data.toJSON.saveAsTextFile(path)
+
+    createRelation(sqlContext, parameters, data.schema)
   }
 }
 
 private[sql] case class JSONRelation(
-    fileName: String,
+    path: String,
     samplingRatio: Double,
     userSpecifiedSchema: Option[StructType])(
     @transient val sqlContext: SQLContext)
-  extends TableScan {
+  extends TableScan with InsertableRelation {
 
-  private def baseRDD = sqlContext.sparkContext.textFile(fileName)
+  // TODO: Support partitioned JSON relation.
+  private def baseRDD = sqlContext.sparkContext.textFile(path)
 
   override val schema = userSpecifiedSchema.getOrElse(
     JsonRDD.nullTypeToStringType(
@@ -64,4 +85,24 @@ private[sql] case class JSONRelation(
 
   override def buildScan() =
     JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)
+
+  override def insert(data: DataFrame, overwrite: Boolean) = {
+    val filesystemPath = new Path(path)
+    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+
+    if (overwrite) {
+      try {
+        fs.delete(filesystemPath, true)
+      } catch {
+        case e: IOException =>
+          throw new IOException(
+            s"Unable to clear output directory ${filesystemPath.toString} prior"
+              + s" to INSERT OVERWRITE a JSON table:\n${e.toString}")
+      }
+      data.toJSON.saveAsTextFile(path)
+    } else {
+      // TODO: Support INSERT INTO
+      sys.error("JSON table only support INSERT OVERWRITE for now.")
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index d13f2ce..386ff24 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -22,7 +22,7 @@ import org.apache.spark.sql.{Row, Strategy}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, InsertIntoTable => LogicalInsertIntoTable}
 import org.apache.spark.sql.execution
 
 /**
@@ -54,6 +54,13 @@ private[sql] object DataSourceStrategy extends Strategy {
     case l @ LogicalRelation(t: TableScan) =>
       execution.PhysicalRDD(l.output, t.buildScan()) :: Nil
 
+    case i @ LogicalInsertIntoTable(
+      l @ LogicalRelation(t: InsertableRelation), partition, query, overwrite) =>
+      if (partition.nonEmpty) {
+        sys.error(s"Insert into a partition is not allowed because $l is not partitioned.")
+      }
+      execution.ExecutedCommand(InsertIntoRelation(t, query, overwrite)) :: Nil
+
     case _ => Nil
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
new file mode 100644
index 0000000..d7942dc
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.sources
+
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.RunnableCommand
+
+private[sql] case class InsertIntoRelation(
+    relation: InsertableRelation,
+    query: LogicalPlan,
+    overwrite: Boolean)
+  extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext) = {
+    relation.insert(DataFrame(sqlContext, query), overwrite)
+
+    Seq.empty[Row]
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index b1bbe0f..ead8277 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -36,6 +36,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
     try {
       Some(apply(input))
     } catch {
+      case ddlException: DDLException => throw ddlException
       case _ if !exceptionOnError => None
       case x: Throwable => throw x
     }
@@ -45,8 +46,7 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
     lexical.initialize(reservedWords)
     phrase(dataType)(new lexical.Scanner(input)) match {
       case Success(r, x) => r
-      case x =>
-        sys.error(s"Unsupported dataType: $x")
+      case x => throw new DDLException(s"Unsupported dataType: $x")
     }
   }
 
@@ -56,8 +56,12 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
   protected val CREATE = Keyword("CREATE")
   protected val TEMPORARY = Keyword("TEMPORARY")
   protected val TABLE = Keyword("TABLE")
+  protected val IF = Keyword("IF")
+  protected val NOT = Keyword("NOT")
+  protected val EXISTS = Keyword("EXISTS")
   protected val USING = Keyword("USING")
   protected val OPTIONS = Keyword("OPTIONS")
+  protected val AS = Keyword("AS")
   protected val COMMENT = Keyword("COMMENT")
 
   // Data types.
@@ -83,22 +87,51 @@ private[sql] class DDLParser extends AbstractSparkSQLParser with Logging {
   protected def start: Parser[LogicalPlan] = ddl
 
   /**
-   * `CREATE [TEMPORARY] TABLE avroTable
+   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
    * USING org.apache.spark.sql.avro
    * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
    * or
-   * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...)
+   * `CREATE [TEMPORARY] TABLE avroTable(intField int, stringField string...) [IF NOT EXISTS]
    * USING org.apache.spark.sql.avro
    * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * or
+   * `CREATE [TEMPORARY] TABLE avroTable [IF NOT EXISTS]
+   * USING org.apache.spark.sql.avro
+   * OPTIONS (path "../hive/src/test/resources/data/files/episodes.avro")`
+   * AS SELECT ...
    */
   protected lazy val createTable: Parser[LogicalPlan] =
   (
-    (CREATE ~> TEMPORARY.? <~ TABLE) ~ ident
-      ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ^^ {
-      case temp ~ tableName ~ columns ~ provider ~ opts =>
-        val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
-        CreateTableUsing(tableName, userSpecifiedSchema, provider, temp.isDefined, opts)
-    }
+    (CREATE ~> TEMPORARY.? <~ TABLE) ~ (IF ~> NOT <~ EXISTS).? ~ ident
+      ~ (tableCols).? ~ (USING ~> className) ~ (OPTIONS ~> options) ~ (AS ~> restInput).? ^^ {
+      case temp ~ allowExisting ~ tableName ~ columns ~ provider ~ opts ~ query =>
+        if (temp.isDefined && allowExisting.isDefined) {
+          throw new DDLException(
+            "a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause.")
+        }
+
+        if (query.isDefined) {
+          if (columns.isDefined) {
+            throw new DDLException(
+              "a CREATE TABLE AS SELECT statement does not allow column definitions.")
+          }
+          CreateTableUsingAsSelect(tableName,
+            provider,
+            temp.isDefined,
+            opts,
+            allowExisting.isDefined,
+            query.get)
+        } else {
+          val userSpecifiedSchema = columns.flatMap(fields => Some(StructType(fields)))
+          CreateTableUsing(
+            tableName,
+            userSpecifiedSchema,
+            provider,
+            temp.isDefined,
+            opts,
+            allowExisting.isDefined)
+        }
+      }
   )
 
   protected lazy val tableCols: Parser[Seq[StructField]] =  "(" ~> repsep(column, ",") <~ ")"
@@ -193,7 +226,7 @@ object ResolvedDataSource {
             dataSource
               .asInstanceOf[org.apache.spark.sql.sources.SchemaRelationProvider]
               .createRelation(sqlContext, new CaseInsensitiveMap(options), schema)
-          case _ =>
+          case dataSource: org.apache.spark.sql.sources.RelationProvider =>
             sys.error(s"${clazz.getCanonicalName} does not allow user-specified schemas.")
         }
       }
@@ -203,7 +236,7 @@ object ResolvedDataSource {
             dataSource
               .asInstanceOf[org.apache.spark.sql.sources.RelationProvider]
               .createRelation(sqlContext, new CaseInsensitiveMap(options))
-          case _ =>
+          case dataSource: org.apache.spark.sql.sources.SchemaRelationProvider =>
             sys.error(s"A schema needs to be specified when using ${clazz.getCanonicalName}.")
         }
       }
@@ -211,6 +244,32 @@ object ResolvedDataSource {
 
     new ResolvedDataSource(clazz, relation)
   }
+
+  def apply(
+      sqlContext: SQLContext,
+      provider: String,
+      options: Map[String, String],
+      data: DataFrame): ResolvedDataSource = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val clazz: Class[_] = try loader.loadClass(provider) catch {
+      case cnf: java.lang.ClassNotFoundException =>
+        try loader.loadClass(provider + ".DefaultSource") catch {
+          case cnf: java.lang.ClassNotFoundException =>
+            sys.error(s"Failed to load class for data source: $provider")
+        }
+    }
+
+    val relation = clazz.newInstance match {
+      case dataSource: org.apache.spark.sql.sources.CreateableRelationProvider =>
+        dataSource
+          .asInstanceOf[org.apache.spark.sql.sources.CreateableRelationProvider]
+          .createRelation(sqlContext, options, data)
+      case _ =>
+        sys.error(s"${clazz.getCanonicalName} does not allow create table as select.")
+    }
+
+    new ResolvedDataSource(clazz, relation)
+  }
 }
 
 private[sql] case class ResolvedDataSource(provider: Class[_], relation: BaseRelation)
@@ -220,13 +279,30 @@ private[sql] case class CreateTableUsing(
     userSpecifiedSchema: Option[StructType],
     provider: String,
     temporary: Boolean,
-    options: Map[String, String]) extends Command
+    options: Map[String, String],
+    allowExisting: Boolean) extends Command
+
+private[sql] case class CreateTableUsingAsSelect(
+    tableName: String,
+    provider: String,
+    temporary: Boolean,
+    options: Map[String, String],
+    allowExisting: Boolean,
+    query: String) extends Command
+
+private[sql] case class CreateTableUsingAsLogicalPlan(
+    tableName: String,
+    provider: String,
+    temporary: Boolean,
+    options: Map[String, String],
+    allowExisting: Boolean,
+    query: LogicalPlan) extends Command
 
 private [sql] case class CreateTempTableUsing(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
     provider: String,
-    options: Map[String, String])  extends RunnableCommand {
+    options: Map[String, String]) extends RunnableCommand {
 
   def run(sqlContext: SQLContext) = {
     val resolved = ResolvedDataSource(sqlContext, userSpecifiedSchema, provider, options)
@@ -236,6 +312,22 @@ private [sql] case class CreateTempTableUsing(
   }
 }
 
+private [sql] case class CreateTempTableUsingAsSelect(
+    tableName: String,
+    provider: String,
+    options: Map[String, String],
+    query: LogicalPlan) extends RunnableCommand {
+
+  def run(sqlContext: SQLContext) = {
+    val df = DataFrame(sqlContext, query)
+    val resolved = ResolvedDataSource(sqlContext, provider, options, df)
+    sqlContext.registerRDDAsTable(
+      DataFrame(sqlContext, LogicalRelation(resolved.relation)), tableName)
+
+    Seq.empty
+  }
+}
+
 /**
  * Builds a map in which keys are case insensitive
  */
@@ -253,3 +345,9 @@ protected class CaseInsensitiveMap(map: Map[String, String]) extends Map[String,
 
   override def -(key: String): Map[String, String] = baseMap - key.toLowerCase()
 }
+
+/**
+ * The exception thrown from the DDL parser.
+ * @param message
+ */
+protected[sql] class DDLException(message: String) extends Exception(message)

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
index cd82cc6..ad0a35b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/interfaces.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.sources
 
 import org.apache.spark.annotation.{Experimental, DeveloperApi}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.{Expression, Attribute}
 import org.apache.spark.sql.types.StructType
 
@@ -77,6 +77,14 @@ trait SchemaRelationProvider {
       schema: StructType): BaseRelation
 }
 
+@DeveloperApi
+trait CreateableRelationProvider {
+  def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      data: DataFrame): BaseRelation
+}
+
 /**
  * ::DeveloperApi::
  * Represents a collection of tuples with a known schema.  Classes that extend BaseRelation must
@@ -108,7 +116,7 @@ abstract class BaseRelation {
  * A BaseRelation that can produce all of its tuples as an RDD of Row objects.
  */
 @DeveloperApi
-abstract class TableScan extends BaseRelation {
+trait TableScan extends BaseRelation {
   def buildScan(): RDD[Row]
 }
 
@@ -118,7 +126,7 @@ abstract class TableScan extends BaseRelation {
  * containing all of its tuples as Row objects.
  */
 @DeveloperApi
-abstract class PrunedScan extends BaseRelation {
+trait PrunedScan extends BaseRelation {
   def buildScan(requiredColumns: Array[String]): RDD[Row]
 }
 
@@ -132,7 +140,7 @@ abstract class PrunedScan extends BaseRelation {
  * as filtering partitions based on a bloom filter.
  */
 @DeveloperApi
-abstract class PrunedFilteredScan extends BaseRelation {
+trait PrunedFilteredScan extends BaseRelation {
   def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
 }
 
@@ -145,6 +153,11 @@ abstract class PrunedFilteredScan extends BaseRelation {
  * for experimentation.
  */
 @Experimental
-abstract class CatalystScan extends BaseRelation {
+trait CatalystScan extends BaseRelation {
   def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
 }
+
+@DeveloperApi
+trait InsertableRelation extends BaseRelation {
+  def insert(data: DataFrame, overwrite: Boolean): Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 3d82f4b..5ec7a15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -37,11 +37,21 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
   test("appending") {
     val data = (0 until 10).map(i => (i, i.toString))
     withParquetTable(data, "t") {
-      sql("INSERT INTO t SELECT * FROM t")
+      sql("INSERT INTO TABLE t SELECT * FROM t")
       checkAnswer(table("t"), (data ++ data).map(Row.fromTuple))
     }
   }
 
+  // This test case will trigger the NPE mentioned in
+  // https://issues.apache.org/jira/browse/PARQUET-151.
+  ignore("overwriting") {
+    val data = (0 until 10).map(i => (i, i.toString))
+    withParquetTable(data, "t") {
+      sql("INSERT OVERWRITE TABLE t SELECT * FROM t")
+      checkAnswer(table("t"), data.map(Row.fromTuple))
+    }
+  }
+
   test("self-join") {
     // 4 rows, cells of column 1 of row 2 and row 4 are null
     val data = (1 to 4).map { i =>

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
new file mode 100644
index 0000000..b023899
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.catalyst.util
+import org.apache.spark.util.Utils
+
+class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll {
+
+  import caseInsensisitiveContext._
+
+  var path: File = null
+
+  override def beforeAll(): Unit = {
+    path = util.getTempFilePath("jsonCTAS").getCanonicalFile
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+  }
+
+  override def afterAll(): Unit = {
+    dropTempTable("jt")
+  }
+
+  after {
+    if (path.exists()) Utils.deleteRecursively(path)
+  }
+
+  test("CREATE TEMPORARY TABLE AS SELECT") {
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |) AS
+        |SELECT a, b FROM jt
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT a, b FROM jsonTable"),
+      sql("SELECT a, b FROM jt").collect())
+
+    dropTempTable("jsonTable")
+  }
+
+  test("create a table, drop it and create another one with the same name") {
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |) AS
+        |SELECT a, b FROM jt
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT a, b FROM jsonTable"),
+      sql("SELECT a, b FROM jt").collect())
+
+    dropTempTable("jsonTable")
+
+    val message = intercept[RuntimeException]{
+      sql(
+        s"""
+        |CREATE TEMPORARY TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |) AS
+        |SELECT a * 4 FROM jt
+      """.stripMargin)
+    }.getMessage
+    assert(
+      message.contains(s"path ${path.toString} already exists."),
+      "CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.")
+
+    // Explicitly delete it.
+    if (path.exists()) Utils.deleteRecursively(path)
+
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |) AS
+        |SELECT a * 4 FROM jt
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT * FROM jsonTable"),
+      sql("SELECT a * 4 FROM jt").collect())
+
+    dropTempTable("jsonTable")
+  }
+
+  test("CREATE TEMPORARY TABLE AS SELECT with IF NOT EXISTS is not allowed") {
+    val message = intercept[DDLException]{
+      sql(
+        s"""
+        |CREATE TEMPORARY TABLE IF NOT EXISTS jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |) AS
+        |SELECT b FROM jt
+      """.stripMargin)
+    }.getMessage
+    assert(
+      message.contains("a CREATE TEMPORARY TABLE statement does not allow IF NOT EXISTS clause."),
+      "CREATE TEMPORARY TABLE IF NOT EXISTS should not be allowed.")
+  }
+
+  test("a CTAS statement with column definitions is not allowed") {
+    intercept[DDLException]{
+      sql(
+        s"""
+        |CREATE TEMPORARY TABLE jsonTable (a int, b string)
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |) AS
+        |SELECT a, b FROM jt
+      """.stripMargin)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala
new file mode 100644
index 0000000..f91cea6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertIntoSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.util
+import org.apache.spark.util.Utils
+
+class InsertIntoSuite extends DataSourceTest with BeforeAndAfterAll {
+
+  import caseInsensisitiveContext._
+
+  var path: File = null
+
+  override def beforeAll: Unit = {
+    path = util.getTempFilePath("jsonCTAS").getCanonicalFile
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    jsonRDD(rdd).registerTempTable("jt")
+    sql(
+      s"""
+        |CREATE TEMPORARY TABLE jsonTable (a int, b string)
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${path.toString}'
+        |)
+      """.stripMargin)
+  }
+
+  override def afterAll: Unit = {
+    dropTempTable("jsonTable")
+    dropTempTable("jt")
+    if (path.exists()) Utils.deleteRecursively(path)
+  }
+
+  test("Simple INSERT OVERWRITE a JSONRelation") {
+    sql(
+      s"""
+        |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT a, b FROM jsonTable"),
+      (1 to 10).map(i => Row(i, s"str$i"))
+    )
+  }
+
+  test("INSERT OVERWRITE a JSONRelation multiple times") {
+    sql(
+      s"""
+        |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
+      """.stripMargin)
+
+    sql(
+      s"""
+        |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
+      """.stripMargin)
+
+    sql(
+      s"""
+        |INSERT OVERWRITE TABLE jsonTable SELECT a, b FROM jt
+      """.stripMargin)
+
+    checkAnswer(
+      sql("SELECT a, b FROM jsonTable"),
+      (1 to 10).map(i => Row(i, s"str$i"))
+    )
+  }
+
+  test("INSERT INTO not supported for JSONRelation for now") {
+    intercept[RuntimeException]{
+      sql(
+        s"""
+        |INSERT INTO TABLE jsonTable SELECT a, b FROM jt
+      """.stripMargin)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
new file mode 100644
index 0000000..fe2f76c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/SaveLoadSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.sources
+
+import java.io.File
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.util.Utils
+
+import org.apache.spark.sql.catalyst.util
+
+class SaveLoadSuite extends DataSourceTest with BeforeAndAfterAll {
+
+  import caseInsensisitiveContext._
+
+  var originalDefaultSource: String = null
+
+  var path: File = null
+
+  var df: DataFrame = null
+
+  override def beforeAll(): Unit = {
+    originalDefaultSource = conf.defaultDataSourceName
+    conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
+
+    path = util.getTempFilePath("datasource").getCanonicalFile
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    df = jsonRDD(rdd)
+  }
+
+  override def afterAll(): Unit = {
+    conf.setConf("spark.sql.default.datasource", originalDefaultSource)
+  }
+
+  after {
+    if (path.exists()) Utils.deleteRecursively(path)
+  }
+
+  def checkLoad(): Unit = {
+    checkAnswer(load(path.toString), df.collect())
+    checkAnswer(load("org.apache.spark.sql.json", ("path", path.toString)), df.collect())
+  }
+
+  test("save with overwrite and load") {
+    df.save(path.toString)
+    checkLoad
+  }
+
+  test("save with data source and options, and load") {
+    df.save("org.apache.spark.sql.json", ("path", path.toString))
+    checkLoad
+  }
+
+  test("save and save again") {
+    df.save(path.toString)
+
+    val message = intercept[RuntimeException] {
+      df.save(path.toString)
+    }.getMessage
+
+    assert(
+      message.contains("already exists"),
+      "We should complain that the path already exists.")
+
+    if (path.exists()) Utils.deleteRecursively(path)
+
+    df.save(path.toString)
+    checkLoad
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 5efc3b1..f6d9027 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -41,7 +41,7 @@ import org.apache.spark.sql.catalyst.analysis.{Analyzer, EliminateAnalysisOperat
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUdfs, SetCommand, QueryExecutionException}
 import org.apache.spark.sql.hive.execution.{HiveNativeCommand, DescribeHiveTableCommand}
-import org.apache.spark.sql.sources.DataSourceStrategy
+import org.apache.spark.sql.sources.{CreateTableUsing, DataSourceStrategy}
 import org.apache.spark.sql.types._
 
 /**
@@ -86,6 +86,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
    * @param allowExisting When false, an exception will be thrown if the table already exists.
    * @tparam A A case class that is used to describe the schema of the table to be created.
    */
+  @Deprecated
   def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) {
     catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
   }
@@ -106,6 +107,70 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     catalog.invalidateTable("default", tableName)
   }
 
+  @Experimental
+  def createTable(tableName: String, path: String, allowExisting: Boolean): Unit = {
+    val dataSourceName = conf.defaultDataSourceName
+    createTable(tableName, dataSourceName, allowExisting, ("path", path))
+  }
+
+  @Experimental
+  def createTable(
+      tableName: String,
+      dataSourceName: String,
+      allowExisting: Boolean,
+      option: (String, String),
+      options: (String, String)*): Unit = {
+    val cmd =
+      CreateTableUsing(
+        tableName,
+        userSpecifiedSchema = None,
+        dataSourceName,
+        temporary = false,
+        (option +: options).toMap,
+        allowExisting)
+    executePlan(cmd).toRdd
+  }
+
+  @Experimental
+  def createTable(
+      tableName: String,
+      dataSourceName: String,
+      schema: StructType,
+      allowExisting: Boolean,
+      option: (String, String),
+      options: (String, String)*): Unit = {
+    val cmd =
+      CreateTableUsing(
+        tableName,
+        userSpecifiedSchema = Some(schema),
+        dataSourceName,
+        temporary = false,
+        (option +: options).toMap,
+        allowExisting)
+    executePlan(cmd).toRdd
+  }
+
+  @Experimental
+  def createTable(
+      tableName: String,
+      dataSourceName: String,
+      allowExisting: Boolean,
+      options: java.util.Map[String, String]): Unit = {
+    val opts = options.toSeq
+    createTable(tableName, dataSourceName, allowExisting, opts.head, opts.tail:_*)
+  }
+
+  @Experimental
+  def createTable(
+      tableName: String,
+      dataSourceName: String,
+      schema: StructType,
+      allowExisting: Boolean,
+      options: java.util.Map[String, String]): Unit = {
+    val opts = options.toSeq
+    createTable(tableName, dataSourceName, schema, allowExisting, opts.head, opts.tail:_*)
+  }
+
   /**
    * Analyzes the given table in the current database to generate statistics, which will be
    * used in query optimizations.

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index d910ee9..48bea6c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -23,10 +23,9 @@ import java.util.{List => JList}
 import com.google.common.cache.{LoadingCache, CacheLoader, CacheBuilder}
 
 import org.apache.hadoop.util.ReflectionUtils
-import org.apache.hadoop.hive.metastore.TableType
-import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, FieldSchema}
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
-import org.apache.hadoop.hive.ql.metadata.InvalidTableException
+import org.apache.hadoop.hive.metastore.{Warehouse, TableType}
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition, AlreadyExistsException, FieldSchema}
+import org.apache.hadoop.hive.ql.metadata._
 import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 import org.apache.hadoop.hive.serde.serdeConstants
 import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
@@ -52,6 +51,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
   /** Connection to hive metastore.  Usages should lock on `this`. */
   protected[hive] val client = Hive.get(hive.hiveconf)
 
+  protected[hive] lazy val hiveWarehouse = new Warehouse(hive.hiveconf)
+
   // TODO: Use this everywhere instead of tuples or databaseName, tableName,.
   /** A fully qualified identifier for a table (i.e., database.tableName) */
   case class QualifiedTableName(database: String, name: String) {
@@ -99,11 +100,22 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
 
   val caseSensitive: Boolean = false
 
+  /** *
+    * Creates a data source table (a table created with USING clause) in Hive's metastore.
+    * Returns true when the table has been created. Otherwise, false.
+    * @param tableName
+    * @param userSpecifiedSchema
+    * @param provider
+    * @param options
+    * @param isExternal
+    * @return
+    */
   def createDataSourceTable(
       tableName: String,
       userSpecifiedSchema: Option[StructType],
       provider: String,
-      options: Map[String, String]) = {
+      options: Map[String, String],
+      isExternal: Boolean): Unit = {
     val (dbName, tblName) = processDatabaseAndTableName("default", tableName)
     val tbl = new Table(dbName, tblName)
 
@@ -113,8 +125,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
     options.foreach { case (key, value) => tbl.setSerdeParam(key, value) }
 
-    tbl.setProperty("EXTERNAL", "TRUE")
-    tbl.setTableType(TableType.EXTERNAL_TABLE)
+    if (isExternal) {
+      tbl.setProperty("EXTERNAL", "TRUE")
+      tbl.setTableType(TableType.EXTERNAL_TABLE)
+    } else {
+      tbl.setProperty("EXTERNAL", "FALSE")
+      tbl.setTableType(TableType.MANAGED_TABLE)
+    }
 
     // create the table
     synchronized {
@@ -122,6 +139,10 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
   }
 
+  def hiveDefaultTableFilePath(tableName: String): String = {
+    hiveWarehouse.getTablePath(client.getDatabaseCurrent, tableName).toString
+  }
+
   def tableExists(tableIdentifier: Seq[String]): Boolean = {
     val tableIdent = processTableIdentifier(tableIdentifier)
     val databaseName = tableIdent.lift(tableIdent.size - 2).getOrElse(

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index fa99728..d891110 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -33,7 +33,7 @@ import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeComman
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.execution._
 import org.apache.spark.sql.parquet.ParquetRelation
-import org.apache.spark.sql.sources.CreateTableUsing
+import org.apache.spark.sql.sources.{CreateTableUsingAsLogicalPlan, CreateTableUsingAsSelect, CreateTableUsing}
 import org.apache.spark.sql.types.StringType
 
 
@@ -212,9 +212,21 @@ private[hive] trait HiveStrategies {
 
   object HiveDDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, options) =>
+      case CreateTableUsing(tableName, userSpecifiedSchema, provider, false, opts, allowExisting) =>
         ExecutedCommand(
-          CreateMetastoreDataSource(tableName, userSpecifiedSchema, provider, options)) :: Nil
+          CreateMetastoreDataSource(
+            tableName, userSpecifiedSchema, provider, opts, allowExisting)) :: Nil
+
+      case CreateTableUsingAsSelect(tableName, provider, false, opts, allowExisting, query) =>
+        val logicalPlan = hiveContext.parseSql(query)
+        val cmd =
+          CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, logicalPlan)
+        ExecutedCommand(cmd) :: Nil
+
+      case CreateTableUsingAsLogicalPlan(tableName, provider, false, opts, allowExisting, query) =>
+        val cmd =
+          CreateMetastoreDataSourceAsSelect(tableName, provider, opts, allowExisting, query)
+        ExecutedCommand(cmd) :: Nil
 
       case _ => Nil
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
index 4814cb7..95dcacc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/commands.scala
@@ -18,8 +18,10 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources.ResolvedDataSource
+import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.RunnableCommand
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.types.StructType
@@ -102,11 +104,77 @@ case class CreateMetastoreDataSource(
     tableName: String,
     userSpecifiedSchema: Option[StructType],
     provider: String,
-    options: Map[String, String]) extends RunnableCommand {
+    options: Map[String, String],
+    allowExisting: Boolean) extends RunnableCommand {
 
-  override def run(sqlContext: SQLContext) = {
+  override def run(sqlContext: SQLContext): Seq[Row] = {
     val hiveContext = sqlContext.asInstanceOf[HiveContext]
-    hiveContext.catalog.createDataSourceTable(tableName, userSpecifiedSchema, provider, options)
+
+    if (hiveContext.catalog.tableExists(tableName :: Nil)) {
+      if (allowExisting) {
+        return Seq.empty[Row]
+      } else {
+        sys.error(s"Table $tableName already exists.")
+      }
+    }
+
+    var isExternal = true
+    val optionsWithPath =
+      if (!options.contains("path")) {
+        isExternal = false
+        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+      } else {
+        options
+      }
+
+    hiveContext.catalog.createDataSourceTable(
+      tableName,
+      userSpecifiedSchema,
+      provider,
+      optionsWithPath,
+      isExternal)
+
+    Seq.empty[Row]
+  }
+}
+
+case class CreateMetastoreDataSourceAsSelect(
+    tableName: String,
+    provider: String,
+    options: Map[String, String],
+    allowExisting: Boolean,
+    query: LogicalPlan) extends RunnableCommand {
+
+  override def run(sqlContext: SQLContext): Seq[Row] = {
+    val hiveContext = sqlContext.asInstanceOf[HiveContext]
+
+    if (hiveContext.catalog.tableExists(tableName :: Nil)) {
+      if (allowExisting) {
+        return Seq.empty[Row]
+      } else {
+        sys.error(s"Table $tableName already exists.")
+      }
+    }
+
+    val df = DataFrame(hiveContext, query)
+    var isExternal = true
+    val optionsWithPath =
+      if (!options.contains("path")) {
+        isExternal = false
+        options + ("path" -> hiveContext.catalog.hiveDefaultTableFilePath(tableName))
+      } else {
+        options
+      }
+
+    // Create the relation based on the data of df.
+    ResolvedDataSource(sqlContext, provider, optionsWithPath, df)
+
+    hiveContext.catalog.createDataSourceTable(
+      tableName,
+      None,
+      provider,
+      optionsWithPath,
+      isExternal)
 
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7408c7f..85795ac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -22,7 +22,9 @@ import java.io.File
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.commons.io.FileUtils
+import org.apache.hadoop.fs.Path
 
+import org.apache.spark.sql.catalyst.util
 import org.apache.spark.sql._
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.types._
@@ -36,9 +38,11 @@ import org.apache.spark.sql.hive.test.TestHive._
 class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
   override def afterEach(): Unit = {
     reset()
+    if (ctasPath.exists()) Utils.deleteRecursively(ctasPath)
   }
 
   val filePath = Utils.getSparkClassLoader.getResource("sample.json").getFile
+  var ctasPath: File = util.getTempFilePath("jsonCTAS").getCanonicalFile
 
   test ("persistent JSON table") {
     sql(
@@ -94,7 +98,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       StructField("<d>", innerStruct, true) ::
       StructField("b", StringType, true) :: Nil)
 
-    assert(expectedSchema == table("jsonTable").schema)
+    assert(expectedSchema === table("jsonTable").schema)
 
     jsonFile(filePath).registerTempTable("expectedJsonTable")
 
@@ -137,6 +141,11 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     intercept[Exception] {
       sql("SELECT * FROM jsonTable").collect()
     }
+
+    assert(
+      (new File(filePath)).exists(),
+      "The table with specified path is considered as an external table, " +
+        "its data should not deleted after DROP TABLE.")
   }
 
   test("check change without refresh") {
@@ -240,7 +249,144 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     invalidateTable("jsonTable")
     val expectedSchema = StructType(StructField("c_!@(3)", IntegerType, true) :: Nil)
 
-    assert(expectedSchema == table("jsonTable").schema)
+    assert(expectedSchema === table("jsonTable").schema)
+  }
+
+  test("CTAS") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    sql(
+      s"""
+        |CREATE TABLE ctasJsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${ctasPath}'
+        |) AS
+        |SELECT * FROM jsonTable
+      """.stripMargin)
+
+    assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+
+    checkAnswer(
+      sql("SELECT * FROM ctasJsonTable"),
+      sql("SELECT * FROM jsonTable").collect())
+  }
+
+  test("CTAS with IF NOT EXISTS") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    sql(
+      s"""
+        |CREATE TABLE ctasJsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${ctasPath}'
+        |) AS
+        |SELECT * FROM jsonTable
+      """.stripMargin)
+
+    // Create the table again should trigger a AlreadyExistsException.
+    val message = intercept[RuntimeException] {
+      sql(
+        s"""
+        |CREATE TABLE ctasJsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${ctasPath}'
+        |) AS
+        |SELECT * FROM jsonTable
+      """.stripMargin)
+    }.getMessage
+    assert(message.contains("Table ctasJsonTable already exists."),
+      "We should complain that ctasJsonTable already exists")
+
+    // The following statement should be fine if it has IF NOT EXISTS.
+    // It tries to create a table ctasJsonTable with a new schema.
+    // The actual table's schema and data should not be changed.
+    sql(
+      s"""
+        |CREATE TABLE IF NOT EXISTS ctasJsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${ctasPath}'
+        |) AS
+        |SELECT a FROM jsonTable
+      """.stripMargin)
+
+    // Discard the cached relation.
+    invalidateTable("ctasJsonTable")
+
+    // Schema should not be changed.
+    assert(table("ctasJsonTable").schema === table("jsonTable").schema)
+    // Table data should not be changed.
+    checkAnswer(
+      sql("SELECT * FROM ctasJsonTable"),
+      sql("SELECT * FROM jsonTable").collect())
+  }
+
+  test("CTAS a managed table") {
+    sql(
+      s"""
+        |CREATE TABLE jsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${filePath}'
+        |)
+      """.stripMargin)
+
+    new Path("/Users/yhuai/Desktop/whatever")
+
+
+    val expectedPath = catalog.hiveDefaultTableFilePath("ctasJsonTable")
+    val filesystemPath = new Path(expectedPath)
+    val fs = filesystemPath.getFileSystem(sparkContext.hadoopConfiguration)
+    if (fs.exists(filesystemPath)) fs.delete(filesystemPath, true)
+
+    // It is a managed table when we do not specify the location.
+    sql(
+      s"""
+        |CREATE TABLE ctasJsonTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |
+        |) AS
+        |SELECT * FROM jsonTable
+      """.stripMargin)
+
+    assert(fs.exists(filesystemPath), s"$expectedPath should exist after we create the table.")
+
+    sql(
+      s"""
+        |CREATE TABLE loadedTable
+        |USING org.apache.spark.sql.json.DefaultSource
+        |OPTIONS (
+        |  path '${expectedPath}'
+        |)
+      """.stripMargin)
+
+    assert(table("ctasJsonTable").schema === table("loadedTable").schema)
+
+    checkAnswer(
+      sql("SELECT * FROM ctasJsonTable"),
+      sql("SELECT * FROM loadedTable").collect()
+    )
+
+    sql("DROP TABLE ctasJsonTable")
+    assert(!fs.exists(filesystemPath), s"$expectedPath should not exist after we drop the table.")
   }
 
   test("SPARK-5286 Fail to drop an invalid table when using the data source API") {
@@ -255,4 +401,39 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
 
     sql("DROP TABLE jsonTable").collect().foreach(println)
   }
+
+  test("save and load table") {
+    val originalDefaultSource = conf.defaultDataSourceName
+    conf.setConf("spark.sql.default.datasource", "org.apache.spark.sql.json")
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    val df = jsonRDD(rdd)
+
+    df.saveAsTable("savedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable"),
+      df.collect())
+
+    createTable("createdJsonTable", catalog.hiveDefaultTableFilePath("savedJsonTable"), false)
+    assert(table("createdJsonTable").schema === df.schema)
+    checkAnswer(
+      sql("SELECT * FROM createdJsonTable"),
+      df.collect())
+
+    val message = intercept[RuntimeException] {
+      createTable("createdJsonTable", filePath.toString, false)
+    }.getMessage
+    assert(message.contains("Table createdJsonTable already exists."),
+      "We should complain that ctasJsonTable already exists")
+
+    createTable("createdJsonTable", filePath.toString, true)
+    // createdJsonTable should be not changed.
+    assert(table("createdJsonTable").schema === df.schema)
+    checkAnswer(
+      sql("SELECT * FROM createdJsonTable"),
+      df.collect())
+
+    conf.setConf("spark.sql.default.datasource", originalDefaultSource)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/13531dd9/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index eb7a775..4efe0c5 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -170,7 +170,7 @@ class SQLQuerySuite extends QueryTest {
     sql("CREATE TABLE test2 (key INT, value STRING)")
     testData.insertInto("test2")
     testData.insertInto("test2")
-    sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").saveAsTable("test")
+    sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
     checkAnswer(
       table("test"),
       sql("SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key").collect().toSeq)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org