Posted to commits@spark.apache.org by yh...@apache.org on 2015/05/23 18:48:25 UTC

spark git commit: [SPARK-7654] [SQL] Move insertInto into reader/writer interface.

Repository: spark
Updated Branches:
  refs/heads/master a4df0f2d8 -> 2b7e63585


[SPARK-7654] [SQL] Move insertInto into reader/writer interface.

This one continues the work of https://github.com/apache/spark/pull/6216.

Author: Yin Huai <yh...@databricks.com>
Author: Reynold Xin <rx...@databricks.com>

Closes #6366 from yhuai/insert and squashes the following commits:

3d717fb [Yin Huai] Use insertInto to handle the case when the table exists and Append is used for saveAsTable.
56d2540 [Yin Huai] Add PreWriteCheck to HiveContext's analyzer.
c636e35 [Yin Huai] Remove unnecessary empty lines.
cf83837 [Yin Huai] Move insertInto to write. Also, remove the partition columns from InsertIntoHadoopFsRelation.
0841a54 [Reynold Xin] Removed experimental tag for deprecated methods.
33ed8ef [Reynold Xin] [SPARK-7654][SQL] Move insertInto into reader/writer interface.
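
For readers following along with the API change itself, here is a minimal usage sketch of the old and new entry points. The SparkContext setup, table name, and sample data below are hypothetical and only meant to illustrate the calls touched by this commit:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.SaveMode
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext(new SparkConf().setAppName("insertInto-sketch").setMaster("local[2]"))
    val sqlContext = new HiveContext(sc)
    import sqlContext.implicits._

    sqlContext.sql("CREATE TABLE IF NOT EXISTS events (key INT, value STRING)")
    val df = sc.parallelize(1 to 3).map(i => (i, s"v$i")).toDF("key", "value")

    // 1.3-style calls, now deprecated and delegating to the writer:
    //   df.insertInto("events")                     // append
    //   df.insertInto("events", overwrite = true)   // overwrite
    // 1.4 writer interface introduced by this change:
    df.write.mode(SaveMode.Append).insertInto("events")
    df.write.mode(SaveMode.Overwrite).insertInto("events")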


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2b7e6358
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2b7e6358
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2b7e6358

Branch: refs/heads/master
Commit: 2b7e63585d61be2dab78b70af3867cda3983d5b1
Parents: a4df0f2
Author: Yin Huai <yh...@databricks.com>
Authored: Sat May 23 09:48:20 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sat May 23 09:48:20 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/DataFrame.scala  | 52 +++++++--------
 .../org/apache/spark/sql/DataFrameReader.scala  | 18 +-----
 .../org/apache/spark/sql/DataFrameWriter.scala  | 66 +++++++++++++++++---
 .../spark/sql/parquet/ParquetTableSupport.scala |  2 +-
 .../spark/sql/sources/DataSourceStrategy.scala  |  5 +-
 .../org/apache/spark/sql/sources/commands.scala |  2 +-
 .../org/apache/spark/sql/sources/ddl.scala      |  1 -
 .../org/apache/spark/sql/sources/rules.scala    | 19 +++++-
 .../org/apache/spark/sql/hive/HiveContext.scala |  4 ++
 .../sql/hive/InsertIntoHiveTableSuite.scala     |  6 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  8 +--
 .../sql/hive/execution/SQLQuerySuite.scala      |  8 +--
 .../apache/spark/sql/hive/parquetSuites.scala   |  4 +-
 .../sql/sources/hadoopFsRelationSuites.scala    | 10 ---
 14 files changed, 116 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index 3ec1c4a..f968577 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1395,28 +1395,6 @@ class DataFrame private[sql](
   def write: DataFrameWriter = new DataFrameWriter(this)
 
   /**
-   * :: Experimental ::
-   * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
-   * @group output
-   * @since 1.3.0
-   */
-  @Experimental
-  def insertInto(tableName: String, overwrite: Boolean): Unit = {
-    sqlContext.executePlan(InsertIntoTable(UnresolvedRelation(Seq(tableName)),
-      Map.empty, logicalPlan, overwrite, ifNotExists = false)).toRdd
-  }
-
-  /**
-   * :: Experimental ::
-   * Adds the rows from this RDD to the specified table.
-   * Throws an exception if the table already exists.
-   * @group output
-   * @since 1.3.0
-   */
-  @Experimental
-  def insertInto(tableName: String): Unit = insertInto(tableName, overwrite = false)
-
-  /**
    * Returns the content of the [[DataFrame]] as a RDD of JSON strings.
    * @group rdd
    * @since 1.3.0
@@ -1551,13 +1529,7 @@ class DataFrame private[sql](
    */
   @deprecated("Use write.mode(mode).saveAsTable(tableName)", "1.4.0")
   def saveAsTable(tableName: String, mode: SaveMode): Unit = {
-    if (sqlContext.catalog.tableExists(Seq(tableName)) && mode == SaveMode.Append) {
-      // If table already exists and the save mode is Append,
-      // we will just call insertInto to append the contents of this DataFrame.
-      insertInto(tableName, overwrite = false)
-    } else {
-      write.mode(mode).saveAsTable(tableName)
-    }
+    write.mode(mode).saveAsTable(tableName)
   }
 
   /**
@@ -1713,9 +1685,29 @@ class DataFrame private[sql](
     write.format(source).mode(mode).options(options).save()
   }
 
+
+  /**
+   * Adds the rows from this RDD to the specified table, optionally overwriting the existing data.
+   * @group output
+   */
+  @deprecated("Use write.mode(SaveMode.Append|SaveMode.Overwrite).saveAsTable(tableName)", "1.4.0")
+  def insertInto(tableName: String, overwrite: Boolean): Unit = {
+    write.mode(if (overwrite) SaveMode.Overwrite else SaveMode.Append).insertInto(tableName)
+  }
+
+  /**
+   * Adds the rows from this RDD to the specified table.
+   * Throws an exception if the table already exists.
+   * @group output
+   */
+  @deprecated("Use write.mode(SaveMode.Append).saveAsTable(tableName)", "1.4.0")
+  def insertInto(tableName: String): Unit = {
+    write.mode(SaveMode.Append).insertInto(tableName)
+  }
+
   ////////////////////////////////////////////////////////////////////////////
   ////////////////////////////////////////////////////////////////////////////
-  // End of eeprecated methods
+  // End of deprecated methods
   ////////////////////////////////////////////////////////////////////////////
   ////////////////////////////////////////////////////////////////////////////
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 381c10f..b44d4c8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -95,20 +95,6 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
   }
 
   /**
-   * Specifies the input partitioning. If specified, the underlying data source does not need to
-   * discover the data partitioning scheme, and thus can speed up very large inputs.
-   *
-   * This is only applicable for Parquet at the moment.
-   *
-   * @since 1.4.0
-   */
-  @scala.annotation.varargs
-  def partitionBy(colNames: String*): DataFrameReader = {
-    this.partitioningColumns = Option(colNames)
-    this
-  }
-
-  /**
    * Loads input in as a [[DataFrame]], for data sources that require a path (e.g. data backed by
    * a local or distributed file system).
    *
@@ -128,7 +114,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
     val resolved = ResolvedDataSource(
       sqlContext,
       userSpecifiedSchema = userSpecifiedSchema,
-      partitionColumns = partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+      partitionColumns = Array.empty[String],
       provider = source,
       options = extraOptions.toMap)
     DataFrame(sqlContext, LogicalRelation(resolved.relation))
@@ -300,6 +286,4 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
 
   private var extraOptions = new scala.collection.mutable.HashMap[String, String]
 
-  private var partitioningColumns: Option[Seq[String]] = None
-
 }
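
With partitionBy removed from DataFrameReader, callers no longer declare partitioning on read; the data source discovers it from the directory layout. A short sketch, continuing the hypothetical setup from the sketch under the commit message (the path and the "date" column are made up for illustration):

    val events = sc.parallelize(Seq(
      (1, "a", "2015-05-23"),
      (2, "b", "2015-05-24"))).toDF("key", "value", "date")

    // Write a partitioned layout, then read it back. The reader discovers the
    // "date" partition column from directories like date=2015-05-23/ instead of
    // being told about it through the removed reader-side partitionBy.
    events.write.partitionBy("date").mode(SaveMode.Overwrite).parquet("/tmp/events_by_date")
    val readBack = sqlContext.read.parquet("/tmp/events_by_date")
    readBack.printSchema()  // includes the discovered "date" column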

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index f2e721d..5548b26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql
 import java.util.Properties
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.jdbc.{JDBCWriteDetails, JdbcUtils}
 import org.apache.spark.sql.sources.{ResolvedDataSource, CreateTableUsingAsSelect}
 
@@ -149,21 +151,65 @@ final class DataFrameWriter private[sql](df: DataFrame) {
   }
 
   /**
+   * Inserts the content of the [[DataFrame]] into the specified table. It requires that
+   * the schema of the [[DataFrame]] is the same as the schema of the table.
+   *
+   * Because it inserts data to an existing table, format or options will be ignored.
+   *
+   * @since 1.4.0
+   */
+  def insertInto(tableName: String): Unit = {
+    val partitions =
+      partitioningColumns.map(_.map(col => col -> (None: Option[String])).toMap)
+    val overwrite = (mode == SaveMode.Overwrite)
+    df.sqlContext.executePlan(InsertIntoTable(
+      UnresolvedRelation(Seq(tableName)),
+      partitions.getOrElse(Map.empty[String, Option[String]]),
+      df.logicalPlan,
+      overwrite,
+      ifNotExists = false)).toRdd
+  }
+
+  /**
    * Saves the content of the [[DataFrame]] as the specified table.
    *
+   * In the case the table already exists, behavior of this function depends on the
+   * save mode, specified by the `mode` function (defaults to throwing an exception).
+   * When `mode` is `Overwrite`, the schema of the [[DataFrame]] does not need to be
+   * the same as that of the existing table.
+   * When `mode` is `Append`, the schema of the [[DataFrame]] needs to be
+   * the same as that of the existing table, and the format or options will be ignored.
+   *
    * @since 1.4.0
    */
   def saveAsTable(tableName: String): Unit = {
-    val cmd =
-      CreateTableUsingAsSelect(
-        tableName,
-        source,
-        temporary = false,
-        partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-        mode,
-        extraOptions.toMap,
-        df.logicalPlan)
-    df.sqlContext.executePlan(cmd).toRdd
+    if (df.sqlContext.catalog.tableExists(tableName :: Nil) && mode != SaveMode.Overwrite) {
+      mode match {
+        case SaveMode.Ignore =>
+          // Do nothing
+
+        case SaveMode.ErrorIfExists =>
+          throw new AnalysisException(s"Table $tableName already exists.")
+
+        case SaveMode.Append =>
+          // If it is Append, we just ask insertInto to handle it. We will not use insertInto
+          // to handle saveAsTable with Overwrite because saveAsTable can change the schema of
+          // the table. But insertInto with Overwrite requires the schema of the data to be
+          // the same as the schema of the table.
+          insertInto(tableName)
+      }
+    } else {
+      val cmd =
+        CreateTableUsingAsSelect(
+          tableName,
+          source,
+          temporary = false,
+          partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
+          mode,
+          extraOptions.toMap,
+          df.logicalPlan)
+      df.sqlContext.executePlan(cmd).toRdd
+    }
   }
 
   /**
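
The new saveAsTable branches above can be exercised directly. A sketch of the observable behavior, reusing the hypothetical df from the first sketch and an equally hypothetical table name:

    // First call creates the table (default mode is ErrorIfExists and the table is absent).
    df.write.saveAsTable("events_copy")
    // Append on an existing table now routes through insertInto, so the schema must
    // match and any format/options set on the writer are ignored.
    df.write.mode(SaveMode.Append).saveAsTable("events_copy")
    // Ignore leaves the existing table untouched; ErrorIfExists would raise AnalysisException.
    df.write.mode(SaveMode.Ignore).saveAsTable("events_copy")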

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index c45c431..70a220c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -129,7 +129,7 @@ private[parquet] object RowReadSupport {
 }
 
 /**
- * A `parquet.hadoop.api.WriteSupport` for Row ojects.
+ * A `parquet.hadoop.api.WriteSupport` for Row objects.
  */
 private[parquet] class RowWriteSupport extends WriteSupport[Row] with Logging {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
index c03649d..dacd967 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/DataSourceStrategy.scala
@@ -105,10 +105,9 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
       execution.ExecutedCommand(InsertIntoDataSource(l, query, overwrite)) :: Nil
 
     case i @ logical.InsertIntoTable(
-      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) if part.isEmpty =>
+      l @ LogicalRelation(t: HadoopFsRelation), part, query, overwrite, false) =>
       val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
-      execution.ExecutedCommand(
-        InsertIntoHadoopFsRelation(t, query, Array.empty[String], mode)) :: Nil
+      execution.ExecutedCommand(InsertIntoHadoopFsRelation(t, query, mode)) :: Nil
 
     case _ => Nil
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
index 498f753..c3674a8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/commands.scala
@@ -61,7 +61,6 @@ private[sql] case class InsertIntoDataSource(
 private[sql] case class InsertIntoHadoopFsRelation(
     @transient relation: HadoopFsRelation,
     @transient query: LogicalPlan,
-    partitionColumns: Array[String],
     mode: SaveMode)
   extends RunnableCommand {
 
@@ -100,6 +99,7 @@ private[sql] case class InsertIntoHadoopFsRelation(
         relation.schema,
         needsConversion = false)
 
+      val partitionColumns = relation.partitionColumns.fieldNames
       if (partitionColumns.isEmpty) {
         insert(new DefaultWriterContainer(relation, job), df)
       } else {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
index 5e72312..ca30b8e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/ddl.scala
@@ -335,7 +335,6 @@ private[sql] object ResolvedDataSource {
           InsertIntoHadoopFsRelation(
             r,
             project,
-            partitionColumns.toArray,
             mode)).toRdd
         r
       case _ =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
index ab33125..a3fd7f1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/sources/rules.scala
@@ -35,9 +35,9 @@ private[sql] object PreInsertCastAndRename extends Rule[LogicalPlan] {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      // We are inserting into an InsertableRelation.
+      // We are inserting into an InsertableRelation or HadoopFsRelation.
       case i @ InsertIntoTable(
-      l @ LogicalRelation(r: InsertableRelation), partition, child, overwrite, ifNotExists) => {
+      l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation), _, child, _, _) => {
         // First, make sure the data to be inserted have the same number of fields with the
         // schema of the relation.
         if (l.output.size != child.output.size) {
@@ -101,7 +101,20 @@ private[sql] case class PreWriteCheck(catalog: Catalog) extends (LogicalPlan =>
           }
         }
 
-      case logical.InsertIntoTable(LogicalRelation(_: HadoopFsRelation), _, _, _, _) => // OK
+      case logical.InsertIntoTable(LogicalRelation(r: HadoopFsRelation), part, _, _, _) =>
+        // We need to make sure the partition columns specified by users do match partition
+        // columns of the relation.
+        val existingPartitionColumns = r.partitionColumns.fieldNames.toSet
+        val specifiedPartitionColumns = part.keySet
+        if (existingPartitionColumns != specifiedPartitionColumns) {
+          failAnalysis(s"Specified partition columns " +
+            s"(${specifiedPartitionColumns.mkString(", ")}) " +
+            s"do not match the partition columns of the table. Please use " +
+            s"(${existingPartitionColumns.mkString(", ")}) as the partition columns.")
+        } else {
+          // OK
+        }
+
       case logical.InsertIntoTable(l: LogicalRelation, _, _, _, _) =>
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
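
The check added here reduces to a set comparison between the partition columns named in the INSERT and those of the relation, which also makes the ordering of partition columns irrelevant (presumably why the "different order of partition columns" test is removed from hadoopFsRelationSuites below). A standalone sketch of that logic, with illustrative names rather than the actual PreWriteCheck signature:

    // Illustrative restatement of the partition-column check, outside the analyzer.
    def checkPartitionColumns(existing: Set[String], specified: Set[String]): Unit = {
      if (existing != specified) {
        throw new IllegalArgumentException(
          s"Specified partition columns (${specified.mkString(", ")}) do not match the " +
          s"partition columns of the table. Please use (${existing.mkString(", ")}).")
      }
    }

    checkPartitionColumns(Set("p1", "p2"), Set("p2", "p1"))   // OK: order does not matter
    // checkPartitionColumns(Set("p1", "p2"), Set("p1"))      // would throw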

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
index a8e8e70..0d807f4 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala
@@ -373,6 +373,10 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) {
         ResolveHiveWindowFunction ::
         sources.PreInsertCastAndRename ::
         Nil
+
+      override val extendedCheckRules = Seq(
+        sources.PreWriteCheck(catalog)
+      )
     }
 
   override protected[sql] def createSession(): SQLSession = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
index ecb990e..acf2f7d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/InsertIntoHiveTableSuite.scala
@@ -53,7 +53,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     sql("CREATE TABLE createAndInsertTest (key int, value string)")
 
     // Add some data.
-    testData.insertInto("createAndInsertTest")
+    testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
 
     // Make sure the table has also been updated.
     checkAnswer(
@@ -62,7 +62,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     )
 
     // Add more data.
-    testData.insertInto("createAndInsertTest")
+    testData.write.mode(SaveMode.Append).insertInto("createAndInsertTest")
 
     // Make sure the table has been updated.
     checkAnswer(
@@ -71,7 +71,7 @@ class InsertIntoHiveTableSuite extends QueryTest with BeforeAndAfter {
     )
 
     // Now overwrite.
-    testData.insertInto("createAndInsertTest", overwrite = true)
+    testData.write.mode(SaveMode.Overwrite).insertInto("createAndInsertTest")
 
     // Make sure the registered table has also been updated.
     checkAnswer(

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index c4c7b63..9623ef0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -608,7 +608,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       StructType(
         StructField("a", ArrayType(IntegerType, containsNull = false), nullable = true) :: Nil)
     assert(df2.schema === expectedSchema2)
-    df2.insertInto("arrayInParquet", overwrite = false)
+    df2.write.mode(SaveMode.Append).insertInto("arrayInParquet")
     createDataFrame(Tuple1(Seq(4, 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
       .saveAsTable("arrayInParquet") // This one internally calls df2.insertInto.
     createDataFrame(Tuple1(Seq(Int.box(6), null.asInstanceOf[Integer])) :: Nil).toDF("a").write
@@ -642,7 +642,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       StructType(
         StructField("a", mapType2, nullable = true) :: Nil)
     assert(df2.schema === expectedSchema2)
-    df2.insertInto("mapInParquet", overwrite = false)
+    df2.write.mode(SaveMode.Append).insertInto("mapInParquet")
     createDataFrame(Tuple1(Map(4 -> 5)) :: Nil).toDF("a").write.mode(SaveMode.Append)
       .saveAsTable("mapInParquet") // This one internally calls df2.insertInto.
     createDataFrame(Tuple1(Map(6 -> null.asInstanceOf[Integer])) :: Nil).toDF("a").write
@@ -768,7 +768,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 35"),
       (6 to 34).map(i => Row(i, s"str$i")))
 
-    createDF(40, 49).insertInto("insertParquet")
+    createDF(40, 49).write.mode(SaveMode.Append).insertInto("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, c2 FROM insertParquet p WHERE p.c1 > 5 AND p.c1 < 45"),
       (6 to 44).map(i => Row(i, s"str$i")))
@@ -782,7 +782,7 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
       sql("SELECT p.c1, c2 FROM insertParquet p"),
       (50 to 59).map(i => Row(i, s"str$i")))
 
-    createDF(70, 79).insertInto("insertParquet", overwrite = true)
+    createDF(70, 79).write.mode(SaveMode.Overwrite).insertInto("insertParquet")
     checkAnswer(
       sql("SELECT p.c1, c2 FROM insertParquet p"),
       (70 to 79).map(i => Row(i, s"str$i")))

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index ba53ed9..b707f5e 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.hive.execution
 import org.apache.spark.sql.catalyst.DefaultParserDialect
 import org.apache.spark.sql.catalyst.analysis.EliminateSubQueries
 import org.apache.spark.sql.catalyst.errors.DialectException
-import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
@@ -425,10 +425,10 @@ class SQLQuerySuite extends QueryTest {
   test("SPARK-4825 save join to table") {
     val testData = sparkContext.parallelize(1 to 10).map(i => TestData(i, i.toString)).toDF()
     sql("CREATE TABLE test1 (key INT, value STRING)")
-    testData.insertInto("test1")
+    testData.write.mode(SaveMode.Append).insertInto("test1")
     sql("CREATE TABLE test2 (key INT, value STRING)")
-    testData.insertInto("test2")
-    testData.insertInto("test2")
+    testData.write.mode(SaveMode.Append).insertInto("test2")
+    testData.write.mode(SaveMode.Append).insertInto("test2")
     sql("CREATE TABLE test AS SELECT COUNT(a.value) FROM test1 a JOIN test2 b ON a.key = b.key")
     checkAnswer(
       table("test"),

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index 223ba65..7851f38 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -316,7 +316,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _, _)) => // OK
+      case ExecutedCommand(InsertIntoHadoopFsRelation(_: ParquetRelation2, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
         s"${classOf[ParquetRelation2].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan. " +
@@ -346,7 +346,7 @@ class ParquetDataSourceOnMetastoreSuite extends ParquetMetastoreSuiteBase {
 
     val df = sql("INSERT INTO TABLE test_insert_parquet SELECT a FROM jt_array")
     df.queryExecution.executedPlan match {
-      case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _, _)) => // OK
+      case ExecutedCommand(InsertIntoHadoopFsRelation(r: ParquetRelation2, _, _)) => // OK
       case o => fail("test_insert_parquet should be converted to a " +
         s"${classOf[ParquetRelation2].getCanonicalName} and " +
         s"${classOf[InsertIntoDataSource].getCanonicalName} is expcted as the SparkPlan." +

http://git-wip-us.apache.org/repos/asf/spark/blob/2b7e6358/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index c7c8bcd..3222690 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -362,16 +362,6 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils {
         .partitionBy("p1")
         .saveAsTable("t")
     }
-
-    // Using different order of partition columns
-    intercept[Throwable] {
-      partitionedTestDF2.write
-        .format(dataSourceName)
-        .mode(SaveMode.Append)
-        .option("dataSchema", dataSchema.json)
-        .partitionBy("p2", "p1")
-        .saveAsTable("t")
-    }
   }
 
   test("saveAsTable()/load() - partitioned table - ErrorIfExists") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org