You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2016/12/14 03:30:44 UTC

spark git commit: [SPARK-18566][SQL] remove OverwriteOptions

Repository: spark
Updated Branches:
  refs/heads/master f2ddabfa0 -> 3e307b495


[SPARK-18566][SQL] remove OverwriteOptions

## What changes were proposed in this pull request?

`OverwriteOptions` was introduced in https://github.com/apache/spark/pull/15705, to carry the information of static partitions. However, after further refactor, this information becomes duplicated and we can remove `OverwriteOptions`.

## How was this patch tested?

N/A

Author: Wenchen Fan <we...@databricks.com>

Closes #15995 from cloud-fan/overwrite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3e307b49
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3e307b49
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3e307b49

Branch: refs/heads/master
Commit: 3e307b4959ecdab3f9c16484d172403357e7d09b
Parents: f2ddabf
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Dec 14 11:30:34 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Wed Dec 14 11:30:34 2016 +0800

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/dsl/package.scala |   2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |   5 +-
 .../plans/logical/basicLogicalOperators.scala   |  22 +---
 .../sql/catalyst/parser/PlanParserSuite.scala   |  15 +--
 .../org/apache/spark/sql/DataFrameWriter.scala  |   4 +-
 .../sql/execution/datasources/DataSource.scala  |   2 +-
 .../datasources/DataSourceStrategy.scala        | 127 ++++++++++---------
 .../InsertIntoDataSourceCommand.scala           |   6 +-
 .../InsertIntoHadoopFsRelationCommand.scala     |  16 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  |   2 +-
 .../CreateHiveTableAsSelectCommand.scala        |   5 +-
 11 files changed, 91 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index e901683..66e52ca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
       def insertInto(tableName: String, overwrite: Boolean = false): LogicalPlan =
         InsertIntoTable(
           analysis.UnresolvedRelation(TableIdentifier(tableName)),
-          Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
+          Map.empty, logicalPlan, overwrite, false)
 
       def as(alias: String): LogicalPlan = logicalPlan match {
         case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, Option(alias))

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 7b8badc..3969fdb 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,15 +177,12 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with Logging {
       throw new ParseException(s"Dynamic partitions do not support IF NOT EXISTS. Specified " +
         "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", ",", "]"), ctx)
     }
-    val overwrite = ctx.OVERWRITE != null
-    val staticPartitionKeys: Map[String, String] =
-      partitionKeys.filter(_._2.nonEmpty).map(t => (t._1, t._2.get))
 
     InsertIntoTable(
       UnresolvedRelation(tableIdent, None),
       partitionKeys,
       query,
-      OverwriteOptions(overwrite, if (overwrite) staticPartitionKeys else Map.empty),
+      ctx.OVERWRITE != null,
       ctx.EXISTS != null)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index c210b74..b9bdd53 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -17,10 +17,8 @@
 
 package org.apache.spark.sql.catalyst.plans.logical
 
-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
@@ -347,22 +345,6 @@ case class BroadcastHint(child: LogicalPlan) extends UnaryNode {
 }
 
 /**
- * Options for writing new data into a table.
- *
- * @param enabled whether to overwrite existing data in the table.
- * @param staticPartitionKeys if non-empty, specifies that we only want to overwrite partitions
- *                            that match this partial partition spec. If empty, all partitions
- *                            will be overwritten.
- */
-case class OverwriteOptions(
-    enabled: Boolean,
-    staticPartitionKeys: CatalogTypes.TablePartitionSpec = Map.empty) {
-  if (staticPartitionKeys.nonEmpty) {
-    assert(enabled, "Overwrite must be enabled when specifying specific partitions.")
-  }
-}
-
-/**
  * Insert some data into a table.
  *
  * @param table the logical plan representing the table. In the future this should be a
@@ -382,14 +364,14 @@ case class InsertIntoTable(
     table: LogicalPlan,
     partition: Map[String, Option[String]],
     child: LogicalPlan,
-    overwrite: OverwriteOptions,
+    overwrite: Boolean,
     ifNotExists: Boolean)
   extends LogicalPlan {
 
   override def children: Seq[LogicalPlan] = child :: Nil
   override def output: Seq[Attribute] = Seq.empty
 
-  assert(overwrite.enabled || !ifNotExists)
+  assert(overwrite || !ifNotExists)
   assert(partition.values.forall(_.nonEmpty) || !ifNotExists)
 
   override lazy val resolved: Boolean = childrenResolved && table.resolved

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
index 8e4327c..f408ba9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/PlanParserSuite.scala
@@ -180,16 +180,7 @@ class PlanParserSuite extends PlanTest {
         partition: Map[String, Option[String]],
         overwrite: Boolean = false,
         ifNotExists: Boolean = false): LogicalPlan =
-      InsertIntoTable(
-        table("s"), partition, plan,
-        OverwriteOptions(
-          overwrite,
-          if (overwrite && partition.nonEmpty) {
-            partition.map(kv => (kv._1, kv._2.get))
-          } else {
-            Map.empty
-          }),
-        ifNotExists)
+      InsertIntoTable(table("s"), partition, plan, overwrite, ifNotExists)
 
     // Single inserts
     assertEqual(s"insert overwrite table s $sql",
@@ -205,9 +196,9 @@ class PlanParserSuite extends PlanTest {
     val plan2 = table("t").where('x > 5).select(star())
     assertEqual("from t insert into s select * limit 1 insert into u select * where x > 5",
       InsertIntoTable(
-        table("s"), Map.empty, plan.limit(1), OverwriteOptions(false), ifNotExists = false).union(
+        table("s"), Map.empty, plan.limit(1), false, ifNotExists = false).union(
         InsertIntoTable(
-          table("u"), Map.empty, plan2, OverwriteOptions(false), ifNotExists = false)))
+          table("u"), Map.empty, plan2, false, ifNotExists = false)))
   }
 
   test ("insert with if not exists") {

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index fa8e8cb..9d28e27 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -25,7 +25,7 @@ import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, OverwriteOptions}
+import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.command.{AlterTableRecoverPartitionsCommand, DDLUtils}
 import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.types.StructType
@@ -259,7 +259,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         table = UnresolvedRelation(tableIdent),
         partition = Map.empty[String, Option[String]],
         child = df.logicalPlan,
-        overwrite = OverwriteOptions(mode == SaveMode.Overwrite),
+        overwrite = mode == SaveMode.Overwrite,
         ifNotExists = false)).toRdd
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index eaccdf2..9250d0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -478,7 +478,7 @@ case class DataSource(
         val plan =
           InsertIntoHadoopFsRelationCommand(
             outputPath = outputPath,
-            staticPartitionKeys = Map.empty,
+            staticPartitions = Map.empty,
             customPartitionLocations = Map.empty,
             partitionColumns = columns,
             bucketSpec = bucketSpec,

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 03eed25..61f0d43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition, SimpleCatalogRelation}
@@ -32,8 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project, Union}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPartitioning}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
@@ -100,7 +99,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
         None
       } else if (potentialSpecs.size == 1) {
         val partValue = potentialSpecs.head._2
-        Some(Alias(Cast(Literal(partValue), field.dataType), "_staticPart")())
+        Some(Alias(Cast(Literal(partValue), field.dataType), field.name)())
       } else {
         throw new AnalysisException(
           s"Partition column ${field.name} have multiple values specified, " +
@@ -128,61 +127,75 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
     projectList
   }
 
+  /**
+   * Returns true if the [[InsertIntoTable]] plan has already been preprocessed by analyzer rule
+   * [[PreprocessTableInsertion]]. It is important that this rule([[DataSourceAnalysis]]) has to
+   * be run after [[PreprocessTableInsertion]], to normalize the column names in partition spec and
+   * fix the schema mismatch by adding Cast.
+   */
+  private def hasBeenPreprocessed(
+      tableOutput: Seq[Attribute],
+      partSchema: StructType,
+      partSpec: Map[String, Option[String]],
+      query: LogicalPlan): Boolean = {
+    val partColNames = partSchema.map(_.name).toSet
+    query.resolved && partSpec.keys.forall(partColNames.contains) && {
+      val staticPartCols = partSpec.filter(_._2.isDefined).keySet
+      val expectedColumns = tableOutput.filterNot(a => staticPartCols.contains(a.name))
+      expectedColumns.toStructType.sameType(query.schema)
+    }
+  }
+
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
-    // the user has specified static partitions, we add a Project operator on top of the query
-    // to include those constant column values in the query result.
-    //
-    // Example:
-    // Let's say that we have a table "t", which is created by
-    // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
-    // The statement of "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
-    // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
-    //
-    // Basically, we will put those partition columns having a assigned value back
-    // to the SELECT clause. The output of the SELECT clause is organized as
-    // normal_columns static_partitioning_columns dynamic_partitioning_columns.
-    // static_partitioning_columns are partitioning columns having assigned
-    // values in the PARTITION clause (e.g. b in the above example).
-    // dynamic_partitioning_columns are partitioning columns that do not assigned
-    // values in the PARTITION clause (e.g. c in the above example).
-    case insert @ logical.InsertIntoTable(
-      relation @ LogicalRelation(t: HadoopFsRelation, _, _), parts, query, overwrite, false)
-      if query.resolved && parts.exists(_._2.isDefined) =>
-
-      val projectList = convertStaticPartitions(
-        sourceAttributes = query.output,
-        providedPartitions = parts,
-        targetAttributes = relation.output,
-        targetPartitionSchema = t.partitionSchema)
-
-      // We will remove all assigned values to static partitions because they have been
-      // moved to the projectList.
-      insert.copy(partition = parts.map(p => (p._1, None)), child = Project(projectList, query))
-
-
-    case logical.InsertIntoTable(
-      l @ LogicalRelation(t: HadoopFsRelation, _, table), _, query, overwrite, false)
-        if query.resolved && t.schema.sameType(query.schema) =>
-
-      // Sanity checks
+    case InsertIntoTable(
+        l @ LogicalRelation(t: HadoopFsRelation, _, table), parts, query, overwrite, false)
+        if hasBeenPreprocessed(l.output, t.partitionSchema, parts, query) =>
+
+      // If the InsertIntoTable command is for a partitioned HadoopFsRelation and
+      // the user has specified static partitions, we add a Project operator on top of the query
+      // to include those constant column values in the query result.
+      //
+      // Example:
+      // Let's say that we have a table "t", which is created by
+      // CREATE TABLE t (a INT, b INT, c INT) USING parquet PARTITIONED BY (b, c)
+      // The statement of "INSERT INTO TABLE t PARTITION (b=2, c) SELECT 1, 3"
+      // will be converted to "INSERT INTO TABLE t PARTITION (b, c) SELECT 1, 2, 3".
+      //
+      // Basically, we will put those partition columns having a assigned value back
+      // to the SELECT clause. The output of the SELECT clause is organized as
+      // normal_columns static_partitioning_columns dynamic_partitioning_columns.
+      // static_partitioning_columns are partitioning columns having assigned
+      // values in the PARTITION clause (e.g. b in the above example).
+      // dynamic_partitioning_columns are partitioning columns that do not assigned
+      // values in the PARTITION clause (e.g. c in the above example).
+      val actualQuery = if (parts.exists(_._2.isDefined)) {
+        val projectList = convertStaticPartitions(
+          sourceAttributes = query.output,
+          providedPartitions = parts,
+          targetAttributes = l.output,
+          targetPartitionSchema = t.partitionSchema)
+        Project(projectList, query)
+      } else {
+        query
+      }
+
+      // Sanity check
       if (t.location.rootPaths.size != 1) {
-        throw new AnalysisException(
-          "Can only write data to relations with a single path.")
+        throw new AnalysisException("Can only write data to relations with a single path.")
       }
 
       val outputPath = t.location.rootPaths.head
-      val inputPaths = query.collect {
+      val inputPaths = actualQuery.collect {
         case LogicalRelation(r: HadoopFsRelation, _, _) => r.location.rootPaths
       }.flatten
 
-      val mode = if (overwrite.enabled) SaveMode.Overwrite else SaveMode.Append
-      if (overwrite.enabled && inputPaths.contains(outputPath)) {
+      val mode = if (overwrite) SaveMode.Overwrite else SaveMode.Append
+      if (overwrite && inputPaths.contains(outputPath)) {
         throw new AnalysisException(
           "Cannot overwrite a path that is also being read from.")
       }
 
-      val partitionSchema = query.resolve(
+      val partitionSchema = actualQuery.resolve(
         t.partitionSchema, t.sparkSession.sessionState.analyzer.resolver)
       val partitionsTrackedByCatalog =
         t.sparkSession.sessionState.conf.manageFilesourcePartitions &&
@@ -192,19 +205,13 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
       var initialMatchingPartitions: Seq[TablePartitionSpec] = Nil
       var customPartitionLocations: Map[TablePartitionSpec, String] = Map.empty
 
-      val staticPartitionKeys: TablePartitionSpec = if (overwrite.enabled) {
-        overwrite.staticPartitionKeys.map { case (k, v) =>
-          (partitionSchema.map(_.name).find(_.equalsIgnoreCase(k)).get, v)
-        }
-      } else {
-        Map.empty
-      }
+      val staticPartitions = parts.filter(_._2.nonEmpty).map { case (k, v) => k -> v.get }
 
       // When partitions are tracked by the catalog, compute all custom partition locations that
       // may be relevant to the insertion job.
       if (partitionsTrackedByCatalog) {
         val matchingPartitions = t.sparkSession.sessionState.catalog.listPartitions(
-          l.catalogTable.get.identifier, Some(staticPartitionKeys))
+          l.catalogTable.get.identifier, Some(staticPartitions))
         initialMatchingPartitions = matchingPartitions.map(_.spec)
         customPartitionLocations = getCustomPartitionLocations(
           t.sparkSession, l.catalogTable.get, outputPath, matchingPartitions)
@@ -220,7 +227,7 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
               l.catalogTable.get.identifier, newPartitions.toSeq.map(p => (p, None)),
               ifNotExists = true).run(t.sparkSession)
           }
-          if (overwrite.enabled) {
+          if (overwrite) {
             val deletedPartitions = initialMatchingPartitions.toSet -- updatedPartitions
             if (deletedPartitions.nonEmpty) {
               AlterTableDropPartitionCommand(
@@ -235,14 +242,14 @@ case class DataSourceAnalysis(conf: CatalystConf) extends Rule[LogicalPlan] {
 
       val insertCmd = InsertIntoHadoopFsRelationCommand(
         outputPath,
-        staticPartitionKeys,
+        staticPartitions,
         customPartitionLocations,
         partitionSchema,
         t.bucketSpec,
         t.fileFormat,
         refreshPartitionsCallback,
         t.options,
-        query,
+        actualQuery,
         mode,
         table)
 
@@ -305,7 +312,7 @@ class FindDataSourceTable(sparkSession: SparkSession) extends Rule[LogicalPlan]
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-    case i @ logical.InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+    case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
       i.copy(table = readDataSourceTable(sparkSession, s))
 
@@ -351,7 +358,7 @@ object DataSourceStrategy extends Strategy with Logging {
         Map.empty,
         None) :: Nil
 
-    case i @ logical.InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
+    case InsertIntoTable(l @ LogicalRelation(t: InsertableRelation, _, _),
       part, query, overwrite, false) if part.isEmpty =>
       ExecutedCommandExec(InsertIntoDataSourceCommand(l, query, overwrite)) :: Nil
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index 2eba1e9..b2ff68a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OverwriteOptions}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.sources.InsertableRelation
 
@@ -30,7 +30,7 @@ import org.apache.spark.sql.sources.InsertableRelation
 case class InsertIntoDataSourceCommand(
     logicalRelation: LogicalRelation,
     query: LogicalPlan,
-    overwrite: OverwriteOptions)
+    overwrite: Boolean)
   extends RunnableCommand {
 
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
@@ -40,7 +40,7 @@ case class InsertIntoDataSourceCommand(
     val data = Dataset.ofRows(sparkSession, query)
     // Apply the schema of the existing table to the new data.
     val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite.enabled)
+    relation.insert(df, overwrite)
 
     // Invalidate the cache.
     sparkSession.sharedState.cacheManager.invalidateCache(logicalRelation)

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
index a9bde90..53c884c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoHadoopFsRelationCommand.scala
@@ -33,10 +33,10 @@ import org.apache.spark.sql.execution.command.RunnableCommand
  * A command for writing data to a [[HadoopFsRelation]].  Supports both overwriting and appending.
  * Writing to dynamic partitions is also supported.
  *
- * @param staticPartitionKeys partial partitioning spec for write. This defines the scope of
- *                            partition overwrites: when the spec is empty, all partitions are
- *                            overwritten. When it covers a prefix of the partition keys, only
- *                            partitions matching the prefix are overwritten.
+ * @param staticPartitions partial partitioning spec for write. This defines the scope of partition
+ *                         overwrites: when the spec is empty, all partitions are overwritten.
+ *                         When it covers a prefix of the partition keys, only partitions matching
+ *                         the prefix are overwritten.
  * @param customPartitionLocations mapping of partition specs to their custom locations. The
  *                                 caller should guarantee that exactly those table partitions
  *                                 falling under the specified static partition keys are contained
@@ -44,7 +44,7 @@ import org.apache.spark.sql.execution.command.RunnableCommand
  */
 case class InsertIntoHadoopFsRelationCommand(
     outputPath: Path,
-    staticPartitionKeys: TablePartitionSpec,
+    staticPartitions: TablePartitionSpec,
     customPartitionLocations: Map[TablePartitionSpec, String],
     partitionColumns: Seq[Attribute],
     bucketSpec: Option[BucketSpec],
@@ -122,9 +122,9 @@ case class InsertIntoHadoopFsRelationCommand(
    * locations are also cleared based on the custom locations map given to this class.
    */
   private def deleteMatchingPartitions(fs: FileSystem, qualifiedOutputPath: Path): Unit = {
-    val staticPartitionPrefix = if (staticPartitionKeys.nonEmpty) {
+    val staticPartitionPrefix = if (staticPartitions.nonEmpty) {
       "/" + partitionColumns.flatMap { p =>
-        staticPartitionKeys.get(p.name) match {
+        staticPartitions.get(p.name) match {
           case Some(value) =>
             Some(escapePathName(p.name) + "=" + escapePathName(value))
           case None =>
@@ -143,7 +143,7 @@ case class InsertIntoHadoopFsRelationCommand(
     // now clear all custom partition locations (e.g. /custom/dir/where/foo=2/bar=4)
     for ((spec, customLoc) <- customPartitionLocations) {
       assert(
-        (staticPartitionKeys.toSet -- spec).isEmpty,
+        (staticPartitions.toSet -- spec).isEmpty,
         "Custom partition location did not match static partitioning keys")
       val path = new Path(customLoc)
       if (fs.exists(path) && !fs.delete(path, true)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index ce1e3eb..773c4a3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -47,7 +47,7 @@ private[hive] trait HiveStrategies {
       case logical.InsertIntoTable(
           table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoHiveTable(
-          table, partition, planLater(child), overwrite.enabled, ifNotExists) :: Nil
+          table, partition, planLater(child), overwrite, ifNotExists) :: Nil
 
       case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
         val newTableDesc = if (tableDesc.storage.serde.isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/3e307b49/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index cac4359..ef5a5a0 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, OverwriteOptions}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
 
@@ -88,8 +88,7 @@ case class CreateHiveTableAsSelectCommand(
     } else {
       try {
         sparkSession.sessionState.executePlan(InsertIntoTable(
-          metastoreRelation, Map(), query, overwrite = OverwriteOptions(true),
-          ifNotExists = false)).toRdd
+          metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org