You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/08/05 08:50:33 UTC

spark git commit: [SPARK-16879][SQL] unify logical plans for CREATE TABLE and CTAS

Repository: spark
Updated Branches:
  refs/heads/master faaefab26 -> 5effc016c


[SPARK-16879][SQL] unify logical plans for CREATE TABLE and CTAS

## What changes were proposed in this pull request?

we have various logical plans for CREATE TABLE and CTAS: `CreateTableUsing`, `CreateTableUsingAsSelect`, `CreateHiveTableAsSelectLogicalPlan`. This PR unifies them to reduce the complexity and centralize the error handling.

## How was this patch tested?

existing tests

Author: Wenchen Fan <we...@databricks.com>

Closes #14482 from cloud-fan/table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5effc016
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5effc016
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5effc016

Branch: refs/heads/master
Commit: 5effc016c893ce917d535cc1b5026d8e4c846721
Parents: faaefab
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Aug 5 10:50:26 2016 +0200
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Fri Aug 5 10:50:26 2016 +0200

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |  17 +-
 .../catalyst/catalog/ExternalCatalogSuite.scala |   8 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  24 +--
 .../scala/org/apache/spark/sql/Dataset.scala    |   8 +-
 .../spark/sql/execution/SparkSqlParser.scala    | 100 +++++------
 .../spark/sql/execution/SparkStrategies.scala   |  59 +++----
 .../command/createDataSourceTables.scala        |  64 +------
 .../spark/sql/execution/datasources/ddl.scala   |  49 ++----
 .../spark/sql/execution/datasources/rules.scala | 170 ++++++++++++++++---
 .../apache/spark/sql/internal/CatalogImpl.scala |  46 ++---
 .../spark/sql/internal/SessionState.scala       |   3 +-
 .../sql/execution/command/DDLCommandSuite.scala | 151 +++++++---------
 .../spark/sql/execution/command/DDLSuite.scala  |  47 ++++-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  29 ++--
 .../spark/sql/hive/HiveSessionState.scala       |   1 +
 .../spark/sql/hive/HiveDDLCommandSuite.scala    |   6 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |   7 +
 17 files changed, 417 insertions(+), 372 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 38f0bc2..f7762e0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,8 +21,7 @@ import java.util.Date
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
-import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.StructType
@@ -112,6 +111,8 @@ case class BucketSpec(
  * Note that Hive's metastore also tracks skewed columns. We should consider adding that in the
  * future once we have a better understanding of how we want to handle skewed columns.
  *
+ * @param provider the name of the data source provider for this table, e.g. parquet, json, etc.
+ *                 Can be None if this table is a View, should be "hive" for hive serde tables.
  * @param unsupportedFeatures is a list of string descriptions of features that are used by the
  *        underlying table but not supported by Spark SQL yet.
  */
@@ -120,6 +121,7 @@ case class CatalogTable(
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: StructType,
+    provider: Option[String] = None,
     partitionColumnNames: Seq[String] = Seq.empty,
     bucketSpec: Option[BucketSpec] = None,
     owner: String = "",
@@ -131,16 +133,6 @@ case class CatalogTable(
     comment: Option[String] = None,
     unsupportedFeatures: Seq[String] = Seq.empty) {
 
-  // Verify that the provided columns are part of the schema
-  private val colNames = schema.map(_.name).toSet
-  private def requireSubsetOfSchema(cols: Seq[String], colType: String): Unit = {
-    require(cols.toSet.subsetOf(colNames), s"$colType columns (${cols.mkString(", ")}) " +
-      s"must be a subset of schema (${colNames.mkString(", ")}) in table '$identifier'")
-  }
-  requireSubsetOfSchema(partitionColumnNames, "partition")
-  requireSubsetOfSchema(bucketSpec.map(_.sortColumnNames).getOrElse(Nil), "sort")
-  requireSubsetOfSchema(bucketSpec.map(_.bucketColumnNames).getOrElse(Nil), "bucket")
-
   /** schema of this table's partition columns */
   def partitionSchema: StructType = StructType(schema.filter {
     c => partitionColumnNames.contains(c.name)
@@ -189,6 +181,7 @@ case class CatalogTable(
         s"Last Access: ${new Date(lastAccessTime).toString}",
         s"Type: ${tableType.name}",
         if (schema.nonEmpty) s"Schema: ${schema.mkString("[", ", ", "]")}" else "",
+        if (provider.isDefined) s"Provider: ${provider.get}" else "",
         if (partitionColumnNames.nonEmpty) s"Partition Columns: $partitionColumns" else ""
       ) ++ bucketStrings ++ Seq(
         viewOriginalText.map("Original View: " + _).getOrElse(""),

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index 201d39a..54365fd 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -552,7 +552,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       identifier = TableIdentifier("my_table", Some("db1")),
       tableType = CatalogTableType.MANAGED,
       storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string")
+      schema = new StructType().add("a", "int").add("b", "string"),
+      provider = Some("hive")
     )
 
     catalog.createTable(table, ignoreIfExists = false)
@@ -571,7 +572,8 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
       storage = CatalogStorageFormat(
         Some(Utils.createTempDir().getAbsolutePath),
         None, None, None, false, Map.empty),
-      schema = new StructType().add("a", "int").add("b", "string")
+      schema = new StructType().add("a", "int").add("b", "string"),
+      provider = Some("hive")
     )
     catalog.createTable(externalTable, ignoreIfExists = false)
     assert(!exists(db.locationUri, "external_table"))
@@ -589,6 +591,7 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
         .add("col2", "string")
         .add("a", "int")
         .add("b", "string"),
+      provider = Some("hive"),
       partitionColumnNames = Seq("a", "b")
     )
     catalog.createTable(table, ignoreIfExists = false)
@@ -692,6 +695,7 @@ abstract class CatalogTestUtils {
         .add("col2", "string")
         .add("a", "int")
         .add("b", "string"),
+      provider = Some("hive"),
       partitionColumnNames = Seq("a", "b"),
       bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 4418988..6dbed26 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -23,10 +23,11 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.plans.logical.InsertIntoTable
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, DataSource, HadoopFsRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, DataSource, HadoopFsRelation}
 import org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils
+import org.apache.spark.sql.types.StructType
 
 /**
  * Interface used to write a [[Dataset]] to external storage systems (e.g. file systems,
@@ -367,15 +368,16 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
         throw new AnalysisException(s"Table $tableIdent already exists.")
 
       case _ =>
-        val cmd =
-          CreateTableUsingAsSelect(
-            tableIdent,
-            source,
-            partitioningColumns.map(_.toArray).getOrElse(Array.empty[String]),
-            getBucketSpec,
-            mode,
-            extraOptions.toMap,
-            df.logicalPlan)
+        val tableDesc = CatalogTable(
+          identifier = tableIdent,
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat.empty.copy(properties = extraOptions.toMap),
+          schema = new StructType,
+          provider = Some(source),
+          partitionColumnNames = partitioningColumns.getOrElse(Nil),
+          bucketSpec = getBucketSpec
+        )
+        val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
         df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 263ee33..9eef5cc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -24,7 +24,6 @@ import scala.language.implicitConversions
 import scala.reflect.runtime.universe.TypeTag
 import scala.util.control.NonFatal
 
-import com.fasterxml.jackson.core.JsonFactory
 import org.apache.commons.lang3.StringUtils
 
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
@@ -35,18 +34,16 @@ import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.encoders._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
-import org.apache.spark.sql.catalyst.expressions.objects.Invoke
 import org.apache.spark.sql.catalyst.optimizer.CombineUnions
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.usePrettyExpression
 import org.apache.spark.sql.execution.{FileRelation, LogicalRDD, QueryExecution, SQLExecution}
 import org.apache.spark.sql.execution.command.{CreateViewCommand, ExplainCommand}
-import org.apache.spark.sql.execution.datasources.{CreateTableUsingAsSelect, LogicalRelation}
+import org.apache.spark.sql.execution.datasources.{CreateTable, LogicalRelation}
 import org.apache.spark.sql.execution.datasources.json.JacksonGenerator
 import org.apache.spark.sql.execution.python.EvaluatePython
 import org.apache.spark.sql.streaming.{DataStreamWriter, StreamingQuery}
@@ -174,8 +171,7 @@ class Dataset[T] private[sql](
   @transient private[sql] val logicalPlan: LogicalPlan = {
     def hasSideEffects(plan: LogicalPlan): Boolean = plan match {
       case _: Command |
-           _: InsertIntoTable |
-           _: CreateTableUsingAsSelect => true
+           _: InsertIntoTable => true
       case _ => false
     }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 22b1e07..2bb6862 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, OneRowRelation, ScriptInputOutputSchema}
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.{CreateTempViewUsing, _}
+import org.apache.spark.sql.execution.datasources.{CreateTable, CreateTempViewUsing, _}
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf, VariableSubstitution}
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -310,7 +310,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a [[CreateTableUsing]] or a [[CreateTableUsingAsSelect]] logical plan.
+   * Create a [[CreateTable]] logical plan.
    */
   override def visitCreateTableUsing(ctx: CreateTableUsingContext): LogicalPlan = withOrigin(ctx) {
     val (table, temp, ifNotExists, external) = visitCreateTableHeader(ctx.createTableHeader)
@@ -319,12 +319,31 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     }
     val options = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val provider = ctx.tableProvider.qualifiedName.getText
+    val schema = Option(ctx.colTypeList()).map(createStructType)
     val partitionColumnNames =
       Option(ctx.partitionColumnNames)
         .map(visitIdentifierList(_).toArray)
         .getOrElse(Array.empty[String])
     val bucketSpec = Option(ctx.bucketSpec()).map(visitBucketSpec)
 
+    val tableDesc = CatalogTable(
+      identifier = table,
+      // TODO: actually the table type may be EXTERNAL if we have `path` in options. However, the
+      // physical plan `CreateDataSourceTableCommand` doesn't take table type as parameter, but a
+      // boolean flag called `managedIfNoPath`. We set the table type to MANAGED here to simulate
+      // setting the `managedIfNoPath` flag. In the future we should refactor the physical plan and
+      // make it take `CatalogTable` directly.
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(properties = options),
+      schema = schema.getOrElse(new StructType),
+      provider = Some(provider),
+      partitionColumnNames = partitionColumnNames,
+      bucketSpec = bucketSpec
+    )
+
+    // Determine the storage mode.
+    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+
     if (ctx.query != null) {
       // Get the backing query.
       val query = plan(ctx.query)
@@ -333,32 +352,19 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
         operationNotAllowed("CREATE TEMPORARY TABLE ... USING ... AS query", ctx)
       }
 
-      // Determine the storage mode.
-      val mode = if (ifNotExists) {
-        SaveMode.Ignore
-      } else {
-        SaveMode.ErrorIfExists
-      }
-
-      CreateTableUsingAsSelect(
-        table, provider, partitionColumnNames, bucketSpec, mode, options, query)
+      CreateTable(tableDesc, mode, Some(query))
     } else {
-      val struct = Option(ctx.colTypeList()).map(createStructType)
-      if (struct.isEmpty && bucketSpec.nonEmpty) {
-        throw new ParseException(
-          "Expected explicit specification of table schema when using CLUSTERED BY clause.", ctx)
-      }
+      if (temp) {
+        if (ifNotExists) {
+          operationNotAllowed("CREATE TEMPORARY TABLE IF NOT EXISTS", ctx)
+        }
 
-      CreateTableUsing(
-        table,
-        struct,
-        provider,
-        temp,
-        options,
-        partitionColumnNames,
-        bucketSpec,
-        ifNotExists,
-        managedIfNoPath = true)
+        logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, please use " +
+          "CREATE TEMPORARY VIEW ... USING ... instead")
+        CreateTempViewUsing(table, schema, replace = true, provider, options)
+      } else {
+        CreateTable(tableDesc, mode, None)
+      }
     }
   }
 
@@ -891,8 +897,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
   }
 
   /**
-   * Create a table, returning either a [[CreateTableCommand]] or a
-   * [[CreateHiveTableAsSelectLogicalPlan]].
+   * Create a table, returning a [[CreateTable]] logical plan.
    *
    * This is not used to create datasource tables, which is handled through
    * "CREATE TABLE ... USING ...".
@@ -933,23 +938,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     val properties = Option(ctx.tablePropertyList).map(visitPropertyKeyValues).getOrElse(Map.empty)
     val selectQuery = Option(ctx.query).map(plan)
 
-    // Ensuring whether no duplicate name is used in table definition
-    val colNames = dataCols.map(_.name)
-    if (colNames.length != colNames.distinct.length) {
-      val duplicateColumns = colNames.groupBy(identity).collect {
-        case (x, ys) if ys.length > 1 => "\"" + x + "\""
-      }
-      operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
-        duplicateColumns.mkString("[", ",", "]"), ctx)
-    }
-
-    // For Hive tables, partition columns must not be part of the schema
-    val badPartCols = partitionCols.map(_.name).toSet.intersect(colNames.toSet)
-    if (badPartCols.nonEmpty) {
-      operationNotAllowed(s"Partition columns may not be specified in the schema: " +
-        badPartCols.map("\"" + _ + "\"").mkString("[", ",", "]"), ctx)
-    }
-
     // Note: Hive requires partition columns to be distinct from the schema, so we need
     // to include the partition columns here explicitly
     val schema = StructType(dataCols ++ partitionCols)
@@ -1001,10 +989,13 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
       tableType = tableType,
       storage = storage,
       schema = schema,
+      provider = Some("hive"),
       partitionColumnNames = partitionCols.map(_.name),
       properties = properties,
       comment = comment)
 
+    val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
+
     selectQuery match {
       case Some(q) =>
         // Just use whatever is projected in the select statement as our schema
@@ -1025,7 +1016,6 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
         val hasStorageProperties = (ctx.createFileFormat != null) || (ctx.rowFormat != null)
         if (conf.convertCTAS && !hasStorageProperties) {
-          val mode = if (ifNotExists) SaveMode.Ignore else SaveMode.ErrorIfExists
           // At here, both rowStorage.serdeProperties and fileStorage.serdeProperties
           // are empty Maps.
           val optionsWithPath = if (location.isDefined) {
@@ -1033,19 +1023,17 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
           } else {
             Map.empty[String, String]
           }
-          CreateTableUsingAsSelect(
-            tableIdent = tableDesc.identifier,
-            provider = conf.defaultDataSourceName,
-            partitionColumns = tableDesc.partitionColumnNames.toArray,
-            bucketSpec = None,
-            mode = mode,
-            options = optionsWithPath,
-            q
+
+          val newTableDesc = tableDesc.copy(
+            storage = CatalogStorageFormat.empty.copy(properties = optionsWithPath),
+            provider = Some(conf.defaultDataSourceName)
           )
+
+          CreateTable(newTableDesc, mode, Some(q))
         } else {
-          CreateHiveTableAsSelectLogicalPlan(tableDesc, q, ifNotExists)
+          CreateTable(tableDesc, mode, Some(q))
         }
-      case None => CreateTableCommand(tableDesc, ifNotExists)
+      case None => CreateTable(tableDesc, mode, None)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 52e1981..fb08e12 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -19,15 +19,15 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{AnalysisException, Strategy}
+import org.apache.spark.sql.{execution, SaveMode, Strategy}
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{BroadcastHint, LogicalPlan}
 import org.apache.spark.sql.catalyst.plans.physical._
-import org.apache.spark.sql.execution
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.datasources._
@@ -420,45 +420,40 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
   object DDLStrategy extends Strategy {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-      case c: CreateTableUsing if c.temporary && !c.allowExisting =>
-        logWarning(
-          s"CREATE TEMPORARY TABLE ${c.tableIdent.identifier} USING... is deprecated, " +
-            s"please use CREATE TEMPORARY VIEW viewName USING... instead")
-        ExecutedCommandExec(
-          CreateTempViewUsing(
-            c.tableIdent, c.userSpecifiedSchema, replace = true, c.provider, c.options)) :: Nil
-
-      case c: CreateTableUsing if !c.temporary =>
+      case CreateTable(tableDesc, mode, None) if tableDesc.provider.get == "hive" =>
+        val cmd = CreateTableCommand(tableDesc, ifNotExists = mode == SaveMode.Ignore)
+        ExecutedCommandExec(cmd) :: Nil
+
+      case CreateTable(tableDesc, mode, None) =>
         val cmd =
           CreateDataSourceTableCommand(
-            c.tableIdent,
-            c.userSpecifiedSchema,
-            c.provider,
-            c.options,
-            c.partitionColumns,
-            c.bucketSpec,
-            c.allowExisting,
-            c.managedIfNoPath)
+            tableDesc.identifier,
+            if (tableDesc.schema.nonEmpty) Some(tableDesc.schema) else None,
+            tableDesc.provider.get,
+            tableDesc.storage.properties,
+            tableDesc.partitionColumnNames.toArray,
+            tableDesc.bucketSpec,
+            ignoreIfExists = mode == SaveMode.Ignore,
+            managedIfNoPath = tableDesc.tableType == CatalogTableType.MANAGED)
         ExecutedCommandExec(cmd) :: Nil
 
-      case c: CreateTableUsing if c.temporary && c.allowExisting =>
-        throw new AnalysisException(
-          "allowExisting should be set to false when creating a temporary table.")
+      // CREATE TABLE ... AS SELECT ... for hive serde table is handled in hive module, by rule
+      // `CreateTables`
 
-      case c: CreateTableUsingAsSelect =>
+      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get != "hive" =>
         val cmd =
           CreateDataSourceTableAsSelectCommand(
-            c.tableIdent,
-            c.provider,
-            c.partitionColumns,
-            c.bucketSpec,
-            c.mode,
-            c.options,
-            c.child)
+            tableDesc.identifier,
+            tableDesc.provider.get,
+            tableDesc.partitionColumnNames.toArray,
+            tableDesc.bucketSpec,
+            mode,
+            tableDesc.storage.properties,
+            query)
         ExecutedCommandExec(cmd) :: Nil
 
-      case c: CreateTempViewUsing =>
-        ExecutedCommandExec(c) :: Nil
+      case c: CreateTempViewUsing => ExecutedCommandExec(c) :: Nil
+
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index 93eb386..7b028e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.command
 
-import java.util.regex.Pattern
-
 import scala.collection.mutable
 import scala.util.control.NonFatal
 
@@ -59,21 +57,6 @@ case class CreateDataSourceTableCommand(
   extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // Since we are saving metadata to metastore, we need to check if metastore supports
-    // the table name and database name we have for this query. MetaStoreUtils.validateName
-    // is the method used by Hive to check if a table name or a database name is valid for
-    // the metastore.
-    if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
-      throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
-        s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-    }
-    if (tableIdent.database.isDefined &&
-      !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
-      throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
-        s"for metastore. Metastore only accepts database name containing " +
-        s"characters, numbers and _.")
-    }
-
     val tableName = tableIdent.unquotedString
     val sessionState = sparkSession.sessionState
 
@@ -106,22 +89,12 @@ case class CreateDataSourceTableCommand(
     val partitionColumns = if (userSpecifiedSchema.nonEmpty) {
       userSpecifiedPartitionColumns
     } else {
-      val res = dataSource match {
+      // This is guaranteed in `PreprocessDDL`.
+      assert(userSpecifiedPartitionColumns.isEmpty)
+      dataSource match {
         case r: HadoopFsRelation => r.partitionSchema.fieldNames
         case _ => Array.empty[String]
       }
-      if (userSpecifiedPartitionColumns.length > 0) {
-        // The table does not have a specified schema, which means that the schema will be inferred
-        // when we load the table. So, we are not expecting partition columns and we will discover
-        // partitions when we load the table. However, if there are specified partition columns,
-        // we simply ignore them and provide a warning message.
-        logWarning(
-          s"Specified partition columns (${userSpecifiedPartitionColumns.mkString(",")}) will be " +
-            s"ignored. The schema and partition columns of table $tableIdent are inferred. " +
-            s"Schema: ${dataSource.schema.simpleString}; " +
-            s"Partition columns: ${res.mkString("(", ", ", ")")}")
-      }
-      res
     }
 
     CreateDataSourceTableUtils.createDataSourceTable(
@@ -164,21 +137,6 @@ case class CreateDataSourceTableAsSelectCommand(
   override protected def innerChildren: Seq[QueryPlan[_]] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    // Since we are saving metadata to metastore, we need to check if metastore supports
-    // the table name and database name we have for this query. MetaStoreUtils.validateName
-    // is the method used by Hive to check if a table name or a database name is valid for
-    // the metastore.
-    if (!CreateDataSourceTableUtils.validateName(tableIdent.table)) {
-      throw new AnalysisException(s"Table name ${tableIdent.table} is not a valid name for " +
-        s"metastore. Metastore only accepts table name containing characters, numbers and _.")
-    }
-    if (tableIdent.database.isDefined &&
-      !CreateDataSourceTableUtils.validateName(tableIdent.database.get)) {
-      throw new AnalysisException(s"Database name ${tableIdent.database.get} is not a valid name " +
-        s"for metastore. Metastore only accepts database name containing " +
-        s"characters, numbers and _.")
-    }
-
     val tableName = tableIdent.unquotedString
     val sessionState = sparkSession.sessionState
     var createMetastoreTable = false
@@ -311,20 +269,6 @@ object CreateDataSourceTableUtils extends Logging {
   val DATASOURCE_SCHEMA_BUCKETCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "bucketCol."
   val DATASOURCE_SCHEMA_SORTCOL_PREFIX = DATASOURCE_SCHEMA_PREFIX + "sortCol."
 
-  /**
-   * Checks if the given name conforms the Hive standard ("[a-zA-z_0-9]+"),
-   * i.e. if this name only contains characters, numbers, and _.
-   *
-   * This method is intended to have the same behavior of
-   * org.apache.hadoop.hive.metastore.MetaStoreUtils.validateName.
-   */
-  def validateName(name: String): Boolean = {
-    val tpat = Pattern.compile("[\\w_]+")
-    val matcher = tpat.matcher(name)
-
-    matcher.matches()
-  }
-
   def createDataSourceTable(
       sparkSession: SparkSession,
       tableIdent: TableIdentifier,
@@ -396,6 +340,7 @@ object CreateDataSourceTableUtils extends Logging {
         identifier = tableIdent,
         tableType = tableType,
         schema = new StructType,
+        provider = Some(provider),
         storage = CatalogStorageFormat(
           locationUri = None,
           inputFormat = None,
@@ -425,6 +370,7 @@ object CreateDataSourceTableUtils extends Logging {
           properties = options
         ),
         schema = relation.schema,
+        provider = Some(provider),
         properties = tableProperties.toMap,
         viewText = None)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
index 18369b5..1b1e212 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ddl.scala
@@ -19,50 +19,25 @@ package org.apache.spark.sql.execution.datasources
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.BucketSpec
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions.Attribute
-import org.apache.spark.sql.catalyst.plans.logical
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.logical.{Command, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.types._
 
+case class CreateTable(tableDesc: CatalogTable, mode: SaveMode, query: Option[LogicalPlan])
+  extends LogicalPlan with Command {
+  assert(tableDesc.provider.isDefined, "The table to be created must have a provider.")
 
-/**
- * Used to represent the operation of create table using a data source.
- *
- * @param allowExisting If it is true, we will do nothing when the table already exists.
- *                      If it is false, an exception will be thrown
- */
-case class CreateTableUsing(
-    tableIdent: TableIdentifier,
-    userSpecifiedSchema: Option[StructType],
-    provider: String,
-    temporary: Boolean,
-    options: Map[String, String],
-    partitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
-    allowExisting: Boolean,
-    managedIfNoPath: Boolean) extends LogicalPlan with logical.Command {
-
-  override def output: Seq[Attribute] = Seq.empty
-  override def children: Seq[LogicalPlan] = Seq.empty
-}
+  if (query.isEmpty) {
+    assert(
+      mode == SaveMode.ErrorIfExists || mode == SaveMode.Ignore,
+      "create table without data insertion can only use ErrorIfExists or Ignore as SaveMode.")
+  }
 
-/**
- * A node used to support CTAS statements and saveAsTable for the data source API.
- * This node is a [[logical.UnaryNode]] instead of a [[logical.Command]] because we want the
- * analyzer can analyze the logical plan that will be used to populate the table.
- * So, [[PreWriteCheck]] can detect cases that are not allowed.
- */
-case class CreateTableUsingAsSelect(
-    tableIdent: TableIdentifier,
-    provider: String,
-    partitionColumns: Array[String],
-    bucketSpec: Option[BucketSpec],
-    mode: SaveMode,
-    options: Map[String, String],
-    child: LogicalPlan) extends logical.UnaryNode {
   override def output: Seq[Attribute] = Seq.empty[Attribute]
+
+  override def children: Seq[LogicalPlan] = query.toSeq
 }
 
 case class CreateTempViewUsing(

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
index 15b9d14..d5b9232 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala
@@ -17,17 +17,21 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.util.regex.Pattern
+
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogRelation, CatalogTable, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Cast, RowOrdering}
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{BaseRelation, InsertableRelation}
+import org.apache.spark.sql.types.{AtomicType, StructType}
 
 /**
  * Try to replaces [[UnresolvedRelation]]s with [[ResolveDataSource]].
@@ -62,6 +66,130 @@ private[sql] class ResolveDataSource(sparkSession: SparkSession) extends Rule[Lo
 }
 
 /**
+ * Preprocess some DDL plans, e.g. [[CreateTable]], to do some normalization and checking.
+ */
+case class PreprocessDDL(conf: SQLConf) extends Rule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    // When we CREATE TABLE without specifying the table schema, we should fail the query if
+    // bucketing information is specified, as we can't infer bucketing from data files currently,
+    // and we should ignore the partition columns if it's specified, as we will infer it later, at
+    // runtime.
+    case c @ CreateTable(tableDesc, _, None) if tableDesc.schema.isEmpty =>
+      if (tableDesc.bucketSpec.isDefined) {
+        failAnalysis("Cannot specify bucketing information if the table schema is not specified " +
+          "when creating and will be inferred at runtime")
+      }
+
+      val partitionColumnNames = tableDesc.partitionColumnNames
+      if (partitionColumnNames.nonEmpty) {
+        // The table does not have a specified schema, which means that the schema will be inferred
+        // at runtime. So, we are not expecting partition columns and we will discover partitions
+        // at runtime. However, if there are specified partition columns, we simply ignore them and
+        // provide a warning message.
+        logWarning(
+          s"Specified partition columns (${partitionColumnNames.mkString(",")}) will be " +
+            s"ignored. The schema and partition columns of table ${tableDesc.identifier} will " +
+            "be inferred.")
+        c.copy(tableDesc = tableDesc.copy(partitionColumnNames = Nil))
+      } else {
+        c
+      }
+
+    // Here we normalize partition, bucket and sort column names, w.r.t. the case sensitivity
+    // config, and do various checks:
+    //   * column names in table definition can't be duplicated.
+    //   * partition, bucket and sort column names must exist in table definition.
+    //   * partition, bucket and sort column names can't be duplicated.
+    //   * can't use all table columns as partition columns.
+    //   * partition columns' type must be AtomicType.
+    //   * sort columns' type must be orderable.
+    case c @ CreateTable(tableDesc, mode, query) if c.childrenResolved =>
+      val schema = if (query.isDefined) query.get.schema else tableDesc.schema
+      checkDuplication(schema.map(_.name), "table definition of " + tableDesc.identifier)
+
+      val partitionColsChecked = checkPartitionColumns(schema, tableDesc)
+      val bucketColsChecked = checkBucketColumns(schema, partitionColsChecked)
+      c.copy(tableDesc = bucketColsChecked)
+  }
+
+  private def checkPartitionColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+    val normalizedPartitionCols = tableDesc.partitionColumnNames.map { colName =>
+      normalizeColumnName(tableDesc.identifier, schema, colName, "partition")
+    }
+    checkDuplication(normalizedPartitionCols, "partition")
+
+    if (schema.nonEmpty && normalizedPartitionCols.length == schema.length) {
+      if (tableDesc.provider.get == "hive") {
+        // When we hit this branch, it means users didn't specify schema for the table to be
+        // created, as we always include partition columns in table schema for hive serde tables.
+        // The real schema will be inferred at hive metastore by hive serde, plus the given
+        // partition columns, so we should not fail the analysis here.
+      } else {
+        failAnalysis("Cannot use all columns for partition columns")
+      }
+
+    }
+
+    schema.filter(f => normalizedPartitionCols.contains(f.name)).map(_.dataType).foreach {
+      case _: AtomicType => // OK
+      case other => failAnalysis(s"Cannot use ${other.simpleString} for partition column")
+    }
+
+    tableDesc.copy(partitionColumnNames = normalizedPartitionCols)
+  }
+
+  private def checkBucketColumns(schema: StructType, tableDesc: CatalogTable): CatalogTable = {
+    tableDesc.bucketSpec match {
+      case Some(BucketSpec(numBuckets, bucketColumnNames, sortColumnNames)) =>
+        val normalizedBucketCols = bucketColumnNames.map { colName =>
+          normalizeColumnName(tableDesc.identifier, schema, colName, "bucket")
+        }
+        checkDuplication(normalizedBucketCols, "bucket")
+
+        val normalizedSortCols = sortColumnNames.map { colName =>
+          normalizeColumnName(tableDesc.identifier, schema, colName, "sort")
+        }
+        checkDuplication(normalizedSortCols, "sort")
+
+        schema.filter(f => normalizedSortCols.contains(f.name)).map(_.dataType).foreach {
+          case dt if RowOrdering.isOrderable(dt) => // OK
+          case other => failAnalysis(s"Cannot use ${other.simpleString} for sorting column")
+        }
+
+        tableDesc.copy(
+          bucketSpec = Some(BucketSpec(numBuckets, normalizedBucketCols, normalizedSortCols))
+        )
+
+      case None => tableDesc
+    }
+  }
+
+  private def checkDuplication(colNames: Seq[String], colType: String): Unit = {
+    if (colNames.distinct.length != colNames.length) {
+      val duplicateColumns = colNames.groupBy(identity).collect {
+        case (x, ys) if ys.length > 1 => x
+      }
+      failAnalysis(s"Found duplicate column(s) in $colType: ${duplicateColumns.mkString(", ")}")
+    }
+  }
+
+  private def normalizeColumnName(
+      tableIdent: TableIdentifier,
+      schema: StructType,
+      colName: String,
+      colType: String): String = {
+    val tableCols = schema.map(_.name)
+    tableCols.find(conf.resolver(_, colName)).getOrElse {
+      failAnalysis(s"$colType column $colName is not defined in table $tableIdent, " +
+        s"defined table columns are: ${tableCols.mkString(", ")}")
+    }
+  }
+
+  private def failAnalysis(msg: String) = throw new AnalysisException(msg)
+}
+
+/**
  * Preprocess the [[InsertIntoTable]] plan. Throws exception if the number of columns mismatch, or
  * specified partition columns are different from the existing partition columns in the target
  * table. It also does data type casting and field renaming, to make sure that the columns to be
@@ -152,8 +280,25 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
 
   def failAnalysis(msg: String): Unit = { throw new AnalysisException(msg) }
 
+  // This regex is used to check if the table name and database name is valid for `CreateTable`.
+  private val validNameFormat = Pattern.compile("[\\w_]+")
+
   def apply(plan: LogicalPlan): Unit = {
     plan.foreach {
+      case c @ CreateTable(tableDesc, mode, query) if c.resolved =>
+        // Since we are saving table metadata to metastore, we should make sure the table name
+        // and database name don't break some common restrictions, e.g. special chars except
+        // underscore are not allowed.
+        val tblIdent = tableDesc.identifier
+        if (!validNameFormat.matcher(tblIdent.table).matches()) {
+          failAnalysis(s"Table name ${tblIdent.table} is not a valid name for " +
+            s"metastore. Metastore only accepts table name containing characters, numbers and _.")
+        }
+        if (tblIdent.database.exists(db => !validNameFormat.matcher(db).matches())) {
+          failAnalysis(s"Database name ${tblIdent.database.get} is not a valid name for " +
+            s"metastore. Metastore only accepts table name containing characters, numbers and _.")
+        }
+
       case i @ logical.InsertIntoTable(
         l @ LogicalRelation(t: InsertableRelation, _, _),
         partition, query, overwrite, ifNotExists) =>
@@ -206,22 +351,22 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
         // The relation in l is not an InsertableRelation.
         failAnalysis(s"$l does not allow insertion.")
 
-      case c: CreateTableUsingAsSelect =>
+      case CreateTable(tableDesc, mode, Some(query)) =>
         // When the SaveMode is Overwrite, we need to check if the table is an input table of
         // the query. If so, we will throw an AnalysisException to let users know it is not allowed.
-        if (c.mode == SaveMode.Overwrite && catalog.tableExists(c.tableIdent)) {
+        if (mode == SaveMode.Overwrite && catalog.tableExists(tableDesc.identifier)) {
           // Need to remove SubQuery operator.
-          EliminateSubqueryAliases(catalog.lookupRelation(c.tableIdent)) match {
+          EliminateSubqueryAliases(catalog.lookupRelation(tableDesc.identifier)) match {
             // Only do the check if the table is a data source table
             // (the relation is a BaseRelation).
             case l @ LogicalRelation(dest: BaseRelation, _, _) =>
               // Get all input data source relations of the query.
-              val srcRelations = c.child.collect {
+              val srcRelations = query.collect {
                 case LogicalRelation(src: BaseRelation, _, _) => src
               }
               if (srcRelations.contains(dest)) {
                 failAnalysis(
-                  s"Cannot overwrite table ${c.tableIdent} that is also being read from.")
+                  s"Cannot overwrite table ${tableDesc.identifier} that is also being read from.")
               } else {
                 // OK
               }
@@ -232,19 +377,6 @@ private[sql] case class PreWriteCheck(conf: SQLConf, catalog: SessionCatalog)
           // OK
         }
 
-        PartitioningUtils.validatePartitionColumn(
-          c.child.schema, c.partitionColumns, conf.caseSensitiveAnalysis)
-
-        for {
-          spec <- c.bucketSpec
-          sortColumnName <- spec.sortColumnNames
-          sortColumn <- c.child.schema.find(_.name == sortColumnName)
-        } {
-          if (!RowOrdering.isOrderable(sortColumn.dataType)) {
-            failAnalysis(s"Cannot use ${sortColumn.dataType.simpleString} for sorting column.")
-          }
-        }
-
       case _ => // OK
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index f8f7872..1f87f0e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -21,13 +21,13 @@ import scala.collection.JavaConverters._
 import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql._
 import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table}
 import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.SessionCatalog
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, SessionCatalog}
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
-import org.apache.spark.sql.execution.datasources.CreateTableUsing
+import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.types.StructType
 
 
@@ -223,20 +223,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       tableName: String,
       source: String,
       options: Map[String, String]): DataFrame = {
-    val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    val cmd =
-      CreateTableUsing(
-        tableIdent,
-        userSpecifiedSchema = None,
-        source,
-        temporary = false,
-        options = options,
-        partitionColumns = Array.empty[String],
-        bucketSpec = None,
-        allowExisting = false,
-        managedIfNoPath = false)
-    sparkSession.sessionState.executePlan(cmd).toRdd
-    sparkSession.table(tableIdent)
+    createExternalTable(tableName, source, new StructType, options)
   }
 
   /**
@@ -271,19 +258,20 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
       source: String,
       schema: StructType,
       options: Map[String, String]): DataFrame = {
+    if (source == "hive") {
+      throw new AnalysisException("Cannot create hive serde table with createExternalTable API.")
+    }
+
     val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
-    val cmd =
-      CreateTableUsing(
-        tableIdent,
-        userSpecifiedSchema = Some(schema),
-        source,
-        temporary = false,
-        options,
-        partitionColumns = Array.empty[String],
-        bucketSpec = None,
-        allowExisting = false,
-        managedIfNoPath = false)
-    sparkSession.sessionState.executePlan(cmd).toRdd
+    val tableDesc = CatalogTable(
+      identifier = tableIdent,
+      tableType = CatalogTableType.EXTERNAL,
+      storage = CatalogStorageFormat.empty.copy(properties = options),
+      schema = schema,
+      provider = Some(source)
+    )
+    val plan = CreateTable(tableDesc, SaveMode.ErrorIfExists, None)
+    sparkSession.sessionState.executePlan(plan).toRdd
     sparkSession.table(tableIdent)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index a228566..052bce0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.execution.command.AnalyzeTableCommand
-import org.apache.spark.sql.execution.datasources.{DataSourceAnalysis, FindDataSourceTable, PreprocessTableInsertion, ResolveDataSource}
+import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryManager}
 import org.apache.spark.sql.util.ExecutionListenerManager
 
@@ -111,6 +111,7 @@ private[sql] class SessionState(sparkSession: SparkSession) {
   lazy val analyzer: Analyzer = {
     new Analyzer(catalog, conf) {
       override val extendedResolutionRules =
+        PreprocessDDL(conf) ::
         PreprocessTableInsertion(conf) ::
         new FindDataSourceTable(sparkSession) ::
         DataSourceAnalysis(conf) ::

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
index 999afc9..044fa5f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLCommandSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.sql.execution.command
 import scala.reflect.{classTag, ClassTag}
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTableType, FunctionResource}
-import org.apache.spark.sql.catalyst.catalog.FunctionResourceType
+import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.Project
 import org.apache.spark.sql.execution.SparkSqlParser
-import org.apache.spark.sql.execution.datasources.CreateTableUsing
+import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
 import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
 
@@ -243,12 +242,12 @@ class DDLCommandSuite extends PlanTest {
 
     allSources.foreach { s =>
       val query = s"CREATE TABLE my_tab STORED AS $s"
-      val ct = parseAs[CreateTableCommand](query)
+      val ct = parseAs[CreateTable](query)
       val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
       assert(hiveSerde.isDefined)
-      assert(ct.table.storage.serde == hiveSerde.get.serde)
-      assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
-      assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+      assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+      assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
+      assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
     }
   }
 
@@ -259,14 +258,14 @@ class DDLCommandSuite extends PlanTest {
     val query2 = s"$createTableStart DELIMITED FIELDS TERMINATED BY ' ' $fileFormat"
 
     // No conflicting serdes here, OK
-    val parsed1 = parseAs[CreateTableCommand](query1)
-    assert(parsed1.table.storage.serde == Some("anything"))
-    assert(parsed1.table.storage.inputFormat == Some("inputfmt"))
-    assert(parsed1.table.storage.outputFormat == Some("outputfmt"))
-    val parsed2 = parseAs[CreateTableCommand](query2)
-    assert(parsed2.table.storage.serde.isEmpty)
-    assert(parsed2.table.storage.inputFormat == Some("inputfmt"))
-    assert(parsed2.table.storage.outputFormat == Some("outputfmt"))
+    val parsed1 = parseAs[CreateTable](query1)
+    assert(parsed1.tableDesc.storage.serde == Some("anything"))
+    assert(parsed1.tableDesc.storage.inputFormat == Some("inputfmt"))
+    assert(parsed1.tableDesc.storage.outputFormat == Some("outputfmt"))
+    val parsed2 = parseAs[CreateTable](query2)
+    assert(parsed2.tableDesc.storage.serde.isEmpty)
+    assert(parsed2.tableDesc.storage.inputFormat == Some("inputfmt"))
+    assert(parsed2.tableDesc.storage.outputFormat == Some("outputfmt"))
   }
 
   test("create table - row format serde and generic file format") {
@@ -276,12 +275,12 @@ class DDLCommandSuite extends PlanTest {
     allSources.foreach { s =>
       val query = s"CREATE TABLE my_tab ROW FORMAT SERDE 'anything' STORED AS $s"
       if (supportedSources.contains(s)) {
-        val ct = parseAs[CreateTableCommand](query)
+        val ct = parseAs[CreateTable](query)
         val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
         assert(hiveSerde.isDefined)
-        assert(ct.table.storage.serde == Some("anything"))
-        assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
-        assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+        assert(ct.tableDesc.storage.serde == Some("anything"))
+        assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
+        assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
       } else {
         assertUnsupported(query, Seq("row format serde", "incompatible", s))
       }
@@ -295,12 +294,12 @@ class DDLCommandSuite extends PlanTest {
     allSources.foreach { s =>
       val query = s"CREATE TABLE my_tab ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS $s"
       if (supportedSources.contains(s)) {
-        val ct = parseAs[CreateTableCommand](query)
+        val ct = parseAs[CreateTable](query)
         val hiveSerde = HiveSerDe.sourceToSerDe(s, new SQLConf)
         assert(hiveSerde.isDefined)
-        assert(ct.table.storage.serde == hiveSerde.get.serde)
-        assert(ct.table.storage.inputFormat == hiveSerde.get.inputFormat)
-        assert(ct.table.storage.outputFormat == hiveSerde.get.outputFormat)
+        assert(ct.tableDesc.storage.serde == hiveSerde.get.serde)
+        assert(ct.tableDesc.storage.inputFormat == hiveSerde.get.inputFormat)
+        assert(ct.tableDesc.storage.outputFormat == hiveSerde.get.outputFormat)
       } else {
         assertUnsupported(query, Seq("row format delimited", "only compatible with 'textfile'", s))
       }
@@ -312,9 +311,9 @@ class DDLCommandSuite extends PlanTest {
       sql = "CREATE EXTERNAL TABLE my_tab",
       containsThesePhrases = Seq("create external table", "location"))
     val query = "CREATE EXTERNAL TABLE my_tab LOCATION '/something/anything'"
-    val ct = parseAs[CreateTableCommand](query)
-    assert(ct.table.tableType == CatalogTableType.EXTERNAL)
-    assert(ct.table.storage.locationUri == Some("/something/anything"))
+    val ct = parseAs[CreateTable](query)
+    assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
+    assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
   }
 
   test("create table - property values must be set") {
@@ -329,47 +328,29 @@ class DDLCommandSuite extends PlanTest {
 
   test("create table - location implies external") {
     val query = "CREATE TABLE my_tab LOCATION '/something/anything'"
-    val ct = parseAs[CreateTableCommand](query)
-    assert(ct.table.tableType == CatalogTableType.EXTERNAL)
-    assert(ct.table.storage.locationUri == Some("/something/anything"))
-  }
-
-  test("create table - column repeated in partitioning columns") {
-    val query = "CREATE TABLE tab1 (key INT, value STRING) PARTITIONED BY (key INT, hr STRING)"
-    val e = intercept[ParseException] { parser.parsePlan(query) }
-    assert(e.getMessage.contains(
-      "Operation not allowed: Partition columns may not be specified in the schema: [\"key\"]"))
-  }
-
-  test("create table - duplicate column names in the table definition") {
-    val query = "CREATE TABLE default.tab1 (key INT, key STRING)"
-    val e = intercept[ParseException] { parser.parsePlan(query) }
-    assert(e.getMessage.contains("Operation not allowed: Duplicated column names found in " +
-      "table definition of `default`.`tab1`: [\"key\"]"))
+    val ct = parseAs[CreateTable](query)
+    assert(ct.tableDesc.tableType == CatalogTableType.EXTERNAL)
+    assert(ct.tableDesc.storage.locationUri == Some("/something/anything"))
   }
 
   test("create table using - with partitioned by") {
     val query = "CREATE TABLE my_tab(a INT comment 'test', b STRING) " +
       "USING parquet PARTITIONED BY (a)"
-    val expected = CreateTableUsing(
-      TableIdentifier("my_tab"),
-      Some(new StructType()
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType()
         .add("a", IntegerType, nullable = true, "test")
-        .add("b", StringType)),
-      "parquet",
-      false,
-      Map.empty,
-      null,
-      None,
-      false,
-      true)
+        .add("b", StringType),
+      provider = Some("parquet"),
+      partitionColumnNames = Seq("a")
+    )
 
     parser.parsePlan(query) match {
-      case ct: CreateTableUsing =>
-        // We can't compare array in `CreateTableUsing` directly, so here we compare
-        // `partitionColumns` ahead, and make `partitionColumns` null before plan comparison.
-        assert(Seq("a") == ct.partitionColumns.toSeq)
-        comparePlans(ct.copy(partitionColumns = null), expected)
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
           s"got ${other.getClass.getName}: $query")
@@ -379,23 +360,19 @@ class DDLCommandSuite extends PlanTest {
   test("create table using - with bucket") {
     val query = "CREATE TABLE my_tab(a INT, b STRING) USING parquet " +
       "CLUSTERED BY (a) SORTED BY (b) INTO 5 BUCKETS"
-    val expected = CreateTableUsing(
-      TableIdentifier("my_tab"),
-      Some(new StructType().add("a", IntegerType).add("b", StringType)),
-      "parquet",
-      false,
-      Map.empty,
-      null,
-      Some(BucketSpec(5, Seq("a"), Seq("b"))),
-      false,
-      true)
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("my_tab"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty,
+      schema = new StructType().add("a", IntegerType).add("b", StringType),
+      provider = Some("parquet"),
+      bucketSpec = Some(BucketSpec(5, Seq("a"), Seq("b")))
+    )
 
     parser.parsePlan(query) match {
-      case ct: CreateTableUsing =>
-        // `Array.empty == Array.empty` returns false, here we set `partitionColumns` to null before
-        // plan comparison.
-        assert(ct.partitionColumns.isEmpty)
-        comparePlans(ct.copy(partitionColumns = null), expected)
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
           s"got ${other.getClass.getName}: $query")
@@ -907,22 +884,20 @@ class DDLCommandSuite extends PlanTest {
         |CREATE TABLE table_name USING json
         |OPTIONS (a 1, b 0.1, c TRUE)
       """.stripMargin
-    val expected = CreateTableUsing(
-      TableIdentifier("table_name"),
-      None,
-      "json",
-      false,
-      Map("a" -> "1", "b" -> "0.1", "c" -> "true"),
-      null,
-      None,
-      false,
-      true)
+
+    val expectedTableDesc = CatalogTable(
+      identifier = TableIdentifier("table_name"),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat.empty.copy(
+        properties = Map("a" -> "1", "b" -> "0.1", "c" -> "true")
+      ),
+      schema = new StructType,
+      provider = Some("json")
+    )
 
     parser.parsePlan(sql) match {
-      case ct: CreateTableUsing =>
-        // We can't compare array in `CreateTableUsing` directly, so here we explicitly
-        // set partitionColumns to `null` and then compare it.
-        comparePlans(ct.copy(partitionColumns = null), expected)
+      case CreateTable(tableDesc, _, None) =>
+        assert(tableDesc == expectedTableDesc.copy(createTime = tableDesc.createTime))
       case other =>
         fail(s"Expected to parse ${classOf[CreateTableCommand].getClass.getName} from query," +
           s"got ${other.getClass.getName}: $sql")

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 564fc73..ca9b210 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,7 +30,6 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogDatabase, Catal
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
-import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -94,6 +93,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         .add("col2", "string")
         .add("a", "int")
         .add("b", "int"),
+      provider = Some("parquet"),
       partitionColumnNames = Seq("a", "b"),
       createTime = 0L)
   }
@@ -359,6 +359,43 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     }
   }
 
+  test("create table - duplicate column names in the table definition") {
+    val e = intercept[AnalysisException] {
+      sql("CREATE TABLE tbl(a int, a string) USING json")
+    }
+    assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
+  }
+
+  test("create table - partition column names not in table definition") {
+    val e = intercept[AnalysisException] {
+      sql("CREATE TABLE tbl(a int, b string) USING json PARTITIONED BY (c)")
+    }
+    assert(e.message == "partition column c is not defined in table `tbl`, " +
+      "defined table columns are: a, b")
+  }
+
+  test("create table - bucket column names not in table definition") {
+    val e = intercept[AnalysisException] {
+      sql("CREATE TABLE tbl(a int, b string) USING json CLUSTERED BY (c) INTO 4 BUCKETS")
+    }
+    assert(e.message == "bucket column c is not defined in table `tbl`, " +
+      "defined table columns are: a, b")
+  }
+
+  test("create table - column repeated in partition columns") {
+    val e = intercept[AnalysisException] {
+      sql("CREATE TABLE tbl(a int) USING json PARTITIONED BY (a, a)")
+    }
+    assert(e.message == "Found duplicate column(s) in partition: a")
+  }
+
+  test("create table - column repeated in bucket columns") {
+    val e = intercept[AnalysisException] {
+      sql("CREATE TABLE tbl(a int) USING json CLUSTERED BY (a, a) INTO 4 BUCKETS")
+    }
+    assert(e.message == "Found duplicate column(s) in bucket: a")
+  }
+
   test("Describe Table with Corrupted Schema") {
     import testImplicits._
 
@@ -1469,7 +1506,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       withTable("jsonTable") {
         (("a", "b") :: Nil).toDF().write.json(tempDir.getCanonicalPath)
 
-        val e = intercept[ParseException] {
+        val e = intercept[AnalysisException] {
         sql(
           s"""
              |CREATE TABLE jsonTable
@@ -1479,9 +1516,9 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
              |)
              |CLUSTERED BY (inexistentColumnA) SORTED BY (inexistentColumnB) INTO 2 BUCKETS
            """.stripMargin)
-        }.getMessage
-        assert(e.contains(
-          "Expected explicit specification of table schema when using CLUSTERED BY clause"))
+        }
+        assert(e.message == "Cannot specify bucketing information if the table schema is not " +
+          "specified when creating and will be inferred at runtime")
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index db97078..c7c1acd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -23,15 +23,13 @@ import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, SparkSession}
+import org.apache.spark.sql.{AnalysisException, SaveMode, SparkSession}
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
 import org.apache.spark.sql.execution.command.CreateDataSourceTableUtils._
-import org.apache.spark.sql.execution.command.CreateHiveTableAsSelectLogicalPlan
 import org.apache.spark.sql.execution.datasources.{Partition => _, _}
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, ParquetOptions}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -436,23 +434,30 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
-      case p: LogicalPlan if p.resolved => p
 
-      case p @ CreateHiveTableAsSelectLogicalPlan(table, child, allowExisting) =>
-        val desc = if (table.storage.serde.isEmpty) {
+      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
+        val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
           // add default serde
-          table.withNewStorage(
+          tableDesc.withNewStorage(
             serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
         } else {
-          table
+          tableDesc
         }
 
-        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table)
+        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc)
+
+        // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
+        // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
+        // tables yet.
+        if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+          throw new AnalysisException("" +
+            "CTAS for hive serde tables does not support append or overwrite semantics.")
+        }
 
         execution.CreateHiveTableAsSelectCommand(
-          desc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
-          child,
-          allowExisting)
+          newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
+          query,
+          mode == SaveMode.Ignore)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 8773993..e01c053 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -65,6 +65,7 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
         catalog.CreateTables ::
+        PreprocessDDL(conf) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::
         (if (conf.runSQLonFile) new ResolveDataSource(sparkSession) :: Nil else Nil)

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
index e0c07db..69a6884 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveDDLCommandSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive
 
-import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.{AnalysisException, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.dsl.expressions._
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Generate, ScriptTransformation}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.types.StructType
 
@@ -36,8 +37,7 @@ class HiveDDLCommandSuite extends PlanTest {
 
   private def extractTableDesc(sql: String): (CatalogTable, Boolean) = {
     parser.parsePlan(sql).collect {
-      case c: CreateTableCommand => (c.table, c.ifNotExists)
-      case c: CreateHiveTableAsSelectLogicalPlan => (c.tableDesc, c.allowExisting)
+      case CreateTable(tableDesc, mode, _) => (tableDesc, mode == SaveMode.Ignore)
     }.head
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5effc016/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index d15e11a..e078b58 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -141,6 +141,13 @@ class HiveDDLSuite
     }
   }
 
+  test("create table: partition column names exist in table definition") {
+    val e = intercept[AnalysisException] {
+      sql("CREATE TABLE tbl(a int) PARTITIONED BY (a string)")
+    }
+    assert(e.message == "Found duplicate column(s) in table definition of `tbl`: a")
+  }
+
   test("add/drop partitions - external table") {
     val catalog = spark.sessionState.catalog
     withTempDir { tmpDir =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org