Posted to commits@spark.apache.org by we...@apache.org on 2016/11/02 06:15:23 UTC

spark git commit: [SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables

Repository: spark
Updated Branches:
  refs/heads/master 620da3b48 -> abefe2ec4


[SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables

## What changes were proposed in this pull request?

There are a couple of issues with the current 2.1 behavior when inserting into Datasource tables with partitions managed by Hive.

(1) INSERT OVERWRITE TABLE ... PARTITION overwrites the entire table instead of just the specified partition.
(2) INSERT [INTO|OVERWRITE] does not work with partitions that have custom locations.
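
For example, both problems can be triggered with statements like the following (a sketch mirroring the new test suite; it assumes a SparkSession named `spark` and a partitioned Datasource table `test` with partition column `partCol`):

```scala
// (1) Static partition overwrite: before this fix, this replaced ALL
// partitions of the Datasource table, not just partCol=1.
spark.sql(
  """insert overwrite table test
    |partition (partCol=1)
    |select * from range(100)""".stripMargin)

// (2) Partitions with custom locations: before this fix, inserts did not
// write to the partition's metastore-registered location.
spark.sql("alter table test partition (partCol=2) set location '/tmp/custom'")
spark.sql("insert into table test partition (partCol=2) select * from range(10)")
```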

This PR fixes both of these issues for Datasource tables managed by Hive. The behavior for legacy tables or when `manageFilesourcePartitions = false` is unchanged.

There is one other issue: INSERT OVERWRITE with dynamic partitions will still overwrite the entire table instead of just the updated partitions. That behavior is fairly complicated to implement for Datasource tables, so we should address it in a future release.
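
For instance, a dynamic-partition overwrite like the one below (taken from the new tests) still replaces every existing partition, not only those produced by the query:

```scala
// No literal value is given for partCol, so each row picks its own
// partition -- yet the entire table is currently overwritten.
spark.sql("insert overwrite table test select id, id from range(10)")
```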

## How was this patch tested?

Unit tests.

Author: Eric Liang <ek...@databricks.com>

Closes #15705 from ericl/sc-4942.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abefe2ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abefe2ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abefe2ec

Branch: refs/heads/master
Commit: abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b
Parents: 620da3b
Author: Eric Liang <ek...@databricks.com>
Authored: Wed Nov 2 14:15:10 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Nov 2 14:15:10 2016 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/dsl/package.scala |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  9 +++-
 .../plans/logical/basicLogicalOperators.scala   | 19 ++++++-
 .../sql/catalyst/parser/PlanParserSuite.scala   | 15 ++++--
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +-
 .../datasources/CatalogFileIndex.scala          |  5 +-
 .../datasources/DataSourceStrategy.scala        | 30 +++++++++--
 .../InsertIntoDataSourceCommand.scala           |  6 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  3 +-
 .../CreateHiveTableAsSelectCommand.scala        |  5 +-
 .../PartitionProviderCompatibilitySuite.scala   | 52 ++++++++++++++++++++
 11 files changed, 129 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 66e52ca..e901683 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
       def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
         InsertIntoTable(
           analysis.UnresolvedRelation(TableIdentifier(tableName)),
-          Map.empty, logicalPlan, overwrite, false)
+          Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
 
       def as(alias: String): LogicalPlan = logicalPlan match {
         case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38e9bb6..ac1577b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,12 +177,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
+    val overwrite = ctx.OVERWRITE != null
+    val overwritePartition =
+      if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) {
+        Some(partitionKeys.map(t => (t._1, t._2.get)))
+      } else {
+        None
+      }
 
     InsertIntoTable(
       UnresolvedRelation(tableIdent, None),
       partitionKeys,
       query,
-      ctx.OVERWRITE != null,
+      OverwriteOptions(overwrite, overwritePartition),
       ctx.EXISTS != null)
   }
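
To illustrate what the parser change produces, here is a small sketch against the catalyst parser (table and partition names are illustrative):

```scala
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions}

// A static single-partition INSERT OVERWRITE carries the partition spec.
val staticPlan = CatalystSqlParser.parsePlan(
  "insert overwrite table t partition (p=1) select * from s")
assert(staticPlan.asInstanceOf[InsertIntoTable].overwrite ==
  OverwriteOptions(true, Some(Map("p" -> "1"))))

// A dynamic-partition INSERT OVERWRITE does not: specificPartition stays None.
val dynamicPlan = CatalystSqlParser.parsePlan(
  "insert overwrite table t partition (p) select * from s")
assert(dynamicPlan.asInstanceOf[InsertIntoTable].overwrite ==
  OverwriteOptions(true, None))
```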
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index a48974c..7a15c22 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
@@ -345,18 +346,32 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
   override lazy val statistics: Statistics = super.statistics.copy(isBroadcastable = true)
 }
 
+/**
+ * Options for writing new data into a table.
+ *
+ * @param enabled whether to overwrite existing data in the table.
+ * @param specificPartition only data in the specified partition will be overwritten.
+ */
+case class OverwriteOptions(
+    enabled: Boolean,
+    specificPartition: Option[CatalogTypes.TablePartitionSpec] = None) {
+  if (specificPartition.isDefined) {
+    assert(enabled, "Overwrite must be enabled when specifying a partition to overwrite.")
+  }
+}
+
 case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
-    overwrite: Boolean,
+    overwrite: OverwriteOptions,
     ifNotExists: Boolean)
   extends LogicalPlan {
 
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = Seq.empty
 
-  assert(overwrite || !ifNotExists)
+  assert(overwrite.enabled || !ifNotExists)
   assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
 
   override lazy val resolved: Boolean = childrenResolved && table.resolved
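
A quick sketch of the new case class's contract (not part of the patch itself):

```scala
import org.apache.spark.sql.catalyst.plans.logical.OverwriteOptions

val append  = OverwriteOptions(enabled = false)                    // plain INSERT INTO
val full    = OverwriteOptions(enabled = true)                     // INSERT OVERWRITE
val onePart = OverwriteOptions(true, Some(Map("partCol" -> "1")))  // static partition

// This would fail the assertion above: a specific partition to overwrite
// requires enabled = true.
// OverwriteOptions(false, Some(Map("partCol" -> "1")))  // AssertionError
```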

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index ca86304..7400f34 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -180,7 +180,16 @@ class PlanParserSuite extends PlanTest {
         partition: Map[String, Option[String]],
         overwrite: Boolean = false,
         ifNotExists: Boolean = false): LogicalPlan =
-      InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
+      InsertIntoTable(
+        table("s"), partition, plan,
+        OverwriteOptions(
+          overwrite,
+          if (overwrite && partition.nonEmpty) {
+            Some(partition.map(kv => (kv._1, kv._2.get)))
+          } else {
+            None
+          }),
+        ifNotExists)
 
     // Single inserts
     assertEqual(s"insert overwrite table s $sql",
@@ -196,9 +205,9 @@ class PlanParserSuite extends PlanTest {
     val plan2 = table("t").where('x > 5).select(star())
     assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
       InsertIntoTable(
-        table("s"), Map.empty, plan.limit(1), overwrite = false, ifNotExists = false).union(
+        table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
         InsertIntoTable(
-          table("u"), Map.empty, plan2, overwrite = false, ifNotExists = false)))
+          table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
   }
 
   test ("insert with if not exists") {

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 11dd1df..700f483 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, Union}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions, Union}
 import org.apache.spark.sql.execution.command.AlterTableRecoverPartitionsCommand
 import org.apache.spark.sql.execution.datasources.{CaseInsensitiveMap, CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
@@ -259,7 +259,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
         child = df.logicalPlan,
-        overwrite = mode == SaveMode.Overwrite,
+        overwrite = OverwriteOptions(mode == SaveMode.Overwrite),
         ifNotExists = false)).toRdd
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
index 092aabc..443a2ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/CatalogFileIndex.scala
@@ -67,7 +67,10 @@ class CatalogFileIndex(
       val selectedPartitions = sparkSession.sessionState.catalog.listPartitionsByFilter(
         table.identifier, filters)
       val partitions = selectedPartitions.map { p =>
-        PartitionPath(p.toRow(partitionSchema), p.storage.locationUri.get)
+        val path = new Path(p.storage.locationUri.get)
+        val fs = path.getFileSystem(hadoopConf)
+        PartitionPath(
+          p.toRow(partitionSchema), path.makeQualified(fs.getUri, fs.getWorkingDirectory))
       }
       val partitionSpec = PartitionSpec(partitionSchema, partitions)
       new PrunedInMemoryFileIndex(
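
The qualification step matters because metastore locations may lack a scheme or authority; a minimal illustration of the same calls (the path is illustrative):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

val hadoopConf = new Configuration()
val path = new Path("/warehouse/test/partCol=1")  // no scheme
val fs = path.getFileSystem(hadoopConf)
// On a local filesystem this yields file:/warehouse/test/partCol=1.
val qualified = path.makeQualified(fs.getUri, fs.getWorkingDirectory)
```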

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 34b77ca..47c1f9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources
 
 import scala.collection.mutable.ArrayBuffer
 
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
@@ -174,14 +176,32 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
       }.flatten
 
-      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
-      if (overwrite && inputPaths.contains(outputPath)) {
+      val mode = if (overwrite.enabled) SaveMode.Overwrite else SaveMode.Append
+      if (overwrite.enabled && inputPaths.contains(outputPath)) {
         throw new AnalysisException(
           "Cannot overwrite a path that is also being read from.")
       }
 
+      val overwritingSinglePartition = (overwrite.specificPartition.isDefined &&
+        t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
+        l.catalogTable.get.partitionProviderIsHive)
+
+      val effectiveOutputPath = if (overwritingSinglePartition) {
+        val partition = t.sparkSession.sessionState.catalog.getPartition(
+          l.catalogTable.get.identifier, overwrite.specificPartition.get)
+        new Path(partition.storage.locationUri.get)
+      } else {
+        outputPath
+      }
+
+      val effectivePartitionSchema = if (overwritingSinglePartition) {
+        Nil
+      } else {
+        query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
+      }
+
       def refreshPartitionsCallback(updatedPartitions: Seq[TablePartitionSpec]): Unit = {
-        if (l.catalogTable.isDefined &&
+        if (l.catalogTable.isDefined && updatedPartitions.nonEmpty &&
             l.catalogTable.get.partitionColumnNames.nonEmpty &&
             l.catalogTable.get.partitionProviderIsHive) {
           val metastoreUpdater = AlterTableAddPartitionCommand(
@@ -194,8 +214,8 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       }
 
       val insertCmd = InsertIntoHadoopFsRelationCommand(
-        outputPath,
-        query.resolve(t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver),
+        effectiveOutputPath,
+        effectivePartitionSchema,
         t.bucketSpec,
         t.fileFormat,
         refreshPartitionsCallback,
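
Conceptually, the new write-path selection above can be paraphrased as follows (a self-contained sketch with hypothetical stand-in values, not the patch itself):

```scala
import org.apache.hadoop.fs.Path

// Stand-ins for values the real rule derives from the plan and catalog.
val overwritingSinglePartition = true
val partitionLocation = new Path("/warehouse/test/partCol=1")  // metastore lookup
val tableRootPath = new Path("/warehouse/test")
val resolvedPartitionColumns = Seq("partCol")

// For a static single-partition overwrite on a Hive-managed Datasource
// table, write flat files directly into that partition's directory (which
// also covers partitions with custom locations); otherwise keep the old
// root path and partition schema.
val (effectivePath, effectiveSchema) =
  if (overwritingSinglePartition) (partitionLocation, Nil)
  else (tableRootPath, resolvedPartitionColumns)
```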

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index b2ff68a..2eba1e9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OverwriteOptions}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
 
@@ -30,7 +30,7 @@ import org.apache.spark.sql.sources.InsertableRelation
 case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
-    overwrite: Boolean)
+    overwrite: OverwriteOptions)
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
@@ -40,7 +40,7 @@ case class InsertIntoDataSourceCommand(
     val data = Dataset.ofRows(sparkSession, query)
     // Apply the schema of the existing table to the new data.
     val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
+    relation.insert(df, overwrite.enabled)
 
     // Invalidate the cache.
     sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 9d29309..ce1e3eb 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -46,7 +46,8 @@ private[hive] trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(
           table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
-        InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+        InsertIntoHiveTable(
+          table, partition, planLater(child), overwrite.enabled, ifNotExists) :: Nil
 
       case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
         val newTableDesc = if (tableDesc.storage.serde.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ef5a5a0..cac4359 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, OverwriteOptions}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
 
@@ -88,7 +88,8 @@ case class CreateHiveTableAsSelectCommand(
     } else {
       try {
         sparkSession.sessionState.executePlan(InsertIntoTable(
-          metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+          metastoreRelation, Map(), query, overwrite = OverwriteOptions(true),
+          ifNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
index 5f16960..ac435bf 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
@@ -134,4 +134,56 @@ class PartitionProviderCompatibilitySuite
       }
     }
   }
+
+  test("insert overwrite partition of legacy datasource table overwrites entire table") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "false") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable("test", dir)
+          spark.sql(
+            """insert overwrite table test
+              |partition (partCol=1)
+              |select * from range(100)""".stripMargin)
+          assert(spark.sql("select * from test").count() == 100)
+
+          // Dynamic partitions case
+          spark.sql("insert overwrite table test select id, id from range(10)".stripMargin)
+          assert(spark.sql("select * from test").count() == 10)
+        }
+      }
+    }
+  }
+
+  test("insert overwrite partition of new datasource table overwrites just partition") {
+    withSQLConf(SQLConf.HIVE_MANAGE_FILESOURCE_PARTITIONS.key -> "true") {
+      withTable("test") {
+        withTempDir { dir =>
+          setupPartitionedDatasourceTable("test", dir)
+          sql("msck repair table test")
+          spark.sql(
+            """insert overwrite table test
+              |partition (partCol=1)
+              |select * from range(100)""".stripMargin)
+          assert(spark.sql("select * from test").count() == 104)
+
+          // Test overwriting a partition that has a custom location
+          withTempDir { dir2 =>
+            sql(
+              s"""alter table test partition (partCol=1)
+                |set location '${dir2.getAbsolutePath}'""".stripMargin)
+            assert(sql("select * from test").count() == 4)
+            sql(
+              """insert overwrite table test
+                |partition (partCol=1)
+                |select * from range(30)""".stripMargin)
+            sql(
+              """insert overwrite table test
+                |partition (partCol=1)
+                |select * from range(20)""".stripMargin)
+            assert(sql("select * from test").count() == 24)
+          }
+        }
+      }
+    }
+  }
 }

