You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/01/03 14:18:39 UTC

spark git commit: [SPARK-20236][SQL] dynamic partition overwrite

Repository: spark
Updated Branches:
  refs/heads/master 1a87a1609 -> a66fe36ce


[SPARK-20236][SQL] dynamic partition overwrite

## What changes were proposed in this pull request?

When overwriting a partitioned table with dynamic partition columns, the behavior differs between data source tables and Hive tables.

data source table: delete all partition directories that match the static partition values provided in the insert statement.

hive table: only delete partition directories which have data written into them

This PR adds a new config that lets users choose Hive's behavior.

## How was this patch tested?

new tests

Author: Wenchen Fan <we...@databricks.com>

Closes #18714 from cloud-fan/overwrite-partition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a66fe36c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a66fe36c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a66fe36c

Branch: refs/heads/master
Commit: a66fe36cee9363b01ee70e469f1c968f633c5713
Parents: 1a87a16
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Jan 3 22:18:13 2018 +0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Wed Jan 3 22:18:13 2018 +0800

----------------------------------------------------------------------
 .../spark/internal/io/FileCommitProtocol.scala  | 25 +++++--
 .../io/HadoopMapReduceCommitProtocol.scala      | 75 +++++++++++++++----
 .../org/apache/spark/sql/internal/SQLConf.scala | 21 ++++++
 .../InsertIntoHadoopFsRelationCommand.scala     | 20 ++++-
 .../SQLHadoopMapReduceCommitProtocol.scala      | 10 ++-
 .../apache/spark/sql/sources/InsertSuite.scala  | 78 ++++++++++++++++++++
 6 files changed, 200 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
index 50f51e1..6d0059b 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/FileCommitProtocol.scala
@@ -28,8 +28,9 @@ import org.apache.spark.util.Utils
  *
  * 1. Implementations must be serializable, as the committer instance instantiated on the driver
  *    will be used for tasks on executors.
- * 2. Implementations should have a constructor with 2 arguments:
- *      (jobId: String, path: String)
+ * 2. Implementations should have a constructor with 2 or 3 arguments:
+ *      (jobId: String, path: String) or
+ *      (jobId: String, path: String, dynamicPartitionOverwrite: Boolean)
  * 3. A committer should not be reused across multiple Spark jobs.
  *
  * The proper call sequence is:
@@ -139,10 +140,22 @@ object FileCommitProtocol {
   /**
    * Instantiates a FileCommitProtocol using the given className.
    */
-  def instantiate(className: String, jobId: String, outputPath: String)
-    : FileCommitProtocol = {
+  def instantiate(
+      className: String,
+      jobId: String,
+      outputPath: String,
+      dynamicPartitionOverwrite: Boolean = false): FileCommitProtocol = {
     val clazz = Utils.classForName(className).asInstanceOf[Class[FileCommitProtocol]]
-    val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
-    ctor.newInstance(jobId, outputPath)
+    // First try the constructor with arguments (jobId: String, outputPath: String,
+    // dynamicPartitionOverwrite: Boolean).
+    // If that doesn't exist, try the one with (jobId: string, outputPath: String).
+    try {
+      val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String], classOf[Boolean])
+      ctor.newInstance(jobId, outputPath, dynamicPartitionOverwrite.asInstanceOf[java.lang.Boolean])
+    } catch {
+      case _: NoSuchMethodException =>
+        val ctor = clazz.getDeclaredConstructor(classOf[String], classOf[String])
+        ctor.newInstance(jobId, outputPath)
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
index 95c99d2..6d20ef1 100644
--- a/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/internal/io/HadoopMapReduceCommitProtocol.scala
@@ -39,8 +39,19 @@ import org.apache.spark.mapred.SparkHadoopMapRedUtil
  *
  * @param jobId the job's or stage's id
  * @param path the job's output path, or null if committer acts as a noop
+ * @param dynamicPartitionOverwrite If true, Spark will overwrite partition directories at runtime
+ *                                  dynamically, i.e., we first write files under a staging
+ *                                  directory with partition path, e.g.
+ *                                  /path/to/staging/a=1/b=1/xxx.parquet. When committing the job,
+ *                                  we first clean up the corresponding partition directories at
+ *                                  destination path, e.g. /path/to/destination/a=1/b=1, and move
+ *                                  files from staging directory to the corresponding partition
+ *                                  directories under destination path.
  */
-class HadoopMapReduceCommitProtocol(jobId: String, path: String)
+class HadoopMapReduceCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
   extends FileCommitProtocol with Serializable with Logging {
 
   import FileCommitProtocol._
@@ -67,9 +78,17 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
   @transient private var addedAbsPathFiles: mutable.Map[String, String] = null
 
   /**
-   * The staging directory for all files committed with absolute output paths.
+   * Tracks partitions with default path that have new files written into them by this task,
+   * e.g. a=1/b=2. Files under these partitions will be saved into staging directory and moved to
+   * destination directory at the end, if `dynamicPartitionOverwrite` is true.
    */
-  private def absPathStagingDir: Path = new Path(path, "_temporary-" + jobId)
+  @transient private var partitionPaths: mutable.Set[String] = null
+
+  /**
+   * The staging directory of this write job. Spark uses it to deal with files with absolute output
+   * path, or writing data into partitioned directory with dynamicPartitionOverwrite=true.
+   */
+  private def stagingDir = new Path(path, ".spark-staging-" + jobId)
 
   protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
     val format = context.getOutputFormatClass.newInstance()
@@ -85,11 +104,16 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
       taskContext: TaskAttemptContext, dir: Option[String], ext: String): String = {
     val filename = getFilename(taskContext, ext)
 
-    val stagingDir: String = committer match {
+    val stagingDir: Path = committer match {
+      case _ if dynamicPartitionOverwrite =>
+        assert(dir.isDefined,
+          "The dataset to be written must be partitioned when dynamicPartitionOverwrite is true.")
+        partitionPaths += dir.get
+        this.stagingDir
       // For FileOutputCommitter it has its own staging path called "work path".
       case f: FileOutputCommitter =>
-        Option(f.getWorkPath).map(_.toString).getOrElse(path)
-      case _ => path
+        new Path(Option(f.getWorkPath).map(_.toString).getOrElse(path))
+      case _ => new Path(path)
     }
 
     dir.map { d =>
@@ -106,8 +130,7 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
     // Include a UUID here to prevent file collisions for one task writing to different dirs.
     // In principle we could include hash(absoluteDir) instead but this is simpler.
-    val tmpOutputPath = new Path(
-      absPathStagingDir, UUID.randomUUID().toString() + "-" + filename).toString
+    val tmpOutputPath = new Path(stagingDir, UUID.randomUUID().toString() + "-" + filename).toString
 
     addedAbsPathFiles(tmpOutputPath) = absOutputPath
     tmpOutputPath
@@ -141,23 +164,42 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
 
   override def commitJob(jobContext: JobContext, taskCommits: Seq[TaskCommitMessage]): Unit = {
     committer.commitJob(jobContext)
-    val filesToMove = taskCommits.map(_.obj.asInstanceOf[Map[String, String]])
-      .foldLeft(Map[String, String]())(_ ++ _)
-    logDebug(s"Committing files staged for absolute locations $filesToMove")
+
     if (hasValidPath) {
-      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
+      val (allAbsPathFiles, allPartitionPaths) =
+        taskCommits.map(_.obj.asInstanceOf[(Map[String, String], Set[String])]).unzip
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+
+      val filesToMove = allAbsPathFiles.foldLeft(Map[String, String]())(_ ++ _)
+      logDebug(s"Committing files staged for absolute locations $filesToMove")
+      if (dynamicPartitionOverwrite) {
+        val absPartitionPaths = filesToMove.values.map(new Path(_).getParent).toSet
+        logDebug(s"Clean up absolute partition directories for overwriting: $absPartitionPaths")
+        absPartitionPaths.foreach(fs.delete(_, true))
+      }
       for ((src, dst) <- filesToMove) {
         fs.rename(new Path(src), new Path(dst))
       }
-      fs.delete(absPathStagingDir, true)
+
+      if (dynamicPartitionOverwrite) {
+        val partitionPaths = allPartitionPaths.foldLeft(Set[String]())(_ ++ _)
+        logDebug(s"Clean up default partition directories for overwriting: $partitionPaths")
+        for (part <- partitionPaths) {
+          val finalPartPath = new Path(path, part)
+          fs.delete(finalPartPath, true)
+          fs.rename(new Path(stagingDir, part), finalPartPath)
+        }
+      }
+
+      fs.delete(stagingDir, true)
     }
   }
 
   override def abortJob(jobContext: JobContext): Unit = {
     committer.abortJob(jobContext, JobStatus.State.FAILED)
     if (hasValidPath) {
-      val fs = absPathStagingDir.getFileSystem(jobContext.getConfiguration)
-      fs.delete(absPathStagingDir, true)
+      val fs = stagingDir.getFileSystem(jobContext.getConfiguration)
+      fs.delete(stagingDir, true)
     }
   }
 
@@ -165,13 +207,14 @@ class HadoopMapReduceCommitProtocol(jobId: String, path: String)
     committer = setupCommitter(taskContext)
     committer.setupTask(taskContext)
     addedAbsPathFiles = mutable.Map[String, String]()
+    partitionPaths = mutable.Set[String]()
   }
 
   override def commitTask(taskContext: TaskAttemptContext): TaskCommitMessage = {
     val attemptId = taskContext.getTaskAttemptID
     SparkHadoopMapRedUtil.commitTask(
       committer, taskContext, attemptId.getJobID.getId, attemptId.getTaskID.getId)
-    new TaskCommitMessage(addedAbsPathFiles.toMap)
+    new TaskCommitMessage(addedAbsPathFiles.toMap -> partitionPaths.toSet)
   }
 
   override def abortTask(taskContext: TaskAttemptContext): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 80cdc61..5d6edf6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1068,6 +1068,24 @@ object SQLConf {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefault(100)
 
+  object PartitionOverwriteMode extends Enumeration {
+    val STATIC, DYNAMIC = Value
+  }
+
+  val PARTITION_OVERWRITE_MODE =
+    buildConf("spark.sql.sources.partitionOverwriteMode")
+      .doc("When INSERT OVERWRITE a partitioned data source table, we currently support 2 modes: " +
+        "static and dynamic. In static mode, Spark deletes all the partitions that match the " +
+        "partition specification(e.g. PARTITION(a=1,b)) in the INSERT statement, before " +
+        "overwriting. In dynamic mode, Spark doesn't delete partitions ahead, and only overwrite " +
+        "those partitions that have data written into it at runtime. By default we use static " +
+        "mode to keep the same behavior of Spark prior to 2.3. Note that this config doesn't " +
+        "affect Hive serde tables, as they are always overwritten with dynamic mode.")
+      .stringConf
+      .transform(_.toUpperCase(Locale.ROOT))
+      .checkValues(PartitionOverwriteMode.values.map(_.toString))
+      .createWithDefault(PartitionOverwriteMode.STATIC.toString)
+
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
@@ -1394,6 +1412,9 @@ class SQLConf extends Serializable with Logging {
 
   def concatBinaryAsString: Boolean = getConf(CONCAT_BINARY_AS_STRING)
 
+  def partitionOverwriteMode: PartitionOverwriteMode.Value =
+    PartitionOverwriteMode.withName(getConf(PARTITION_OVERWRITE_MODE))
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */

http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index ad24e28..dd7ef0d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.util.SchemaUtils
 
 /**
@@ -89,13 +90,19 @@ case class InsertIntoHadoopFsRelationCommand(
     }
 
     val pathExists = fs.exists(qualifiedOutputPath)
-    // If we are appending data to an existing dir.
-    val isAppend = pathExists && (mode == SaveMode.Append)
+
+    val enableDynamicOverwrite =
+      sparkSession.sessionState.conf.partitionOverwriteMode == PartitionOverwriteMode.DYNAMIC
+    // This config only makes sense when we are overwriting a partitioned dataset with dynamic
+    // partition columns.
+    val dynamicPartitionOverwrite = enableDynamicOverwrite && mode == SaveMode.Overwrite &&
+      staticPartitions.size < partitionColumns.length
 
     val committer = FileCommitProtocol.instantiate(
       sparkSession.sessionState.conf.fileCommitProtocolClass,
       jobId = java.util.UUID.randomUUID().toString,
-      outputPath = outputPath.toString)
+      outputPath = outputPath.toString,
+      dynamicPartitionOverwrite = dynamicPartitionOverwrite)
 
     val doInsertion = (mode, pathExists) match {
       case (SaveMode.ErrorIfExists, true) =>
@@ -103,6 +110,9 @@ case class InsertIntoHadoopFsRelationCommand(
       case (SaveMode.Overwrite, true) =>
         if (ifPartitionNotExists && matchingPartitions.nonEmpty) {
           false
+        } else if (dynamicPartitionOverwrite) {
+          // For dynamic partition overwrite, do not delete partition directories ahead.
+          true
         } else {
           deleteMatchingPartitions(fs, qualifiedOutputPath, customPartitionLocations, committer)
           true
@@ -126,7 +136,9 @@ case class InsertIntoHadoopFsRelationCommand(
               catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
               ifNotExists = true).run(sparkSession)
           }
-          if (mode == SaveMode.Overwrite) {
+          // For dynamic partition overwrite, we never remove partitions but only update existing
+          // ones.
+          if (mode == SaveMode.Overwrite && !dynamicPartitionOverwrite) {
             val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
             if (deletedPartitions.nonEmpty) {
               AlterTableDropPartitionCommand(

http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
index 40825a1..39c594a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/SQLHadoopMapReduceCommitProtocol.scala
@@ -29,11 +29,15 @@ import org.apache.spark.sql.internal.SQLConf
  * A variant of [[HadoopMapReduceCommitProtocol]] that allows specifying the actual
  * Hadoop output committer using an option specified in SQLConf.
  */
-class SQLHadoopMapReduceCommitProtocol(jobId: String, path: String)
-  extends HadoopMapReduceCommitProtocol(jobId, path) with Serializable with Logging {
+class SQLHadoopMapReduceCommitProtocol(
+    jobId: String,
+    path: String,
+    dynamicPartitionOverwrite: Boolean = false)
+  extends HadoopMapReduceCommitProtocol(jobId, path, dynamicPartitionOverwrite)
+    with Serializable with Logging {
 
   override protected def setupCommitter(context: TaskAttemptContext): OutputCommitter = {
-    var committer = context.getOutputFormatClass.newInstance().getOutputCommitter(context)
+    var committer = super.setupCommitter(context)
 
     val configuration = context.getConfiguration
     val clazz =

http://git-wip-us.apache.org/repos/asf/spark/blob/a66fe36c/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 8b7e2e5..fef01c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -21,6 +21,8 @@ import java.io.File
 
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.util.Utils
 
@@ -442,4 +444,80 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       assert(e.contains("Only Data Sources providing FileFormat are supported"))
     }
   }
+
+  test("SPARK-20236: dynamic partition overwrite without catalog table") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTempPath { path =>
+        Seq((1, 1, 1)).toDF("i", "part1", "part2")
+          .write.partitionBy("part1", "part2").parquet(path.getAbsolutePath)
+        checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(1, 1, 1))
+
+        Seq((2, 1, 1)).toDF("i", "part1", "part2")
+          .write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
+        checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1))
+
+        Seq((2, 2, 2)).toDF("i", "part1", "part2")
+          .write.partitionBy("part1", "part2").mode("overwrite").parquet(path.getAbsolutePath)
+        checkAnswer(spark.read.parquet(path.getAbsolutePath), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+      }
+    }
+  }
+
+  test("SPARK-20236: dynamic partition overwrite") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        sql("insert into t partition(part1=1, part2=1) select 1")
+        checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+        sql("insert overwrite table t partition(part1=1, part2=1) select 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1))
+
+        sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2=2) select 3")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
+        checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+      }
+    }
+  }
+
+  test("SPARK-20236: dynamic partition overwrite with customer partition path") {
+    withSQLConf(SQLConf.PARTITION_OVERWRITE_MODE.key -> PartitionOverwriteMode.DYNAMIC.toString) {
+      withTable("t") {
+        sql(
+          """
+            |create table t(i int, part1 int, part2 int) using parquet
+            |partitioned by (part1, part2)
+          """.stripMargin)
+
+        val path1 = Utils.createTempDir()
+        sql(s"alter table t add partition(part1=1, part2=1) location '$path1'")
+        sql(s"insert into t partition(part1=1, part2=1) select 1")
+        checkAnswer(spark.table("t"), Row(1, 1, 1))
+
+        sql("insert overwrite table t partition(part1=1, part2=1) select 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1))
+
+        sql("insert overwrite table t partition(part1=2, part2) select 2, 2")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Nil)
+
+        val path2 = Utils.createTempDir()
+        sql(s"alter table t add partition(part1=1, part2=2) location '$path2'")
+        sql("insert overwrite table t partition(part1=1, part2=2) select 3")
+        checkAnswer(spark.table("t"), Row(2, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+
+        sql("insert overwrite table t partition(part1=1, part2) select 4, 1")
+        checkAnswer(spark.table("t"), Row(4, 1, 1) :: Row(2, 2, 2) :: Row(3, 1, 2) :: Nil)
+      }
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org