Posted to commits@spark.apache.org by ma...@apache.org on 2014/12/09 02:39:28 UTC

spark git commit: [SPARK-4769] [SQL] CTAS does not work when reading from temporary tables

Repository: spark
Updated Branches:
  refs/heads/master 944384363 -> 51b1fe142


[SPARK-4769] [SQL] CTAS does not work when reading from temporary tables

This is a code refactor and follow-up for #2570.
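
For context, the regression in SPARK-4769 shows up when the source of a CTAS
statement is a table registered only in Spark's temporary catalog, not in the
Hive metastore. A minimal sketch of that scenario against the Spark 1.2-era
API (the file path and table names below are hypothetical):

    import org.apache.spark.SparkContext
    import org.apache.spark.sql.hive.HiveContext

    val sc = new SparkContext("local", "ctas-demo")
    val hiveContext = new HiveContext(sc)

    // Register a temporary table that lives only in Spark's in-memory
    // catalog, not in the Hive metastore.
    hiveContext.jsonFile("people.json").registerTempTable("people_tmp")

    // Before this fix, analyzing the CTAS below through Hive could fail to
    // see "people_tmp", since Hive knows nothing about Spark temp tables.
    hiveContext.sql("CREATE TABLE people_copy AS SELECT * FROM people_tmp")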

Author: Cheng Hao <ha...@intel.com>

Closes #3336 from chenghao-intel/createtbl and squashes the following commits:

3563142 [Cheng Hao] remove the unused variable
e215187 [Cheng Hao] eliminate the compiling warning
4f97f14 [Cheng Hao] fix bug in unit test
5d58812 [Cheng Hao] revert the API changes
b85b620 [Cheng Hao] fix the regression of temp table not found in CTAS


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/51b1fe14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/51b1fe14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/51b1fe14

Branch: refs/heads/master
Commit: 51b1fe1426ffecac6c4644523633ea1562ff9a4e
Parents: 9443843
Author: Cheng Hao <ha...@intel.com>
Authored: Mon Dec 8 17:39:12 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Dec 8 17:39:12 2014 -0800

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 26 ++++++++++++++++++--
 .../apache/spark/sql/hive/HiveStrategies.scala  | 14 ++++++++---
 .../hive/execution/CreateTableAsSelect.scala    | 16 ++++--------
 .../sql/hive/execution/SQLQuerySuite.scala      |  9 +++++++
 4 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/51b1fe14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 91a1577..6086563 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -254,15 +254,37 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
    * For example, because of a CREATE TABLE X AS statement.
    */
   object CreateTables extends Rule[LogicalPlan] {
+    import org.apache.hadoop.hive.ql.Context
+    import org.apache.hadoop.hive.ql.parse.{QB, ASTNode, SemanticAnalyzer}
+
     def apply(plan: LogicalPlan): LogicalPlan = plan transform {
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case CreateTableAsSelect(db, tableName, child, allowExisting, extra) =>
+      case CreateTableAsSelect(db, tableName, child, allowExisting, Some(extra: ASTNode)) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
-        CreateTableAsSelect(Some(databaseName), tableName, child, allowExisting, extra)
+        // Get the CreateTableDesc from Hive SemanticAnalyzer
+        val desc: Option[CreateTableDesc] = if (tableExists(Some(databaseName), tblName)) {
+          None
+        } else {
+          val sa = new SemanticAnalyzer(hive.hiveconf) {
+            override def analyzeInternal(ast: ASTNode) {
+              // A hack to intercept the SemanticAnalyzer.analyzeInternal,
+              // to ignore the SELECT clause of the CTAS
+              val method = classOf[SemanticAnalyzer].getDeclaredMethod(
+                "analyzeCreateTable", classOf[ASTNode], classOf[QB])
+              method.setAccessible(true)
+              method.invoke(this, ast, this.getQB)
+            }
+          }
+
+          sa.analyze(extra, new Context(hive.hiveconf))
+          Some(sa.getQB().getTableDesc)
+        }
+
+        CreateTableAsSelect(Some(databaseName), tblName, child, allowExisting, desc)
     }
   }
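
The override above hijacks SemanticAnalyzer.analyzeInternal and calls the
private analyzeCreateTable method via Java reflection, so Hive builds the
CreateTableDesc without ever analyzing the SELECT clause (which may reference
Spark-only temp tables). A self-contained sketch of that reflection trick,
using hypothetical class and method names rather than Hive's:

    class Analyzer {
      private def analyzeCreateTable(sql: String): String = s"desc-for: $sql"
    }

    object ReflectionDemo extends App {
      val target = new Analyzer
      // Look up the private method by name and parameter types, then
      // disable the access check before invoking it, as the patch does.
      val method = classOf[Analyzer].getDeclaredMethod(
        "analyzeCreateTable", classOf[String])
      method.setAccessible(true)
      println(method.invoke(target, "CREATE TABLE t AS SELECT 1"))
    }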
 

http://git-wip-us.apache.org/repos/asf/spark/blob/51b1fe14/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index edf291f..5f02e95 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.hive.ql.parse.ASTNode
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
 
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -181,13 +182,20 @@ private[hive] trait HiveStrategies {
         execution.InsertIntoHiveTable(
           table, partition, planLater(child), overwrite)(hiveContext) :: Nil
       case logical.CreateTableAsSelect(
-             Some(database), tableName, child, allowExisting, Some(extra: ASTNode)) =>
-        CreateTableAsSelect(
+             Some(database), tableName, child, allowExisting, Some(desc: CreateTableDesc)) =>
+        execution.CreateTableAsSelect(
           database,
           tableName,
           child,
           allowExisting,
-          extra) :: Nil
+          Some(desc)) :: Nil
+      case logical.CreateTableAsSelect(Some(database), tableName, child, allowExisting, None) =>
+        execution.CreateTableAsSelect(
+          database,
+          tableName,
+          child,
+          allowExisting,
+          None) :: Nil
       case _ => Nil
     }
   }
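
A Catalyst strategy like the one above is essentially a function from a
logical plan node to a list of candidate physical operators, returning Nil
when the node is not its concern. A toy sketch of that shape, with simplified
stand-in types rather than Spark's actual classes:

    sealed trait LogicalPlan
    case class LogicalCTAS(db: String, table: String,
        desc: Option[String]) extends LogicalPlan
    case object OtherPlan extends LogicalPlan

    sealed trait PhysicalPlan
    case class PhysicalCTAS(db: String, table: String,
        desc: Option[String]) extends PhysicalPlan

    object CTASStrategy {
      // One pattern covers both the Some(desc) and None cases from the diff,
      // since desc is already an Option here; other nodes yield no candidates.
      def apply(plan: LogicalPlan): Seq[PhysicalPlan] = plan match {
        case LogicalCTAS(db, table, desc) => PhysicalCTAS(db, table, desc) :: Nil
        case _ => Nil
      }
    }

    object StrategyDemo extends App {
      println(CTASStrategy(LogicalCTAS("default", "t", None))) // one candidate
      println(CTASStrategy(OtherPlan))                         // List(): not ours
    }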

http://git-wip-us.apache.org/repos/asf/spark/blob/51b1fe14/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
index 3d24d87..b83689c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.hive.execution
 
-import org.apache.hadoop.hive.ql.Context
-import org.apache.hadoop.hive.ql.parse.{SemanticAnalyzer, ASTNode}
+import org.apache.hadoop.hive.ql.plan.CreateTableDesc
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.expressions.Row
@@ -35,8 +35,7 @@ import org.apache.spark.sql.hive.MetastoreRelation
  * @param query the query whose result will be inserted into the new relation
  * @param allowExisting allow continuing to work if the table already exists;
  *                      otherwise raise an exception
- * @param extra the extra information for this Operator, it should be the
- *              ASTNode object for extracting the CreateTableDesc.
+ * @param desc the CreateTableDesc, which may contain serde, storage handler, etc.
 
  */
 @Experimental
@@ -45,7 +44,7 @@ case class CreateTableAsSelect(
     tableName: String,
     query: LogicalPlan,
     allowExisting: Boolean,
-    extra: ASTNode) extends LeafNode with Command {
+    desc: Option[CreateTableDesc]) extends LeafNode with Command {
 
   def output = Seq.empty
 
@@ -53,13 +52,8 @@ case class CreateTableAsSelect(
 
   // A lazy computing of the metastoreRelation
   private[this] lazy val metastoreRelation: MetastoreRelation = {
-    // Get the CreateTableDesc from Hive SemanticAnalyzer
-    val sa = new SemanticAnalyzer(sc.hiveconf)
-
-    sa.analyze(extra, new Context(sc.hiveconf))
-    val desc = sa.getQB().getTableDesc
     // Create Hive Table
-    sc.catalog.createTable(database, tableName, query.output, allowExisting, Some(desc))
+    sc.catalog.createTable(database, tableName, query.output, allowExisting, desc)
 
     // Get the Metastore Relation
     sc.catalog.lookupRelation(Some(database), tableName, None) match {
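
Note that metastoreRelation stays a lazy val, so the Hive table is created
only when the relation is first demanded, not when the operator is
constructed. A generic sketch of that idiom (names hypothetical):

    class TableCreator(name: String) {
      // Initialized exactly once, on first access; constructing the operator
      // performs no side effects, mirroring the lazy metastoreRelation above.
      private[this] lazy val relation: String = {
        println(s"creating table $name")
        s"relation: $name"
      }
      def run(): String = relation
    }

    object LazyDemo extends App {
      val c = new TableCreator("t")
      c.run() // prints "creating table t"
      c.run() // cached; no second side effect
    }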

http://git-wip-us.apache.org/repos/asf/spark/blob/51b1fe14/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index e9b1943..b341eae 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -119,6 +119,15 @@ class SQLQuerySuite extends QueryTest {
     checkAnswer(
       sql("SELECT f1.f2.f3 FROM nested"),
       1)
+    checkAnswer(sql("CREATE TABLE test_ctas_1234 AS SELECT * from nested"),
+      Seq.empty[Row])
+    checkAnswer(
+      sql("SELECT * FROM test_ctas_1234"),
+      sql("SELECT * FROM nested").collect().toSeq)
+
+    intercept[org.apache.hadoop.hive.ql.metadata.InvalidTableException] {
+      sql("CREATE TABLE test_ctas_12345 AS SELECT * from notexists").collect()
+    }
   }
 
   test("test CTAS") {

