Posted to commits@spark.apache.org by yh...@apache.org on 2016/12/29 05:50:25 UTC

spark git commit: [SPARK-18567][SQL] Simplify CreateDataSourceTableAsSelectCommand

Repository: spark
Updated Branches:
  refs/heads/master 93f35569f -> 7d19b6ab7


[SPARK-18567][SQL] Simplify CreateDataSourceTableAsSelectCommand

## What changes were proposed in this pull request?

The `CreateDataSourceTableAsSelectCommand` has become quite complex, as it has a lot of work to do when the table already exists:

1. throw an exception if we don't want to ignore it.
2. do some checks and adjust the schema if we want to append data.
3. drop the table and create it again if we want to overwrite.

Steps 2 and 3 should be handled by the analyzer, so that we can also apply them to Hive tables (see the usage sketch below).
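
As a rough usage sketch (not part of this patch), these are the save-mode behaviors the command has to handle when the target table already exists. The table name `t` and the sample data are made up for illustration:

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

object SaveAsTableModes {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("saveAsTable-modes")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Create a small data source table `t` (default format, managed location).
    Seq((1, "a"), (2, "b")).toDF("id", "name").write.saveAsTable("t")

    // 1. ErrorIfExists (the default) would now throw because `t` exists;
    //    Ignore silently skips the write and leaves `t` unchanged.
    Seq((9, "x")).toDF("id", "name").write.mode(SaveMode.Ignore).saveAsTable("t")

    // 2. Append: input columns are resolved by name and reordered to match the
    //    existing schema -- the checks this patch moves into the analyzer rule.
    Seq(("c", 3)).toDF("name", "id").write.mode(SaveMode.Append).saveAsTable("t")

    // 3. Overwrite: the existing table is dropped and recreated from the query.
    Seq((4, "d")).toDF("id", "name").write.mode(SaveMode.Overwrite).saveAsTable("t")

    spark.stop()
  }
}
```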

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #15996 from cloud-fan/append.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7d19b6ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7d19b6ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7d19b6ab

Branch: refs/heads/master
Commit: 7d19b6ab7d75b95d9eb1c7e1f228d23fd482306e
Parents: 93f3556
Author: Wenchen Fan <we...@databricks.com>
Authored: Wed Dec 28 21:50:21 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Dec 28 21:50:21 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/sql/DataFrameWriter.scala  |  78 +++++----
 .../command/createDataSourceTables.scala        | 167 +++++--------------
 .../spark/sql/execution/datasources/rules.scala | 164 +++++++++++++-----
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   2 +-
 4 files changed, 213 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7d19b6ab/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9c5660a..405f38a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,11 +23,12 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.annotation.InterfaceStability
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, UnresolvedRelation}
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
 import org.apache.spark.sql.execution.command.DDLUtils
-import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, LogicalRelation}
+import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.sql.types.StructType
 
 /**
@@ -364,7 +365,11 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 
-    val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
+    val catalog = df.sparkSession.sessionState.catalog
+    val tableExists = catalog.tableExists(tableIdent)
+    val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+    val tableIdentWithDB = tableIdent.copy(database = Some(db))
+    val tableName = tableIdentWithDB.unquotedString
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
@@ -373,39 +378,48 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       case (true, SaveMode.ErrorIfExists) =>
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
-      case _ =>
-        val existingTable = if (tableExists) {
-          Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
-        } else {
-          None
+      case (true, SaveMode.Overwrite) =>
+        // Get all input data source relations of the query.
+        val srcRelations = df.logicalPlan.collect {
+          case LogicalRelation(src: BaseRelation, _, _) => src
         }
-        val storage = if (tableExists) {
-          existingTable.get.storage
-        } else {
-          DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-        }
-        val tableType = if (tableExists) {
-          existingTable.get.tableType
-        } else if (storage.locationUri.isDefined) {
-          CatalogTableType.EXTERNAL
-        } else {
-          CatalogTableType.MANAGED
+        EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
+          // Only do the check if the table is a data source table (the relation is a BaseRelation).
+          case LogicalRelation(dest: BaseRelation, _, _) if srcRelations.contains(dest) =>
+            throw new AnalysisException(
+              s"Cannot overwrite table $tableName that is also being read from")
+          case _ => // OK
         }
 
-        val tableDesc = CatalogTable(
-          identifier = tableIdent,
-          tableType = tableType,
-          storage = storage,
-          schema = new StructType,
-          provider = Some(source),
-          partitionColumnNames = partitioningColumns.getOrElse(Nil),
-          bucketSpec = getBucketSpec
-        )
-        df.sparkSession.sessionState.executePlan(
-          CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
+        // Drop the existing table
+        catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
+        createTable(tableIdent)
+
+      case _ => createTable(tableIdent)
     }
   }
 
+  private def createTable(tableIdent: TableIdentifier): Unit = {
+    val storage = DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
+    val tableType = if (storage.locationUri.isDefined) {
+      CatalogTableType.EXTERNAL
+    } else {
+      CatalogTableType.MANAGED
+    }
+
+    val tableDesc = CatalogTable(
+      identifier = tableIdent,
+      tableType = tableType,
+      storage = storage,
+      schema = new StructType,
+      provider = Some(source),
+      partitionColumnNames = partitioningColumns.getOrElse(Nil),
+      bucketSpec = getBucketSpec
+    )
+    df.sparkSession.sessionState.executePlan(
+      CreateTable(tableDesc, mode, Some(df.logicalPlan))).toRdd
+  }
+
   /**
    * Saves the content of the `DataFrame` to an external database table via JDBC. In the case the
    * table already exists in the external database, behavior of this function depends on the
@@ -441,7 +455,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
     assertNotPartitioned("jdbc")
     assertNotBucketed("jdbc")
     // connectionProperties should override settings in extraOptions.
-    this.extraOptions = this.extraOptions ++ connectionProperties.asScala
+    this.extraOptions ++= connectionProperties.asScala
     // explicit url and dbtable should override all
     this.extraOptions += ("url" -> url, "dbtable" -> table)
     format("jdbc").save()
@@ -588,7 +602,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
   private var mode: SaveMode = SaveMode.ErrorIfExists
 
-  private var extraOptions = new scala.collection.mutable.HashMap[String, String]
+  private val extraOptions = new scala.collection.mutable.HashMap[String, String]
 
   private var partitioningColumns: Option[Seq[String]] = None
 

http://git-wip-us.apache.org/repos/asf/spark/blob/7d19b6ab/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 81c2047..c64c7ad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.NamedExpression
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.sources.BaseRelation
@@ -134,139 +133,31 @@ case class CreateDataSourceTableAsSelectCommand(
     assert(table.provider.isDefined)
     assert(table.schema.isEmpty)
 
-    val provider = table.provider.get
     val sessionState = sparkSession.sessionState
     val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = table.identifier.copy(database = Some(db))
     val tableName = tableIdentWithDB.unquotedString
 
-    var createMetastoreTable = false
-    // We may need to reorder the columns of the query to match the existing table.
-    var reorderedColumns = Option.empty[Seq[NamedExpression]]
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
-      // Check if we need to throw an exception or just return.
-      mode match {
-        case SaveMode.ErrorIfExists =>
-          throw new AnalysisException(s"Table $tableName already exists. " +
-            s"If you are using saveAsTable, you can set SaveMode to SaveMode.Append to " +
-            s"insert data into the table or set SaveMode to SaveMode.Overwrite to overwrite" +
-            s"the existing data. " +
-            s"Or, if you are using SQL CREATE TABLE, you need to drop $tableName first.")
-        case SaveMode.Ignore =>
-          // Since the table already exists and the save mode is Ignore, we will just return.
-          return Seq.empty[Row]
-        case SaveMode.Append =>
-          val existingTable = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    val result = if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+      assert(mode != SaveMode.Overwrite,
+        s"Expect the table $tableName has been dropped when the save mode is Overwrite")
 
-          if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
-            throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
-              "not supported yet. Please use the insertInto() API as an alternative.")
-          }
-
-          // Check if the specified data source match the data source of the existing table.
-          val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
-          val specifiedProvider = DataSource.lookupDataSource(table.provider.get)
-          // TODO: Check that options from the resolved relation match the relation that we are
-          // inserting into (i.e. using the same compression).
-          if (existingProvider != specifiedProvider) {
-            throw new AnalysisException(s"The format of the existing table $tableName is " +
-              s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
-              s"`${specifiedProvider.getSimpleName}`.")
-          }
-
-          if (query.schema.length != existingTable.schema.length) {
-            throw new AnalysisException(
-              s"The column number of the existing table $tableName" +
-                s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
-                s"(${query.schema.catalogString})")
-          }
-
-          val resolver = sessionState.conf.resolver
-          val tableCols = existingTable.schema.map(_.name)
-
-          reorderedColumns = Some(existingTable.schema.map { f =>
-            query.resolve(Seq(f.name), resolver).getOrElse {
-              val inputColumns = query.schema.map(_.name).mkString(", ")
-              throw new AnalysisException(
-                s"cannot resolve '${f.name}' given input columns: [$inputColumns]")
-            }
-          })
-
-          // In `AnalyzeCreateTable`, we verified the consistency between the user-specified table
-          // definition(partition columns, bucketing) and the SELECT query, here we also need to
-          // verify the the consistency between the user-specified table definition and the existing
-          // table definition.
-
-          // Check if the specified partition columns match the existing table.
-          val specifiedPartCols = CatalogUtils.normalizePartCols(
-            tableName, tableCols, table.partitionColumnNames, resolver)
-          if (specifiedPartCols != existingTable.partitionColumnNames) {
-            throw new AnalysisException(
-              s"""
-                |Specified partitioning does not match that of the existing table $tableName.
-                |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
-                |Existing partition columns: [${existingTable.partitionColumnNames.mkString(", ")}]
-              """.stripMargin)
-          }
-
-          // Check if the specified bucketing match the existing table.
-          val specifiedBucketSpec = table.bucketSpec.map { bucketSpec =>
-            CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
-          }
-          if (specifiedBucketSpec != existingTable.bucketSpec) {
-            val specifiedBucketString =
-              specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
-            val existingBucketString =
-              existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
-            throw new AnalysisException(
-              s"""
-                |Specified bucketing does not match that of the existing table $tableName.
-                |Specified bucketing: $specifiedBucketString
-                |Existing bucketing: $existingBucketString
-              """.stripMargin)
-          }
-
-        case SaveMode.Overwrite =>
-          sessionState.catalog.dropTable(tableIdentWithDB, ignoreIfNotExists = true, purge = false)
-          // Need to create the table again.
-          createMetastoreTable = true
+      if (mode == SaveMode.ErrorIfExists) {
+        throw new AnalysisException(s"Table $tableName already exists. You need to drop it first.")
+      }
+      if (mode == SaveMode.Ignore) {
+        // Since the table already exists and the save mode is Ignore, we will just return.
+        return Seq.empty
       }
-    } else {
-      // The table does not exist. We need to create it in metastore.
-      createMetastoreTable = true
-    }
-
-    val data = Dataset.ofRows(sparkSession, query)
-    val df = reorderedColumns match {
-      // Reorder the columns of the query to match the existing table.
-      case Some(cols) => data.select(cols.map(Column(_)): _*)
-      case None => data
-    }
 
-    val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
-      Some(sessionState.catalog.defaultTablePath(table.identifier))
+      saveDataIntoTable(sparkSession, table, table.storage.locationUri, query, mode)
     } else {
-      table.storage.locationUri
-    }
-
-    // Create the relation based on the data of df.
-    val pathOption = tableLocation.map("path" -> _)
-    val dataSource = DataSource(
-      sparkSession,
-      className = provider,
-      partitionColumns = table.partitionColumnNames,
-      bucketSpec = table.bucketSpec,
-      options = table.storage.properties ++ pathOption,
-      catalogTable = Some(table))
-
-    val result = try {
-      dataSource.write(mode, df)
-    } catch {
-      case ex: AnalysisException =>
-        logError(s"Failed to write to table $tableName in $mode mode", ex)
-        throw ex
-    }
-    if (createMetastoreTable) {
+      val tableLocation = if (table.tableType == CatalogTableType.MANAGED) {
+        Some(sessionState.catalog.defaultTablePath(table.identifier))
+      } else {
+        table.storage.locationUri
+      }
+      val result = saveDataIntoTable(sparkSession, table, tableLocation, query, mode)
       val newTable = table.copy(
         storage = table.storage.copy(locationUri = tableLocation),
         // We will use the schema of resolved.relation as the schema of the table (instead of
@@ -274,6 +165,7 @@ case class CreateDataSourceTableAsSelectCommand(
         // provider (for example, see org.apache.spark.sql.parquet.DefaultSource).
         schema = result.schema)
       sessionState.catalog.createTable(newTable, ignoreIfExists = false)
+      result
     }
 
     result match {
@@ -289,4 +181,29 @@ case class CreateDataSourceTableAsSelectCommand(
     sessionState.catalog.refreshTable(tableIdentWithDB)
     Seq.empty[Row]
   }
+
+  private def saveDataIntoTable(
+      session: SparkSession,
+      table: CatalogTable,
+      tableLocation: Option[String],
+      data: LogicalPlan,
+      mode: SaveMode): BaseRelation = {
+    // Create the relation based on the input logical plan: `data`.
+    val pathOption = tableLocation.map("path" -> _)
+    val dataSource = DataSource(
+      session,
+      className = table.provider.get,
+      partitionColumns = table.partitionColumnNames,
+      bucketSpec = table.bucketSpec,
+      options = table.storage.properties ++ pathOption,
+      catalogTable = Some(table))
+
+    try {
+      dataSource.write(mode, Dataset.ofRows(session, query))
+    } catch {
+      case ex: AnalysisException =>
+        logError(s"Failed to write to table ${table.identifier.unquotedString}", ex)
+        throw ex
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/7d19b6ab/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 2b2fbdd..07b1667 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -21,7 +21,7 @@ import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable, CatalogUtils, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -86,6 +86,108 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       }
       c
 
+    // When we append data to an existing table, check if the given provider, partition columns,
+    // bucket spec, etc. match the existing table, and adjust the columns order of the given query
+    // if necessary.
+    case c @ CreateTable(tableDesc, SaveMode.Append, Some(query))
+        if sparkSession.sessionState.catalog.tableExists(tableDesc.identifier) =>
+      // This is guaranteed by the parser and `DataFrameWriter`
+      assert(tableDesc.schema.isEmpty && tableDesc.provider.isDefined)
+
+      // Analyze the query in CTAS and then we can do the normalization and checking.
+      val qe = sparkSession.sessionState.executePlan(query)
+      qe.assertAnalyzed()
+      val analyzedQuery = qe.analyzed
+
+      val catalog = sparkSession.sessionState.catalog
+      val db = tableDesc.identifier.database.getOrElse(catalog.getCurrentDatabase)
+      val tableIdentWithDB = tableDesc.identifier.copy(database = Some(db))
+      val tableName = tableIdentWithDB.unquotedString
+      val existingTable = catalog.getTableMetadata(tableIdentWithDB)
+
+      if (existingTable.tableType == CatalogTableType.VIEW) {
+        throw new AnalysisException("Saving data into a view is not allowed.")
+      }
+
+      if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
+        throw new AnalysisException(s"Saving data in the Hive serde table $tableName is " +
+          "not supported yet. Please use the insertInto() API as an alternative.")
+      }
+
+      // Check if the specified data source match the data source of the existing table.
+      val existingProvider = DataSource.lookupDataSource(existingTable.provider.get)
+      val specifiedProvider = DataSource.lookupDataSource(tableDesc.provider.get)
+      // TODO: Check that options from the resolved relation match the relation that we are
+      // inserting into (i.e. using the same compression).
+      if (existingProvider != specifiedProvider) {
+        throw new AnalysisException(s"The format of the existing table $tableName is " +
+          s"`${existingProvider.getSimpleName}`. It doesn't match the specified format " +
+          s"`${specifiedProvider.getSimpleName}`.")
+      }
+
+      if (analyzedQuery.schema.length != existingTable.schema.length) {
+        throw new AnalysisException(
+          s"The column number of the existing table $tableName" +
+            s"(${existingTable.schema.catalogString}) doesn't match the data schema" +
+            s"(${query.schema.catalogString})")
+      }
+
+      val resolver = sparkSession.sessionState.conf.resolver
+      val tableCols = existingTable.schema.map(_.name)
+
+      // As we are inserting into an existing table, we should respect the existing schema and
+      // adjust the column order of the given dataframe according to it, or throw exception
+      // if the column names do not match.
+      val adjustedColumns = tableCols.map { col =>
+        analyzedQuery.resolve(Seq(col), resolver).getOrElse {
+          val inputColumns = analyzedQuery.schema.map(_.name).mkString(", ")
+          throw new AnalysisException(
+            s"cannot resolve '$col' given input columns: [$inputColumns]")
+        }
+      }
+
+      // Check if the specified partition columns match the existing table.
+      val specifiedPartCols = CatalogUtils.normalizePartCols(
+        tableName, tableCols, tableDesc.partitionColumnNames, resolver)
+      if (specifiedPartCols != existingTable.partitionColumnNames) {
+        val existingPartCols = existingTable.partitionColumnNames.mkString(", ")
+        throw new AnalysisException(
+          s"""
+             |Specified partitioning does not match that of the existing table $tableName.
+             |Specified partition columns: [${specifiedPartCols.mkString(", ")}]
+             |Existing partition columns: [$existingPartCols]
+          """.stripMargin)
+      }
+
+      // Check if the specified bucketing match the existing table.
+      val specifiedBucketSpec = tableDesc.bucketSpec.map { bucketSpec =>
+        CatalogUtils.normalizeBucketSpec(tableName, tableCols, bucketSpec, resolver)
+      }
+      if (specifiedBucketSpec != existingTable.bucketSpec) {
+        val specifiedBucketString =
+          specifiedBucketSpec.map(_.toString).getOrElse("not bucketed")
+        val existingBucketString =
+          existingTable.bucketSpec.map(_.toString).getOrElse("not bucketed")
+        throw new AnalysisException(
+          s"""
+             |Specified bucketing does not match that of the existing table $tableName.
+             |Specified bucketing: $specifiedBucketString
+             |Existing bucketing: $existingBucketString
+          """.stripMargin)
+      }
+
+      val newQuery = if (adjustedColumns != analyzedQuery.output) {
+        Project(adjustedColumns, analyzedQuery)
+      } else {
+        analyzedQuery
+      }
+
+      c.copy(
+        // trust everything from the existing table, except schema as we assume it's empty in a lot
+        // of places, when we do CTAS.
+        tableDesc = existingTable.copy(schema = new StructType()),
+        query = Some(newQuery))
+
     // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
     // config, and do various checks:
     //   * column names in table definition can't be duplicated.
@@ -94,7 +196,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
     //   * can't use all table columns as partition columns.
     //   * partition columns' type must be AtomicType.
     //   * sort columns' type must be orderable.
-    case c @ CreateTable(tableDesc, mode, query) =>
+    case c @ CreateTable(tableDesc, _, query) =>
       val analyzedQuery = query.map { q =>
         // Analyze the query in CTAS and then we can do the normalization and checking.
         val qe = sparkSession.sessionState.executePlan(q)
@@ -106,6 +208,7 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       } else {
         tableDesc.schema
       }
+
       val columnNames = if (sparkSession.sessionState.conf.caseSensitiveAnalysis) {
         schema.map(_.name)
       } else {
@@ -113,22 +216,24 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       }
       checkDuplication(columnNames, "table definition of " + tableDesc.identifier)
 
-      val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
-      val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked)
-      c.copy(tableDesc = bucketColsChecked, query = analyzedQuery)
+      val normalizedTable = tableDesc.copy(
+        partitionColumnNames = normalizePartitionColumns(schema, tableDesc),
+        bucketSpec = normalizeBucketSpec(schema, tableDesc))
+
+      c.copy(tableDesc = normalizedTable, query = analyzedQuery)
   }
 
-  private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+  private def normalizePartitionColumns(schema: StructType, table: CatalogTable): Seq[String] = {
     val normalizedPartitionCols = CatalogUtils.normalizePartCols(
-      tableName = tableDesc.identifier.unquotedString,
+      tableName = table.identifier.unquotedString,
       tableCols = schema.map(_.name),
-      partCols = tableDesc.partitionColumnNames,
+      partCols = table.partitionColumnNames,
       resolver = sparkSession.sessionState.conf.resolver)
 
     checkDuplication(normalizedPartitionCols, "partition")
 
     if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
-      if (tableDesc.provider.get == DDLUtils.HIVE_PROVIDER) {
+      if (table.provider.get == DDLUtils.HIVE_PROVIDER) {
         // When we hit this branch, it means users didn't specify schema for the table to be
         // created, as we always include partition columns in table schema for hive serde tables.
         // The real schema will be inferred at hive metastore by hive serde, plus the given
@@ -144,28 +249,28 @@ case class AnalyzeCreateTable(sparkSession: SparkSession) extends Rule[LogicalPl
       case other => failAnalysis(s"Cannot use ${other.simpleString} for partition column")
     }
 
-    tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+    normalizedPartitionCols
   }
 
-  private def checkBucketColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
-    tableDesc.bucketSpec match {
+  private def normalizeBucketSpec(schema: StructType, table: CatalogTable): Option[BucketSpec] = {
+    table.bucketSpec match {
       case Some(bucketSpec) =>
-        val normalizedBucketing = CatalogUtils.normalizeBucketSpec(
-          tableName = tableDesc.identifier.unquotedString,
+        val normalizedBucketSpec = CatalogUtils.normalizeBucketSpec(
+          tableName = table.identifier.unquotedString,
           tableCols = schema.map(_.name),
           bucketSpec = bucketSpec,
           resolver = sparkSession.sessionState.conf.resolver)
-        checkDuplication(normalizedBucketing.bucketColumnNames, "bucket")
-        checkDuplication(normalizedBucketing.sortColumnNames, "sort")
+        checkDuplication(normalizedBucketSpec.bucketColumnNames, "bucket")
+        checkDuplication(normalizedBucketSpec.sortColumnNames, "sort")
 
-        normalizedBucketing.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
+        normalizedBucketSpec.sortColumnNames.map(schema(_)).map(_.dataType).foreach {
           case dt if RowOrdering.isOrderable(dt) => // OK
           case other => failAnalysis(s"Cannot use ${other.simpleString} for sorting column")
         }
 
-        tableDesc.copy(bucketSpec = Some(normalizedBucketing))
+        Some(normalizedBucketSpec)
 
-      case None => tableDesc
+      case None => None
     }
   }
 
@@ -294,27 +399,6 @@ case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
 
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
-      case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
-        if (query.isDefined &&
-          mode == SaveMode.Overwrite &&
-          catalog.tableExists(tableDesc.identifier)) {
-          // Need to remove SubQuery operator.
-          EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
-            // Only do the check if the table is a data source table
-            // (the relation is a BaseRelation).
-            case LogicalRelation(dest: BaseRelation, _, _) =>
-              // Get all input data source relations of the query.
-              val srcRelations = query.get.collect {
-                case LogicalRelation(src: BaseRelation, _, _) => src
-              }
-              if (srcRelations.contains(dest)) {
-                failAnalysis(
-                  s"Cannot overwrite table ${tableDesc.identifier} that is also being read from")
-              }
-            case _ => // OK
-          }
-        }
-
       case logical.InsertIntoTable(
           l @ LogicalRelation(t: InsertableRelation, _, _), partition, query, _, _) =>
         // Right now, we do not support insert into a data source table with partition specs.

http://git-wip-us.apache.org/repos/asf/spark/blob/7d19b6ab/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index deb40f0..0f787be 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1239,7 +1239,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
       var e = intercept[AnalysisException] {
         table(tableName).write.mode(SaveMode.Overwrite).saveAsTable(tableName)
       }.getMessage
-      assert(e.contains(s"Cannot overwrite table `$tableName` that is also being read from"))
+      assert(e.contains(s"Cannot overwrite table default.$tableName that is also being read from"))
 
       e = intercept[AnalysisException] {
         table(tableName).write.mode(SaveMode.ErrorIfExists).saveAsTable(tableName)
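
For reference, here is an illustrative repro (not part of the commit) of the check exercised by the test above. The patch moves this "overwrite a table that is also being read" check from `PreWriteCheck` into `DataFrameWriter.saveAsTable`, and the error message now uses the database-qualified, unquoted table name (e.g. `default.src`), which is why the expected string in the test changes. The table name `src` is made up:

```scala
import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}

object OverwriteSelfReadRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("overwrite-self-read")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Create a data source table, then try to overwrite it with a query
    // that reads from the same table.
    Seq(1, 2, 3).toDF("i").write.saveAsTable("src")

    try {
      spark.table("src").write.mode(SaveMode.Overwrite).saveAsTable("src")
    } catch {
      case e: AnalysisException =>
        // Expected (per the updated test):
        // "Cannot overwrite table default.src that is also being read from"
        println(e.getMessage)
    }

    spark.stop()
  }
}
```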


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org