You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/10/28 22:36:11 UTC

git commit: [SPARK-3343] [SQL] Add serde support for CTAS

Repository: spark
Updated Branches:
  refs/heads/master abcafcfba -> 4b55482ab


[SPARK-3343] [SQL] Add serde support for CTAS

Currently, `CTAS` (Create Table As Select) doesn't support specifying the `SerDe` in HQL. This PR will pass down the `ASTNode` into the physical operator `execution.CreateTableAsSelect`, which will extract the `CreateTableDesc` object via Hive `SemanticAnalyzer`. In the meantime, I also update the `HiveMetastoreCatalog.createTable` to optionally support the `CreateTableDesc` for table creation.

Author: Cheng Hao <ha...@intel.com>

Closes #2570 from chenghao-intel/ctas_serde and squashes the following commits:

e011ef5 [Cheng Hao] shim for both 0.12 & 0.13.1
cfb3662 [Cheng Hao] revert to hive 0.12
c8a547d [Cheng Hao] Support SerDe properties within CTAS


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4b55482a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4b55482a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4b55482a

Branch: refs/heads/master
Commit: 4b55482abf899c27da3d55401ad26b4e9247b327
Parents: abcafcf
Author: Cheng Hao <ha...@intel.com>
Authored: Tue Oct 28 14:36:06 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Tue Oct 28 14:36:06 2014 -0700

----------------------------------------------------------------------
 .../catalyst/plans/logical/basicOperators.scala |   8 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala    |   4 +-
 .../scala/org/apache/spark/sql/QueryTest.scala  |  19 ++
 .../hive/execution/HiveCompatibilitySuite.scala |   6 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 197 ++++++++++++++++---
 .../org/apache/spark/sql/hive/HiveQl.scala      |  15 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  |  17 +-
 .../hive/execution/CreateTableAsSelect.scala    |  39 ++--
 .../scala/org/apache/spark/sql/QueryTest.scala  |  19 ++
 .../sql/hive/execution/HiveExplainSuite.scala   |  37 ++--
 .../sql/hive/execution/SQLQuerySuite.scala      |  59 ++++++
 .../org/apache/spark/sql/hive/Shim12.scala      |   5 +-
 .../org/apache/spark/sql/hive/Shim13.scala      |   6 +-
 13 files changed, 337 insertions(+), 94 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 14b03c7..00bdf10 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -114,11 +114,13 @@ case class InsertIntoTable(
   }
 }
 
-case class CreateTableAsSelect(
+case class CreateTableAsSelect[T](
     databaseName: Option[String],
     tableName: String,
-    child: LogicalPlan) extends UnaryNode {
-  override def output = child.output
+    child: LogicalPlan,
+    allowExisting: Boolean,
+    desc: Option[T] = None) extends UnaryNode {
+  override def output = Seq.empty[Attribute]
   override lazy val resolved = (databaseName != None && childrenResolved)
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 25ba7d8..15516af 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
   @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
     // For various commands (like DDL) and queries with side effects, we force query optimization to
     // happen right away to let these side effects take place eagerly.
-    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect |_: WriteToFile =>
+    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect[_] |_: WriteToFile =>
       LogicalRDD(queryExecution.analyzed.output, queryExecution.toRdd)(sqlContext)
     case _ =>
       baseLogicalPlan
@@ -123,7 +123,7 @@ private[sql] trait SchemaRDDLike {
    */
   @Experimental
   def saveAsTable(tableName: String): Unit =
-    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd
+    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan, false)).toRdd
 
   /** Returns the schema as a string in the tree format.
    *

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 1fd8d27..042f61f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -22,6 +22,25 @@ import org.apache.spark.sql.catalyst.util._
 
 class QueryTest extends PlanTest {
   /**
+   * Runs the plan and makes sure the answer contains all of the keywords, or the
+   * none of keywords are listed in the answer
+   * @param rdd the [[SchemaRDD]] to be executed
+   * @param exists true for make sure the keywords are listed in the output, otherwise
+   *               to make sure none of the keyword are not listed in the output
+   * @param keywords keyword in string array
+   */
+  def checkExistence(rdd: SchemaRDD, exists: Boolean, keywords: String*) {
+    val outputs = rdd.collect().map(_.mkString).mkString
+    for (key <- keywords) {
+      if (exists) {
+        assert(outputs.contains(key), s"Failed for $rdd ($key doens't exist in result)")
+      } else {
+        assert(!outputs.contains(key), s"Failed for $rdd ($key existed in the result)")
+      }
+    }
+  }
+
+  /**
    * Runs the plan and makes sure the answer matches the expected result.
    * @param rdd the [[SchemaRDD]] to be executed
    * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
index 4fc26d6..26d9ca0 100644
--- a/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
+++ b/sql/hive/compatibility/src/test/scala/org/apache/spark/sql/hive/execution/HiveCompatibilitySuite.scala
@@ -229,7 +229,11 @@ class HiveCompatibilitySuite extends HiveQueryFileTest with BeforeAndAfter {
 
     // Needs constant object inspectors
     "udf_round",
-    "udf7"
+    "udf7",
+
+    // Sort with Limit clause causes failure.
+    "ctas",
+    "ctas_hadoop20"
   ) ++ HiveShim.compatibilityBlackList
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 39d87a9..2dd2c88 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,17 +17,27 @@
 
 package org.apache.spark.sql.hive
 
+import java.io.IOException
+import java.util.{List => JList}
+
 import scala.util.parsing.combinator.RegexParsers
 
-import org.apache.hadoop.hive.metastore.api.{FieldSchema, SerDeInfo, StorageDescriptor, Partition => TPartition, Table => TTable}
-import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.util.ReflectionUtils
+import org.apache.hadoop.fs.Path
+
+import org.apache.hadoop.hive.metastore.TableType
+import org.apache.hadoop.hive.metastore.api.FieldSchema
+import org.apache.hadoop.hive.metastore.api.{Table => TTable, Partition => TPartition}
+import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table, HiveException}
+import org.apache.hadoop.hive.ql.plan.{TableDesc, CreateTableDesc}
+import org.apache.hadoop.hive.serde.serdeConstants
+import org.apache.hadoop.hive.serde2.{Deserializer, SerDeException}
+import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
 
 import org.apache.spark.Logging
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.catalyst.analysis.Catalog
+import org.apache.spark.sql.catalyst.analysis.{Catalog, OverrideCatalog}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -66,37 +76,164 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       table.getTTable, partitions.map(part => part.getTPartition))(hive)
   }
 
+  /**
+   * Create table with specified database, table name, table description and schema
+   * @param databaseName Database Name
+   * @param tableName Table Name
+   * @param schema Schema of the new table, if not specified, will use the schema
+   *               specified in crtTbl
+   * @param allowExisting if true, ignore AlreadyExistsException
+   * @param desc CreateTableDesc object which contains the SerDe info. Currently
+   *               we support most of the features except the bucket.
+   */
   def createTable(
       databaseName: String,
       tableName: String,
       schema: Seq[Attribute],
-      allowExisting: Boolean = false): Unit = {
+      allowExisting: Boolean = false,
+      desc: Option[CreateTableDesc] = None) {
+    val hconf = hive.hiveconf
+
     val (dbName, tblName) = processDatabaseAndTableName(databaseName, tableName)
-    val table = new Table(dbName, tblName)
-    val hiveSchema =
+    val tbl = new Table(dbName, tblName)
+
+    val crtTbl: CreateTableDesc = desc.getOrElse(null)
+
+    // We should respect the passed in schema, unless it's not set
+    val hiveSchema: JList[FieldSchema] = if (schema == null || schema.isEmpty) {
+      crtTbl.getCols
+    } else {
       schema.map(attr => new FieldSchema(attr.name, toMetastoreType(attr.dataType), ""))
-    table.setFields(hiveSchema)
-
-    val sd = new StorageDescriptor()
-    table.getTTable.setSd(sd)
-    sd.setCols(hiveSchema)
-
-    // TODO: THESE ARE ALL DEFAULTS, WE NEED TO PARSE / UNDERSTAND the output specs.
-    sd.setCompressed(false)
-    sd.setParameters(Map[String, String]())
-    sd.setInputFormat("org.apache.hadoop.mapred.TextInputFormat")
-    sd.setOutputFormat("org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat")
-    val serDeInfo = new SerDeInfo()
-    serDeInfo.setName(tblName)
-    serDeInfo.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
-    serDeInfo.setParameters(Map[String, String]())
-    sd.setSerdeInfo(serDeInfo)
+    }
+    tbl.setFields(hiveSchema)
+
+    // Most of code are similar with the DDLTask.createTable() of Hive,
+    if (crtTbl != null && crtTbl.getTblProps() != null) {
+      tbl.getTTable().getParameters().putAll(crtTbl.getTblProps())
+    }
+
+    if (crtTbl != null && crtTbl.getPartCols() != null) {
+      tbl.setPartCols(crtTbl.getPartCols())
+    }
+
+    if (crtTbl != null && crtTbl.getStorageHandler() != null) {
+      tbl.setProperty(
+        org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE,
+        crtTbl.getStorageHandler())
+    }
+
+    /*
+     * We use LazySimpleSerDe by default.
+     *
+     * If the user didn't specify a SerDe, and any of the columns are not simple
+     * types, we will have to use DynamicSerDe instead.
+     */
+    if (crtTbl == null || crtTbl.getSerName() == null) {
+      val storageHandler = tbl.getStorageHandler()
+      if (storageHandler == null) {
+        logInfo(s"Default to LazySimpleSerDe for table $dbName.$tblName")
+        tbl.setSerializationLib(classOf[LazySimpleSerDe].getName())
+
+        import org.apache.hadoop.mapred.TextInputFormat
+        import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
+        import org.apache.hadoop.io.Text
+
+        tbl.setInputFormatClass(classOf[TextInputFormat])
+        tbl.setOutputFormatClass(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]])
+        tbl.setSerializationLib("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
+      } else {
+        val serDeClassName = storageHandler.getSerDeClass().getName()
+        logInfo(s"Use StorageHandler-supplied $serDeClassName for table $dbName.$tblName")
+        tbl.setSerializationLib(serDeClassName)
+      }
+    } else {
+      // let's validate that the serde exists
+      val serdeName = crtTbl.getSerName()
+      try {
+        val d = ReflectionUtils.newInstance(hconf.getClassByName(serdeName), hconf)
+        if (d != null) {
+          logDebug("Found class for $serdeName")
+        }
+      } catch {
+        case e: SerDeException => throw new HiveException("Cannot validate serde: " + serdeName, e)
+      }
+      tbl.setSerializationLib(serdeName)
+    }
+
+    if (crtTbl != null && crtTbl.getFieldDelim() != null) {
+      tbl.setSerdeParam(serdeConstants.FIELD_DELIM, crtTbl.getFieldDelim())
+      tbl.setSerdeParam(serdeConstants.SERIALIZATION_FORMAT, crtTbl.getFieldDelim())
+    }
+    if (crtTbl != null && crtTbl.getFieldEscape() != null) {
+      tbl.setSerdeParam(serdeConstants.ESCAPE_CHAR, crtTbl.getFieldEscape())
+    }
+
+    if (crtTbl != null && crtTbl.getCollItemDelim() != null) {
+      tbl.setSerdeParam(serdeConstants.COLLECTION_DELIM, crtTbl.getCollItemDelim())
+    }
+    if (crtTbl != null && crtTbl.getMapKeyDelim() != null) {
+      tbl.setSerdeParam(serdeConstants.MAPKEY_DELIM, crtTbl.getMapKeyDelim())
+    }
+    if (crtTbl != null && crtTbl.getLineDelim() != null) {
+      tbl.setSerdeParam(serdeConstants.LINE_DELIM, crtTbl.getLineDelim())
+    }
+
+    if (crtTbl != null && crtTbl.getSerdeProps() != null) {
+      val iter = crtTbl.getSerdeProps().entrySet().iterator()
+      while (iter.hasNext()) {
+        val m = iter.next()
+        tbl.setSerdeParam(m.getKey(), m.getValue())
+      }
+    }
+
+    if (crtTbl != null && crtTbl.getComment() != null) {
+      tbl.setProperty("comment", crtTbl.getComment())
+    }
+
+    if (crtTbl != null && crtTbl.getLocation() != null) {
+      HiveShim.setLocation(tbl, crtTbl)
+    }
+
+    if (crtTbl != null && crtTbl.getSkewedColNames() != null) {
+      tbl.setSkewedColNames(crtTbl.getSkewedColNames())
+    }
+    if (crtTbl != null && crtTbl.getSkewedColValues() != null) {
+      tbl.setSkewedColValues(crtTbl.getSkewedColValues())
+    }
+
+    if (crtTbl != null) {
+      tbl.setStoredAsSubDirectories(crtTbl.isStoredAsSubDirectories())
+      tbl.setInputFormatClass(crtTbl.getInputFormat())
+      tbl.setOutputFormatClass(crtTbl.getOutputFormat())
+    }
+
+    tbl.getTTable().getSd().setInputFormat(tbl.getInputFormatClass().getName())
+    tbl.getTTable().getSd().setOutputFormat(tbl.getOutputFormatClass().getName())
+
+    if (crtTbl != null && crtTbl.isExternal()) {
+      tbl.setProperty("EXTERNAL", "TRUE")
+      tbl.setTableType(TableType.EXTERNAL_TABLE)
+    }
+
+    // set owner
+    try {
+      tbl.setOwner(hive.hiveconf.getUser)
+    } catch {
+      case e: IOException => throw new HiveException("Unable to get current user", e)
+    }
+
+    // set create time
+    tbl.setCreateTime((System.currentTimeMillis() / 1000).asInstanceOf[Int])
+
+    // TODO add bucket support
+    // TODO set more info if Hive upgrade
 
+    // create the table
     synchronized {
-      try client.createTable(table) catch {
-        case e: org.apache.hadoop.hive.ql.metadata.HiveException
-          if e.getCause.isInstanceOf[org.apache.hadoop.hive.metastore.api.AlreadyExistsException] &&
-             allowExisting => // Do nothing.
+      try client.createTable(tbl, allowExisting) catch {
+        case e: org.apache.hadoop.hive.metastore.api.AlreadyExistsException
+          if allowExisting => // Do nothing
+        case e: Throwable => throw e
       }
     }
   }
@@ -110,11 +247,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case CreateTableAsSelect(db, tableName, child) =>
+      case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
-        CreateTableAsSelect(Some(databaseName), tableName, child)
+        CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index ed07a28..9d9d68a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -448,14 +448,14 @@ private[hive] object HiveQl {
       }
 
     case Token("TOK_CREATETABLE", children)
-        if children.collect { case t@Token("TOK_QUERY", _) => t }.nonEmpty =>
-      // TODO: Parse other clauses.
+        if children.collect { case t @ Token("TOK_QUERY", _) => t }.nonEmpty =>
       // Reference: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL
       val (
           Some(tableNameParts) ::
           _ /* likeTable */ ::
-          Some(query) +:
-          notImplemented) =
+          Some(query) ::
+          allowExisting +:
+          ignores) =
         getClauses(
           Seq(
             "TOK_TABNAME",
@@ -479,14 +479,9 @@ private[hive] object HiveQl {
             "TOK_TABLELOCATION",
             "TOK_TABLEPROPERTIES"),
           children)
-      if (notImplemented.exists(token => !token.isEmpty)) {
-        throw new NotImplementedError(
-          s"Unhandled clauses: ${notImplemented.flatten.map(dumpTree(_)).mkString("\n")}")
-      }
-
       val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-      CreateTableAsSelect(db, tableName, nodeToPlan(query))
+      CreateTableAsSelect(db, tableName, nodeToPlan(query), allowExisting != None, Some(node))
 
     // If its not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
     case Token("TOK_CREATETABLE", _) => NativePlaceholder

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 5c66322..e59d4d5 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.hive.ql.parse.ASTNode
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions._
@@ -160,17 +162,14 @@ private[hive] trait HiveStrategies {
     def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
       case logical.InsertIntoTable(table: MetastoreRelation, partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
-
-      case logical.CreateTableAsSelect(database, tableName, child) =>
-        val query = planLater(child)
+      case logical.CreateTableAsSelect(
+             Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
         CreateTableAsSelect(
-          database.get,
+          database,
           tableName,
-          query,
-          InsertIntoHiveTable(_: MetastoreRelation,
-            Map(),
-            query,
-            overwrite = true)(hiveContext)) :: Nil
+          child,
+          allowExisting,
+          extra) :: Nil
       case _ => Nil
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 3625708..2fce414 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark.sql.hive.execution
 
+import org.apache.hadoop.hive.ql.Context
+import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode}
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.MetastoreRelation
@@ -30,33 +32,46 @@ import org.apache.spark.sql.hive.MetastoreRelation
  * Create table and insert the query result into it.
  * @param database the database name of the new relation
  * @param tableName the table name of the new relation
- * @param insertIntoRelation function of creating the `InsertIntoHiveTable`
- *        by specifying the `MetaStoreRelation`, the data will be inserted into that table.
- * TODO Add more table creating properties,  e.g. SerDe, StorageHandler, in-memory cache etc.
+ * @param query the query whose result will be insert into the new relation
+ * @param allowExisting allow continue working if it's already exists, otherwise
+ *                      raise exception
+ * @param extra the extra information for this Operator, it should be the
+ *              ASTNode object for extracting the CreateTableDesc.
+
  */
 @Experimental
 case class CreateTableAsSelect(
     database: String,
     tableName: String,
-    query: SparkPlan,
-    insertIntoRelation: MetastoreRelation => InsertIntoHiveTable)
-  extends LeafNode with Command {
+    query: LogicalPlan,
+    allowExisting: Boolean,
+    extra: ASTNode) extends LeafNode with Command {
 
   def output = Seq.empty
 
+  private[this] def sc = sqlContext.asInstanceOf[HiveContext]
+
   // A lazy computing of the metastoreRelation
   private[this] lazy val metastoreRelation: MetastoreRelation = {
-    // Create the table
-    val sc = sqlContext.asInstanceOf[HiveContext]
-    sc.catalog.createTable(database, tableName, query.output, false)
+    // Get the CreateTableDesc from Hive SemanticAnalyzer
+    val sa = new SemanticAnalyzer(sc.hiveconf)
+
+    sa.analyze(extra, new Context(sc.hiveconf))
+    val desc = sa.getQB().getTableDesc
+    // Create Hive Table
+    sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc))
+
     // Get the Metastore Relation
     sc.catalog.lookupRelation(Some(database), tableName, None) match {
       case r: MetastoreRelation => r
     }
   }
 
-  override protected lazy val sideEffectResult: Seq[Row] = {
-    insertIntoRelation(metastoreRelation).execute
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+    // TODO ideally, we should get the output data ready first and then
+    // add the relation into catalog, just in case of failure occurs while data
+    // processing.
+    sc.executePlan(InsertIntoTable(metastoreRelation, Map(), query, true)).toRdd
     Seq.empty[Row]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 6b06410..f89c49d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -33,6 +33,25 @@ import org.apache.spark.sql.catalyst.util._
  */
 class QueryTest extends PlanTest {
   /**
+   * Runs the plan and makes sure the answer contains all of the keywords, or the
+   * none of keywords are listed in the answer
+   * @param rdd the [[SchemaRDD]] to be executed
+   * @param exists true for make sure the keywords are listed in the output, otherwise
+   *               to make sure none of the keyword are not listed in the output
+   * @param keywords keyword in string array
+   */
+  def checkExistence(rdd: SchemaRDD, exists: Boolean, keywords: String*) {
+    val outputs = rdd.collect().map(_.mkString).mkString
+    for (key <- keywords) {
+      if (exists) {
+        assert(outputs.contains(key), s"Failed for $rdd ($key doens't exist in result)")
+      } else {
+        assert(!outputs.contains(key), s"Failed for $rdd ($key existed in the result)")
+      }
+    }
+  }
+
+  /**
    * Runs the plan and makes sure the answer matches the expected result.
    * @param rdd the [[SchemaRDD]] to be executed
    * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
index 4ed58f4..a68fc2a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveExplainSuite.scala
@@ -18,37 +18,24 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
-import org.apache.spark.sql.Row
 
 /**
  * A set of tests that validates support for Hive Explain command.
  */
 class HiveExplainSuite extends QueryTest {
-  private def check(sqlCmd: String, exists: Boolean, keywords: String*) {
-    val outputs = sql(sqlCmd).collect().map(_.getString(0)).mkString
-    for (key <- keywords) {
-      if (exists) {
-        assert(outputs.contains(key), s"Failed for $sqlCmd ($key doens't exist in result)")
-      } else {
-        assert(!outputs.contains(key), s"Failed for $sqlCmd ($key existed in the result)")
-      }
-    }
-  }
-
   test("explain extended command") {
-    check(" explain   select * from src where key=123 ", true,
-          "== Physical Plan ==")
-    check(" explain   select * from src where key=123 ", false,
-          "== Parsed Logical Plan ==",
-          "== Analyzed Logical Plan ==", 
-          "== Optimized Logical Plan ==")
-    check(" explain   extended select * from src where key=123 ", true,
-          "== Parsed Logical Plan ==", 
-          "== Analyzed Logical Plan ==", 
-          "== Optimized Logical Plan ==", 
-          "== Physical Plan ==", 
-          "Code Generation", "== RDD ==")
+    checkExistence(sql(" explain   select * from src where key=123 "), true,
+                   "== Physical Plan ==")
+    checkExistence(sql(" explain   select * from src where key=123 "), false,
+                   "== Parsed Logical Plan ==",
+                   "== Analyzed Logical Plan ==",
+                   "== Optimized Logical Plan ==")
+    checkExistence(sql(" explain   extended select * from src where key=123 "), true,
+                   "== Parsed Logical Plan ==",
+                   "== Analyzed Logical Plan ==",
+                   "== Optimized Logical Plan ==",
+                   "== Physical Plan ==",
+                   "Code Generation", "== RDD ==")
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index a4aea31..4f96a32 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -32,6 +32,65 @@ case class Nested3(f3: Int)
  * valid, but Hive currently cannot execute it.
  */
 class SQLQuerySuite extends QueryTest {
+  test("CTAS with serde") {
+    sql("CREATE TABLE ctas1 AS SELECT key k, value FROM src ORDER BY k, value").collect
+    sql(
+      """CREATE TABLE ctas2
+        | ROW FORMAT SERDE "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe"
+        | WITH SERDEPROPERTIES("serde_p1"="p1","serde_p2"="p2")
+        | STORED AS RCFile
+        | TBLPROPERTIES("tbl_p1"="p11", "tbl_p2"="p22")
+        | AS
+        |   SELECT key, value
+        |   FROM src
+        |   ORDER BY key, value""".stripMargin).collect
+    sql(
+      """CREATE TABLE ctas3
+        | ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\012'
+        | STORED AS textfile AS
+        |   SELECT key, value
+        |   FROM src
+        |   ORDER BY key, value""".stripMargin).collect
+
+    // the table schema may like (key: integer, value: string)
+    sql(
+      """CREATE TABLE IF NOT EXISTS ctas4 AS
+        | SELECT 1 AS key, value FROM src LIMIT 1""".stripMargin).collect
+    // expect the string => integer for field key cause the table ctas4 already existed.
+    sql(
+      """CREATE TABLE IF NOT EXISTS ctas4 AS
+        | SELECT key, value FROM src ORDER BY key, value""".stripMargin).collect
+
+    checkAnswer(
+      sql("SELECT k, value FROM ctas1 ORDER BY k, value"),
+      sql("SELECT key, value FROM src ORDER BY key, value").collect().toSeq)
+    checkAnswer(
+      sql("SELECT key, value FROM ctas2 ORDER BY key, value"),
+      sql(
+        """
+          SELECT key, value
+          FROM src
+          ORDER BY key, value""").collect().toSeq)
+    checkAnswer(
+      sql("SELECT key, value FROM ctas3 ORDER BY key, value"),
+      sql(
+        """
+          SELECT key, value
+          FROM src
+          ORDER BY key, value""").collect().toSeq)
+    checkAnswer(
+      sql("SELECT key, value FROM ctas4 ORDER BY key, value"),
+      sql("SELECT CAST(key AS int) k, value FROM src ORDER BY k, value").collect().toSeq)
+
+    checkExistence(sql("DESC EXTENDED ctas2"), true,
+      "name:key", "type:string", "name:value", "ctas2",
+      "org.apache.hadoop.hive.ql.io.RCFileInputFormat",
+      "org.apache.hadoop.hive.ql.io.RCFileOutputFormat",
+      "org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe",
+      "serde_p1=p1", "serde_p2=p2", "tbl_p1=p11", "tbl_p2=p22","MANAGED_TABLE"
+    )
+  }
+
   test("ordering not in select") {
     checkAnswer(
       sql("SELECT key FROM src ORDER BY value"),

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
----------------------------------------------------------------------
diff --git a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
index 2317d2e..8cb81db 100644
--- a/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
+++ b/sql/hive/v0.12.0/src/main/scala/org/apache/spark/sql/hive/Shim12.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.`type`.HiveDecimal
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.{Hive, Partition, Table}
-import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.processors._
 import org.apache.hadoop.hive.ql.stats.StatsSetupConst
 import org.apache.hadoop.hive.serde2.{Deserializer, ColumnProjectionUtils}
@@ -89,6 +89,9 @@ private[hive] object HiveShim {
     "udf_concat"
   )
 
+  def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = {
+    tbl.setDataLocation(new Path(crtTbl.getLocation()).toUri())
+  }
 }
 
 class ShimFileSinkDesc(var dir: String, var tableInfo: TableDesc, var compressed: Boolean)

http://git-wip-us.apache.org/repos/asf/spark/blob/4b55482a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
----------------------------------------------------------------------
diff --git a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
index b8d893d..b9a742c 100644
--- a/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
+++ b/sql/hive/v0.13.1/src/main/scala/org/apache/spark/sql/hive/Shim13.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.common.`type`.{HiveDecimal}
 import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.hadoop.hive.ql.Context
 import org.apache.hadoop.hive.ql.metadata.{Table, Hive, Partition}
-import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
+import org.apache.hadoop.hive.ql.plan.{CreateTableDesc, FileSinkDesc, TableDesc}
 import org.apache.hadoop.hive.ql.processors.CommandProcessorFactory
 import org.apache.hadoop.hive.serde2.{ColumnProjectionUtils, Deserializer}
 import org.apache.hadoop.mapred.InputFormat
@@ -121,6 +121,10 @@ private[hive] object HiveShim {
 
   def compatibilityBlackList = Seq()
 
+  def setLocation(tbl: Table, crtTbl: CreateTableDesc): Unit = {
+    tbl.setDataLocation(new Path(crtTbl.getLocation()))
+  }
+
   /*
    * Bug introdiced in hive-0.13. FileSinkDesc is serializable, but its member path is not.
    * Fix it through wrapper.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org