You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2014/04/16 05:40:48 UTC

git commit: [SQL] SPARK-1424 Generalize insertIntoTable functions on SchemaRDDs

Repository: spark
Updated Branches:
  refs/heads/master 63ca581d9 -> 273c2fd08


[SQL] SPARK-1424 Generalize insertIntoTable functions on SchemaRDDs

This makes it possible to create tables and insert into them using the DSL and SQL for the scala and java apis.

Author: Michael Armbrust <mi...@databricks.com>

Closes #354 from marmbrus/insertIntoTable and squashes the following commits:

6c6f227 [Michael Armbrust] Create random temporary files in python parquet unit tests.
f5e6d5c [Michael Armbrust] Merge remote-tracking branch 'origin/master' into insertIntoTable
765c506 [Michael Armbrust] Add to JavaAPI.
77b512c [Michael Armbrust] typos.
5c3ef95 [Michael Armbrust] use names for boolean args.
882afdf [Michael Armbrust] Change createTableAs to saveAsTable.  Clean up api annotations.
d07d94b [Michael Armbrust] Add tests, support for creating parquet files and hive tables.
fa3fe81 [Michael Armbrust] Make insertInto available on JavaSchemaRDD as well.  Add createTableAs function.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/273c2fd0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/273c2fd0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/273c2fd0

Branch: refs/heads/master
Commit: 273c2fd08deb49e970ec471c857dcf0b2953f922
Parents: 63ca581
Author: Michael Armbrust <mi...@databricks.com>
Authored: Tue Apr 15 20:40:40 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Tue Apr 15 20:40:40 2014 -0700

----------------------------------------------------------------------
 python/pyspark/sql.py                           |  14 +-
 .../scala/org/apache/spark/sql/SQLContext.scala |  57 ++++++-
 .../scala/org/apache/spark/sql/SchemaRDD.scala  |  28 +---
 .../org/apache/spark/sql/SchemaRDDLike.scala    |  59 +++++++-
 .../spark/sql/api/java/JavaSQLContext.scala     |  78 +++++++---
 .../spark/sql/parquet/ParquetRelation.scala     |  11 +-
 .../org/apache/spark/sql/InsertIntoSuite.scala  | 148 +++++++++++++++++++
 .../scala/org/apache/spark/sql/QueryTest.scala  |  11 +-
 .../scala/org/apache/spark/sql/TestData.scala   |   3 +-
 .../spark/sql/execution/PlannerSuite.scala      |   8 +-
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  41 +----
 .../org/apache/spark/sql/hive/HiveContext.scala |  18 ++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  13 +-
 .../scala/org/apache/spark/sql/QueryTest.scala  |  77 ++++++++++
 .../sql/hive/InsertIntoHiveTableSuite.scala     |  77 ++++++++++
 .../spark/sql/parquet/HiveParquetSuite.scala    |  52 -------
 16 files changed, 535 insertions(+), 160 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/python/pyspark/sql.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 67e6eee..27753d5 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -106,9 +106,12 @@ class SQLContext:
         """
         Loads a Parquet file, returning the result as a L{SchemaRDD}.
 
+        >>> import tempfile, shutil
+        >>> parquetFile = tempfile.mkdtemp()
+        >>> shutil.rmtree(parquetFile)
         >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> srdd.saveAsParquetFile("/tmp/tmp.parquet")
-        >>> srdd2 = sqlCtx.parquetFile("/tmp/tmp.parquet")
+        >>> srdd.saveAsParquetFile(parquetFile)
+        >>> srdd2 = sqlCtx.parquetFile(parquetFile)
         >>> srdd.collect() == srdd2.collect()
         True
         """
@@ -278,9 +281,12 @@ class SchemaRDD(RDD):
         that are written out using this method can be read back in as a SchemaRDD using the
         L{SQLContext.parquetFile} method.
 
+        >>> import tempfile, shutil
+        >>> parquetFile = tempfile.mkdtemp()
+        >>> shutil.rmtree(parquetFile)
         >>> srdd = sqlCtx.inferSchema(rdd)
-        >>> srdd.saveAsParquetFile("/tmp/test.parquet")
-        >>> srdd2 = sqlCtx.parquetFile("/tmp/test.parquet")
+        >>> srdd.saveAsParquetFile(parquetFile)
+        >>> srdd2 = sqlCtx.parquetFile(parquetFile)
         >>> srdd2.collect() == srdd.collect()
         True
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
index 24d60ea..4d216b5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala
@@ -20,18 +20,26 @@ package org.apache.spark.sql
 import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 
+import org.apache.hadoop.conf.Configuration
+
 import org.apache.spark.SparkContext
-import org.apache.spark.annotation.{AlphaComponent, Experimental}
+import org.apache.spark.annotation.{AlphaComponent, DeveloperApi, Experimental}
 import org.apache.spark.rdd.RDD
+
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.dsl
+import org.apache.spark.sql.catalyst.{ScalaReflection, dsl}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
+
 import org.apache.spark.sql.columnar.InMemoryColumnarTableScan
+
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.SparkStrategies
+
+import org.apache.spark.sql.parquet.ParquetRelation
 
 /**
  * :: AlphaComponent ::
@@ -65,12 +73,12 @@ class SQLContext(@transient val sparkContext: SparkContext)
     new this.QueryExecution { val logical = plan }
 
   /**
-   * :: Experimental ::
+   * :: DeveloperApi ::
    * Allows catalyst LogicalPlans to be executed as a SchemaRDD.  Note that the LogicalPlan
-   * interface is considered internal, and thus not guranteed to be stable.  As a result, using
-   * them directly is not reccomended.
+   * interface is considered internal, and thus not guaranteed to be stable.  As a result, using
+   * them directly is not recommended.
    */
-  @Experimental
+  @DeveloperApi
   implicit def logicalPlanToSparkQuery(plan: LogicalPlan): SchemaRDD = new SchemaRDD(this, plan)
 
   /**
@@ -89,6 +97,39 @@ class SQLContext(@transient val sparkContext: SparkContext)
   def parquetFile(path: String): SchemaRDD =
     new SchemaRDD(this, parquet.ParquetRelation(path))
 
+  /**
+   * :: Experimental ::
+   * Creates an empty parquet file with the schema of class `A`, which can be registered as a table.
+   * This registered table can be used as the target of future `insertInto` operations.
+   *
+   * {{{
+   *   val sqlContext = new SQLContext(...)
+   *   import sqlContext._
+   *
+   *   case class Person(name: String, age: Int)
+   *   createParquetFile[Person]("path/to/file.parquet").registerAsTable("people")
+   *   sql("INSERT INTO people SELECT 'michael', 29")
+   * }}}
+   *
+   * @tparam A A case class type that describes the desired schema of the parquet file to be
+   *           created.
+   * @param path The path where the directory containing parquet metadata should be created.
+   *             Data inserted into this table will also be stored at this location.
+   * @param allowExisting When false, an exception will be thrown if this directory already exists.
+   * @param conf A Hadoop configuration object that can be used to specify options to the parquet
+   *             output format.
+   *
+   * @group userf
+   */
+  @Experimental
+  def createParquetFile[A <: Product : TypeTag](
+      path: String,
+      allowExisting: Boolean = true,
+      conf: Configuration = new Configuration()): SchemaRDD = {
+    new SchemaRDD(
+      this,
+      ParquetRelation.createEmpty(path, ScalaReflection.attributesFor[A], allowExisting, conf))
+  }
 
   /**
    * Registers the given RDD as a temporary table in the catalog.  Temporary tables exist only
@@ -208,9 +249,11 @@ class SQLContext(@transient val sparkContext: SparkContext)
   }
 
   /**
+   * :: DeveloperApi ::
    * The primary workflow for executing relational queries using Spark.  Designed to allow easy
    * access to the intermediate phases of query execution for developers.
    */
+  @DeveloperApi
   protected abstract class QueryExecution {
     def logical: LogicalPlan
 
@@ -231,7 +274,7 @@ class SQLContext(@transient val sparkContext: SparkContext)
     override def toString: String =
       s"""== Logical Plan ==
          |${stringOrError(analyzed)}
-         |== Optimized Logical Plan
+         |== Optimized Logical Plan ==
          |${stringOrError(optimizedPlan)}
          |== Physical Plan ==
          |${stringOrError(executedPlan)}

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index a771147..f2ae5b0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql
 import net.razorvine.pickle.Pickler
 
 import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
-import org.apache.spark.annotation.{AlphaComponent, Experimental}
+import org.apache.spark.annotation.{AlphaComponent, Experimental, DeveloperApi}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
@@ -83,8 +83,6 @@ import java.util.{Map => JMap}
  *  rdd.where('key === 1).orderBy('value.asc).select('key).collect()
  * }}}
  *
- *  @todo There is currently no support for creating SchemaRDDs from either Java or Python RDDs.
- *
  *  @groupname Query Language Integrated Queries
  *  @groupdesc Query Functions that create new queries from SchemaRDDs.  The
  *             result of all query functions is also a SchemaRDD, allowing multiple operations to be
@@ -276,8 +274,8 @@ class SchemaRDD(
    *              an `OUTER JOIN` in SQL.  When no output rows are produced by the generator for a
    *              given row, a single row will be output, with `NULL` values for each of the
    *              generated columns.
-   * @param alias an optional alias that can be used as qualif for the attributes that are produced
-   *              by this generate operation.
+   * @param alias an optional alias that can be used as qualifier for the attributes that are
+   *              produced by this generate operation.
    *
    * @group Query
    */
@@ -290,29 +288,13 @@ class SchemaRDD(
     new SchemaRDD(sqlContext, Generate(generator, join, outer, None, logicalPlan))
 
   /**
-   * :: Experimental ::
-   * Adds the rows from this RDD to the specified table.  Note in a standard [[SQLContext]] there is
-   * no notion of persistent tables, and thus queries that contain this operator will fail to
-   * optimize.  When working with an extension of a SQLContext that has a persistent catalog, such
-   * as a `HiveContext`, this operation will result in insertions to the table specified.
+   * Returns this RDD as a SchemaRDD.  Intended primarily to force the invocation of the implicit
+   * conversion from a standard RDD to a SchemaRDD.
    *
    * @group schema
    */
-  @Experimental
-  def insertInto(tableName: String, overwrite: Boolean = false) =
-    new SchemaRDD(
-      sqlContext,
-      InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite))
-
-  /**
-   * Returns this RDD as a SchemaRDD.
-   * @group schema
-   */
   def toSchemaRDD = this
 
-  /** FOR INTERNAL USE ONLY */
-  def analyze = sqlContext.analyzer(logicalPlan)
-
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
     this.mapPartitions { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 3dd9897..a390ab6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.plans.logical._
 
 /**
@@ -29,14 +31,24 @@ trait SchemaRDDLike {
   private[sql] def baseSchemaRDD: SchemaRDD
 
   /**
+   * :: DeveloperApi ::
    * A lazily computed query execution workflow.  All other RDD operations are passed
-   * through to the RDD that is produced by this workflow.
+   * through to the RDD that is produced by this workflow. This workflow is produced lazily because
+   * invoking the whole query optimization pipeline can be expensive.
    *
-   * We want this to be lazy because invoking the whole query optimization pipeline can be
-   * expensive.
+   * The query execution is considered a Developer API as phases may be added or removed in future
+   * releases.  This execution is only exposed to provide an interface for inspecting the various
+   * phases for debugging purposes.  Applications should not depend on particular phases existing
+   * or producing any specific output, even for exactly the same query.
+   *
+   * Additionally, the RDD exposed by this execution is not designed for consumption by end users.
+   * In particular, it does not contain any schema information, and it reuses Row objects
+   * internally.  This object reuse improves performance, but can make programming against the RDD
+   * more difficult.  Instead end users should perform RDD operations on a SchemaRDD directly.
    */
   @transient
-  protected[spark] lazy val queryExecution = sqlContext.executePlan(logicalPlan)
+  @DeveloperApi
+  lazy val queryExecution = sqlContext.executePlan(logicalPlan)
 
   override def toString =
     s"""${super.toString}
@@ -45,7 +57,8 @@ trait SchemaRDDLike {
 
   /**
    * Saves the contents of this `SchemaRDD` as a parquet file, preserving the schema.  Files that
-   * are written out using this method can be read back in as a SchemaRDD using the ``function
+   * are written out using this method can be read back in as a SchemaRDD using the `parquetFile`
+   * function.
    *
    * @group schema
    */
@@ -62,4 +75,40 @@ trait SchemaRDDLike {
   def registerAsTable(tableName: String): Unit = {
     sqlContext.registerRDDAsTable(baseSchemaRDD, tableName)
   }
+
+  /**
+   * :: Experimental ::
+   * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
+   *
+   * @group schema
+   */
+  @Experimental
+  def insertInto(tableName: String, overwrite: Boolean): Unit =
+    sqlContext.executePlan(
+      InsertIntoTable(UnresolvedRelation(None, tableName), Map.empty, logicalPlan, overwrite)).toRdd
+
+  /**
+   * :: Experimental ::
+   * Appends the rows from this RDD to the specified table.
+   *
+   * @group schema
+   */
+  @Experimental
+  def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
+
+  /**
+   * :: Experimental ::
+   * Creates a table from the the contents of this SchemaRDD.  This will fail if the table already
+   * exists.
+   *
+   * Note that this currently only works with SchemaRDDs that are created from a HiveContext as
+   * there is no notion of a persisted catalog in a standard SQL context.  Instead you can write
+   * an RDD out to a parquet file, and then register that file as a table.  This "table" can then
+   * be the target of an `insertInto`.
+   *
+   * @group schema
+   */
+  @Experimental
+  def saveAsTable(tableName: String): Unit =
+    sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
index 573345e..26922f7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.api.java
 
-import java.beans.{Introspector, PropertyDescriptor}
+import java.beans.Introspector
 
+import org.apache.hadoop.conf.Configuration
+
+import org.apache.spark.annotation.Experimental
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GenericRow, Row => ScalaRow}
@@ -46,28 +49,41 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
   }
 
   /**
+   * :: Experimental ::
+   * Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
+   * a table. This registered table can be used as the target of future insertInto` operations.
+   *
+   * {{{
+   *   JavaSQLContext sqlCtx = new JavaSQLContext(...)
+   *
+   *   sqlCtx.createParquetFile(Person.class, "path/to/file.parquet").registerAsTable("people")
+   *   sqlCtx.sql("INSERT INTO people SELECT 'michael', 29")
+   * }}}
+   *
+   * @param beanClass A java bean class object that will be used to determine the schema of the
+   *                  parquet file.                          s
+   * @param path The path where the directory containing parquet metadata should be created.
+   *             Data inserted into this table will also be stored at this location.
+   * @param allowExisting When false, an exception will be thrown if this directory already exists.
+   * @param conf A Hadoop configuration object that can be used to specific options to the parquet
+   *             output format.
+   */
+  @Experimental
+  def createParquetFile(
+      beanClass: Class[_],
+      path: String,
+      allowExisting: Boolean = true,
+      conf: Configuration = new Configuration()): JavaSchemaRDD = {
+    new JavaSchemaRDD(
+      sqlContext,
+      ParquetRelation.createEmpty(path, getSchema(beanClass), allowExisting, conf))
+  }
+
+  /**
    * Applies a schema to an RDD of Java Beans.
    */
   def applySchema(rdd: JavaRDD[_], beanClass: Class[_]): JavaSchemaRDD = {
-    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
-    val beanInfo = Introspector.getBeanInfo(beanClass)
-
-    val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
-    val schema = fields.map { property =>
-      val dataType = property.getPropertyType match {
-        case c: Class[_] if c == classOf[java.lang.String] => StringType
-        case c: Class[_] if c == java.lang.Short.TYPE => ShortType
-        case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
-        case c: Class[_] if c == java.lang.Long.TYPE => LongType
-        case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
-        case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
-        case c: Class[_] if c == java.lang.Float.TYPE => FloatType
-        case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
-      }
-
-      AttributeReference(property.getName, dataType, true)()
-    }
-
+    val schema = getSchema(beanClass)
     val className = beanClass.getCanonicalName
     val rowRdd = rdd.rdd.mapPartitions { iter =>
       // BeanInfo is not serializable so we must rediscover it remotely for each partition.
@@ -97,4 +113,26 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
   def registerRDDAsTable(rdd: JavaSchemaRDD, tableName: String): Unit = {
     sqlContext.registerRDDAsTable(rdd.baseSchemaRDD, tableName)
   }
+
+  /** Returns a Catalyst Schema for the given java bean class. */
+  protected def getSchema(beanClass: Class[_]): Seq[AttributeReference] = {
+    // TODO: All of this could probably be moved to Catalyst as it is mostly not Spark specific.
+    val beanInfo = Introspector.getBeanInfo(beanClass)
+
+    val fields = beanInfo.getPropertyDescriptors.filterNot(_.getName == "class")
+    fields.map { property =>
+      val dataType = property.getPropertyType match {
+        case c: Class[_] if c == classOf[java.lang.String] => StringType
+        case c: Class[_] if c == java.lang.Short.TYPE => ShortType
+        case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
+        case c: Class[_] if c == java.lang.Long.TYPE => LongType
+        case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
+        case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
+        case c: Class[_] if c == java.lang.Float.TYPE => FloatType
+        case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
+      }
+      // TODO: Nullability could be stricter.
+      AttributeReference(property.getName, dataType, nullable = true)()
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 4d7c86a..32813a6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -119,7 +119,7 @@ private[sql] object ParquetRelation {
         child,
         "Attempt to create Parquet table from unresolved child (when schema is not available)")
     }
-    createEmpty(pathString, child.output, conf)
+    createEmpty(pathString, child.output, false, conf)
   }
 
   /**
@@ -133,8 +133,9 @@ private[sql] object ParquetRelation {
    */
   def createEmpty(pathString: String,
                   attributes: Seq[Attribute],
+                  allowExisting: Boolean,
                   conf: Configuration): ParquetRelation = {
-    val path = checkPath(pathString, conf)
+    val path = checkPath(pathString, allowExisting, conf)
     if (conf.get(ParquetOutputFormat.COMPRESSION) == null) {
       conf.set(ParquetOutputFormat.COMPRESSION, ParquetRelation.defaultCompression.name())
     }
@@ -143,7 +144,7 @@ private[sql] object ParquetRelation {
     new ParquetRelation(path.toString)
   }
 
-  private def checkPath(pathStr: String, conf: Configuration): Path = {
+  private def checkPath(pathStr: String, allowExisting: Boolean, conf: Configuration): Path = {
     if (pathStr == null) {
       throw new IllegalArgumentException("Unable to create ParquetRelation: path is null")
     }
@@ -154,6 +155,10 @@ private[sql] object ParquetRelation {
         s"Unable to create ParquetRelation: incorrectly formatted path $pathStr")
     }
     val path = origPath.makeQualified(fs)
+    if (!allowExisting && fs.exists(path)) {
+      sys.error(s"File $pathStr already exists.")
+    }
+
     if (fs.exists(path) &&
         !fs.getFileStatus(path)
         .getPermission

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
new file mode 100644
index 0000000..73d8796
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/InsertIntoSuite.scala
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+/* Implicits */
+import org.apache.spark.sql.test.TestSQLContext._
+
+class InsertIntoSuite extends QueryTest {
+  TestData // Initialize TestData
+  import TestData._
+
+  test("insertInto() created parquet file") {
+    val testFilePath = File.createTempFile("sparkSql", "pqt")
+    testFilePath.delete()
+    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+    testFile.registerAsTable("createAndInsertTest")
+
+    // Add some data.
+    testData.insertInto("createAndInsertTest")
+
+    // Make sure its there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertTest"),
+      testData.collect().toSeq
+    )
+
+    // Add more data.
+    testData.insertInto("createAndInsertTest")
+
+    // Make sure all data is there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq ++ testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertTest"),
+      testData.collect().toSeq ++ testData.collect().toSeq
+    )
+
+    // Now overwrite.
+    testData.insertInto("createAndInsertTest", overwrite = true)
+
+    // Make sure its there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertTest"),
+      testData.collect().toSeq
+    )
+  }
+
+  test("INSERT INTO parquet table") {
+    val testFilePath = File.createTempFile("sparkSql", "pqt")
+    testFilePath.delete()
+    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+    testFile.registerAsTable("createAndInsertSQLTest")
+
+    sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
+
+    // Make sure its there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertSQLTest"),
+      testData.collect().toSeq
+    )
+
+    // Append more data.
+    sql("INSERT INTO createAndInsertSQLTest SELECT * FROM testData")
+
+    // Make sure all data is there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq ++ testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertSQLTest"),
+      testData.collect().toSeq ++ testData.collect().toSeq
+    )
+
+    sql("INSERT OVERWRITE INTO createAndInsertSQLTest SELECT * FROM testData")
+
+    // Make sure its there for a new instance of parquet file.
+    checkAnswer(
+      parquetFile(testFilePath.getCanonicalPath),
+      testData.collect().toSeq
+    )
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertSQLTest"),
+      testData.collect().toSeq
+    )
+  }
+
+  test("Double create fails when allowExisting = false") {
+    val testFilePath = File.createTempFile("sparkSql", "pqt")
+    testFilePath.delete()
+    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+
+    intercept[RuntimeException] {
+      createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = false)
+    }
+  }
+
+  test("Double create does not fail when allowExisting = true") {
+    val testFilePath = File.createTempFile("sparkSql", "pqt")
+    testFilePath.delete()
+    val testFile = createParquetFile[TestData](testFilePath.getCanonicalPath)
+
+    createParquetFile[TestData](testFilePath.getCanonicalPath, allowExisting = true)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index d719ceb..d6072b4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -49,18 +49,21 @@ class QueryTest extends FunSuite {
             |$e
           """.stripMargin)
     }
+
     if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) {
       fail(s"""
         |Results do not match for query:
         |${rdd.logicalPlan}
         |== Analyzed Plan ==
         |${rdd.queryExecution.analyzed}
-        |== RDD ==
-        |$rdd
+        |== Physical Plan ==
+        |${rdd.queryExecution.executedPlan}
         |== Results ==
         |${sideBySide(
-            prepareAnswer(convertedAnswer).map(_.toString),
-            prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+            s"== Correct Answer - ${convertedAnswer.size} ==" +:
+              prepareAnswer(convertedAnswer).map(_.toString),
+            s"== Spark Answer - ${sparkAnswer.size} ==" +:
+              prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
       """.stripMargin)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 0bb13cf..271b1d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -23,8 +23,9 @@ import org.apache.spark.sql.test._
 /* Implicits */
 import TestSQLContext._
 
+case class TestData(key: Int, value: String)
+
 object TestData {
-  case class TestData(key: Int, value: String)
   val testData: SchemaRDD = TestSQLContext.sparkContext.parallelize(
     (1 to 100).map(i => TestData(i, i.toString)))
   testData.registerAsTable("testData")

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 658ff09..e24c74a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -38,7 +38,7 @@ class PlannerSuite extends FunSuite {
   }
 
   test("count is partially aggregated") {
-    val query = testData.groupBy('value)(Count('key)).analyze.logicalPlan
+    val query = testData.groupBy('value)(Count('key)).queryExecution.analyzed
     val planned = PartialAggregation(query).head
     val aggregations = planned.collect { case a: Aggregate => a }
 
@@ -46,14 +46,14 @@ class PlannerSuite extends FunSuite {
   }
 
   test("count distinct is not partially aggregated") {
-    val query = testData.groupBy('value)(CountDistinct('key :: Nil)).analyze.logicalPlan
-    val planned = PartialAggregation(query.logicalPlan)
+    val query = testData.groupBy('value)(CountDistinct('key :: Nil)).queryExecution.analyzed
+    val planned = PartialAggregation(query)
     assert(planned.isEmpty)
   }
 
   test("mixed aggregates are not partially aggregated") {
     val query =
-      testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).analyze.logicalPlan
+      testData.groupBy('value)(Count('value), CountDistinct('key :: Nil)).queryExecution.analyzed
     val planned = PartialAggregation(query)
     assert(planned.isEmpty)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index fc68d6c..d9c9b9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.parquet
 
+import java.io.File
+
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
 
 import org.apache.hadoop.fs.{Path, FileSystem}
@@ -26,21 +28,23 @@ import parquet.hadoop.ParquetFileWriter
 import parquet.schema.MessageTypeParser
 import parquet.hadoop.util.ContextUtil
 
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.util.getTempFilePath
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Row}
 import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.TestData
 import org.apache.spark.util.Utils
 import org.apache.spark.sql.catalyst.types.{StringType, IntegerType, DataType}
 import org.apache.spark.sql.{parquet, SchemaRDD}
-import org.apache.spark.sql.catalyst.expressions.AttributeReference
-import scala.Tuple2
 
 // Implicits
 import org.apache.spark.sql.test.TestSQLContext._
 
 case class TestRDDEntry(key: Int, value: String)
 
-class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
+class ParquetQuerySuite extends QueryTest with FunSuite with BeforeAndAfterAll {
+  import TestData._
+  TestData // Load test data tables.
 
   var testRDD: SchemaRDD = null
 
@@ -178,23 +182,6 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
     assert(true)
   }
 
-  test("insert (overwrite) via Scala API (new SchemaRDD)") {
-    val dirname = Utils.createTempDir()
-    val source_rdd = TestSQLContext.sparkContext.parallelize((1 to 100))
-      .map(i => TestRDDEntry(i, s"val_$i"))
-    source_rdd.registerAsTable("source")
-    val dest_rdd = createParquetFile(dirname.toString, ("key", IntegerType), ("value", StringType))
-    dest_rdd.registerAsTable("dest")
-    sql("INSERT OVERWRITE INTO dest SELECT * FROM source").collect()
-    val rdd_copy1 = sql("SELECT * FROM dest").collect()
-    assert(rdd_copy1.size === 100)
-    assert(rdd_copy1(0).apply(0) === 1)
-    assert(rdd_copy1(0).apply(1) === "val_1")
-    sql("INSERT INTO dest SELECT * FROM source").collect()
-    val rdd_copy2 = sql("SELECT * FROM dest").collect()
-    assert(rdd_copy2.size === 200)
-    Utils.deleteRecursively(dirname)
-  }
 
   test("insert (appending) to same table via Scala API") {
     sql("INSERT INTO testsource SELECT * FROM testsource").collect()
@@ -208,19 +195,5 @@ class ParquetQuerySuite extends FunSuite with BeforeAndAfterAll {
     Utils.deleteRecursively(ParquetTestData.testDir)
     ParquetTestData.writeFile()
   }
-
-  /**
-   * Creates an empty SchemaRDD backed by a ParquetRelation.
-   *
-   * TODO: since this is so experimental it is better to have it here and not
-   * in SQLContext. Also note that when creating new AttributeReferences
-   * one needs to take care not to create duplicate Attribute ID's.
-   */
-  private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
-    val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
-    new SchemaRDD(
-      TestSQLContext,
-      parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index 3534584..c0d8adf 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -23,17 +23,21 @@ import scala.language.implicitConversions
 import java.io.{BufferedReader, File, InputStreamReader, PrintStream}
 import java.util.{ArrayList => JArrayList}
 
+import scala.reflect.runtime.universe.TypeTag
+
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.session.SessionState
 
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions.GenericRow
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LowerCaseSchema}
 import org.apache.spark.sql.catalyst.plans.logical.{NativeCommand, ExplainCommand}
+import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.catalyst.types._
 import org.apache.spark.sql.execution._
 
@@ -77,7 +81,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
     // We force query optimization to happen right away instead of letting it happen lazily like
     // when using the query DSL.  This is so DDL commands behave as expected.  This is only
-    // generates the RDD lineage for DML queries, but do not perform any execution.
+    // generates the RDD lineage for DML queries, but does not perform any execution.
     result.queryExecution.toRdd
     result
   }
@@ -85,6 +89,17 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
   /** An alias for `hiveql`. */
   def hql(hqlQuery: String): SchemaRDD = hiveql(hqlQuery)
 
+  /**
+   * Creates a table using the schema of the given class.
+   *
+   * @param tableName The name of the table to create.
+   * @param allowExisting When false, an exception will be thrown if the table already exists.
+   * @tparam A A case class that is used to describe the schema of the table to be created.
+   */
+  def createTable[A <: Product : TypeTag](tableName: String, allowExisting: Boolean = true) {
+    catalog.createTable("default", tableName, ScalaReflection.attributesFor[A], allowExisting)
+  }
+
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
   @transient
   protected val outputBuffer =  new java.io.OutputStream {
@@ -224,6 +239,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
     sparkContext.parallelize(Seq(new GenericRow(Array[Any]()): Row), 1)
 
   /** Extends QueryExecution with hive specific features. */
+  @DeveloperApi
   protected[sql] abstract class QueryExecution extends super.QueryExecution {
     // TODO: Create mixin for the analyzer instead of overriding things here.
     override lazy val optimizedPlan =

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index c36b587..ca75cec 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -64,7 +64,11 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
       alias)(table.getTTable, partitions.map(part => part.getTPartition))
   }
 
-  def createTable(databaseName: String, tableName: String, schema: Seq[Attribute]) {
+  def createTable(
+      databaseName: String,
+      tableName: String,
+      schema: Seq[Attribute],
+      allowExisting: Boolean = false): Unit = {
     val table = new Table(databaseName, tableName)
     val hiveSchema =
       schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
@@ -84,7 +88,12 @@ class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with Logging {
     serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
     serDeInfo.setParameters(Map[String, String]())
     sd.setSerdeInfo(serDeInfo)
-    client.createTable(table)
+
+    try client.createTable(table) catch {
+      case e: org.apache.hadoop.hive.ql.metadata.HiveException
+        if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
+           allowExisting => // Do nothing.
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
new file mode 100644
index 0000000..11d8b1f
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.util._
+
+
+/**
+ * *** DUPLICATED FROM sql/core. ***
+ *
+ * It is hard to have maven allow one subproject depend on another subprojects test code.
+ * So, we duplicate this code here.
+ */
+class QueryTest extends FunSuite {
+  /**
+   * Runs the plan and makes sure the answer matches the expected result.
+   * @param rdd the [[SchemaRDD]] to be executed
+   * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
+   */
+  protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = {
+    val convertedAnswer = expectedAnswer match {
+      case s: Seq[_] if s.isEmpty => s
+      case s: Seq[_] if s.head.isInstanceOf[Product] &&
+        !s.head.isInstanceOf[Seq[_]] => s.map(_.asInstanceOf[Product].productIterator.toIndexedSeq)
+      case s: Seq[_] => s
+      case singleItem => Seq(Seq(singleItem))
+    }
+
+    val isSorted = rdd.logicalPlan.collect { case s: logical.Sort => s}.nonEmpty
+    def prepareAnswer(answer: Seq[Any]) = if (!isSorted) answer.sortBy(_.toString) else answer
+    val sparkAnswer = try rdd.collect().toSeq catch {
+      case e: Exception =>
+        fail(
+          s"""
+            |Exception thrown while executing query:
+            |${rdd.logicalPlan}
+            |== Exception ==
+            |$e
+          """.stripMargin)
+    }
+
+    if(prepareAnswer(convertedAnswer) != prepareAnswer(sparkAnswer)) {
+      fail(s"""
+        |Results do not match for query:
+        |${rdd.logicalPlan}
+        |== Analyzed Plan ==
+        |${rdd.queryExecution.analyzed}
+        |== Physical Plan ==
+        |${rdd.queryExecution.executedPlan}
+        |== Results ==
+        |${sideBySide(
+            s"== Correct Answer - ${convertedAnswer.size} ==" +:
+              prepareAnswer(convertedAnswer).map(_.toString),
+            s"== Spark Answer - ${sparkAnswer.size} ==" +:
+              prepareAnswer(sparkAnswer).map(_.toString)).mkString("\n")}
+      """.stripMargin)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
new file mode 100644
index 0000000..ad29e06
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import java.io.File
+
+import org.apache.spark.sql.QueryTest
+
+/* Implicits */
+import org.apache.spark.sql.hive.TestHive._
+
+case class TestData(key: Int, value: String)
+
+class InsertIntoHiveTableSuite extends QueryTest {
+  val testData = TestHive.sparkContext.parallelize(
+    (1 to 100).map(i => TestData(i, i.toString)))
+  testData.registerAsTable("testData")
+
+  test("insertInto() HiveTable") {
+    createTable[TestData]("createAndInsertTest")
+
+    // Add some data.
+    testData.insertInto("createAndInsertTest")
+
+    // Make sure the table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertTest"),
+      testData.collect().toSeq
+    )
+
+    // Add more data.
+    testData.insertInto("createAndInsertTest")
+
+    // Make sure the table has been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertTest"),
+      testData.collect().toSeq ++ testData.collect().toSeq
+    )
+
+    // Now overwrite.
+    testData.insertInto("createAndInsertTest", overwrite = true)
+
+    // Make sure the registered table has also been updated.
+    checkAnswer(
+      sql("SELECT * FROM createAndInsertTest"),
+      testData.collect().toSeq
+    )
+  }
+
+  test("Double create fails when allowExisting = false") {
+    createTable[TestData]("doubleCreateAndInsertTest")
+
+    intercept[org.apache.hadoop.hive.ql.metadata.HiveException] {
+      createTable[TestData]("doubleCreateAndInsertTest", allowExisting = false)
+    }
+  }
+
+  test("Double create does not fail when allowExisting = true") {
+    createTable[TestData]("createAndInsertTest")
+    createTable[TestData]("createAndInsertTest")
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/273c2fd0/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
index aade62e..843c681 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/parquet/HiveParquetSuite.scala
@@ -89,44 +89,6 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
     compareRDDs(rddOrig, rddCopy, "testsource", ParquetTestData.testSchemaFieldNames)
   }
 
-  test("CREATE TABLE of Parquet table") {
-    createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
-      .registerAsTable("tmp")
-    val rddCopy =
-      hql("INSERT INTO TABLE tmp SELECT * FROM src")
-      .collect()
-      .sortBy[Int](_.apply(0) match {
-        case x: Int => x
-        case _ => 0
-      })
-    val rddOrig = hql("SELECT * FROM src")
-      .collect()
-      .sortBy(_.getInt(0))
-    compareRDDs(rddOrig, rddCopy, "src (Hive)", Seq("key:Int", "value:String"))
-  }
-
-  test("Appending to Parquet table") {
-    createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
-      .registerAsTable("tmpnew")
-    hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
-    hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
-    hql("INSERT INTO TABLE tmpnew SELECT * FROM src").collect()
-    val rddCopies = hql("SELECT * FROM tmpnew").collect()
-    val rddOrig = hql("SELECT * FROM src").collect()
-    assert(rddCopies.size === 3 * rddOrig.size, "number of copied rows via INSERT INTO did not match correct number")
-  }
-
-  test("Appending to and then overwriting Parquet table") {
-    createParquetFile(dirname.getAbsolutePath, ("key", IntegerType), ("value", StringType))
-      .registerAsTable("tmp")
-    hql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
-    hql("INSERT INTO TABLE tmp SELECT * FROM src").collect()
-    hql("INSERT OVERWRITE TABLE tmp SELECT * FROM src").collect()
-    val rddCopies = hql("SELECT * FROM tmp").collect()
-    val rddOrig = hql("SELECT * FROM src").collect()
-    assert(rddCopies.size === rddOrig.size, "INSERT OVERWRITE did not actually overwrite")
-  }
-
   private def compareRDDs(rddOne: Array[Row], rddTwo: Array[Row], tableName: String, fieldNames: Seq[String]) {
     var counter = 0
     (rddOne, rddTwo).zipped.foreach {
@@ -137,18 +99,4 @@ class HiveParquetSuite extends FunSuite with BeforeAndAfterAll with BeforeAndAft
     counter = counter + 1
     }
   }
-
-  /**
-   * Creates an empty SchemaRDD backed by a ParquetRelation.
-   *
-   * TODO: since this is so experimental it is better to have it here and not
-   * in SQLContext. Also note that when creating new AttributeReferences
-   * one needs to take care not to create duplicate Attribute ID's.
-   */
-  private def createParquetFile(path: String, schema: (Tuple2[String, DataType])*): SchemaRDD = {
-    val attributes = schema.map(t => new AttributeReference(t._1, t._2)())
-    new SchemaRDD(
-      TestHive,
-      parquet.ParquetRelation.createEmpty(path, attributes, sparkContext.hadoopConfiguration))
-  }
 }