Posted to commits@spark.apache.org by ma...@apache.org on 2014/09/11 20:57:20 UTC

git commit: [SPARK-2917] [SQL] Avoid table creation in logical plan analyzing for CTAS

Repository: spark
Updated Branches:
  refs/heads/master 1ef656ea8 -> ca83f1e2c


[SPARK-2917] [SQL] Avoid table creation in logical plan analyzing for CTAS

Author: Cheng Hao <ha...@intel.com>

Closes #1846 from chenghao-intel/ctas and squashes the following commits:

56a0578 [Cheng Hao] remove the unused imports
9a57abc [Cheng Hao] Avoid table creation in logical plan analyzing
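
For context: before this patch, analyzing a CTAS query created the target
table in the metastore as a side effect of the analyzer rule, so a plan that
failed later could still leave an empty table behind; table creation is now
deferred until the command actually executes. A minimal sketch of the two
affected entry points (hypothetical table names, assuming a HiveContext
named `hiveContext`):

    // SQL path: the table is created only when the command executes.
    hiveContext.sql("CREATE TABLE ctas_example AS SELECT key, value FROM src")
    // Programmatic path through SchemaRDDLike.saveAsTable:
    hiveContext.sql("SELECT key, value FROM src").saveAsTable("ctas_example_copy")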


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca83f1e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca83f1e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca83f1e2

Branch: refs/heads/master
Commit: ca83f1e2c4dfa519e44b837b6815cba3b4526d92
Parents: 1ef656e
Author: Cheng Hao <ha...@intel.com>
Authored: Thu Sep 11 11:57:01 2014 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Thu Sep 11 11:57:01 2014 -0700

----------------------------------------------------------------------
 .../catalyst/plans/logical/basicOperators.scala |  3 +-
 .../org/apache/spark/sql/SchemaRDDLike.scala    |  4 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 14 ++--
 .../org/apache/spark/sql/hive/HiveQl.scala      |  2 +-
 .../apache/spark/sql/hive/HiveStrategies.scala  | 10 +++
 .../hive/execution/CreateTableAsSelect.scala    | 73 ++++++++++++++++++++
 .../hive/execution/InsertIntoHiveTable.scala    |  6 +-
 .../sql/hive/execution/SQLQuerySuite.scala      |  9 +++
 8 files changed, 104 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca83f1e2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
index 4adfb18..5d10754 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala
@@ -114,11 +114,12 @@ case class InsertIntoTable(
   }
 }
 
-case class InsertIntoCreatedTable(
+case class CreateTableAsSelect(
     databaseName: Option[String],
     tableName: String,
     child: LogicalPlan) extends UnaryNode {
   override def output = child.output
+  override lazy val resolved = (databaseName != None && childrenResolved)
 }
 
 case class WriteToFile(
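
The `resolved` override is what lets the analyzer defer the metastore work:
the node reports itself unresolved until a database name is filled in, so the
catalog rule (below, in HiveMetastoreCatalog) keeps getting a chance to
qualify it. A self-contained model of that contract (plain Scala, not Spark
code; names are stand-ins):

    case class Node(databaseName: Option[String], childrenResolved: Boolean) {
      // Mirrors the override above: unresolved until the database is known.
      lazy val resolved = databaseName != None && childrenResolved
    }
    assert(!Node(None, childrenResolved = true).resolved)            // analyzer keeps visiting
    assert(Node(Some("default"), childrenResolved = true).resolved)  // ready for the planner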

http://git-wip-us.apache.org/repos/asf/spark/blob/ca83f1e2/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
index 2f3033a..e52eeb3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDDLike.scala
@@ -54,7 +54,7 @@ private[sql] trait SchemaRDDLike {
   @transient protected[spark] val logicalPlan: LogicalPlan = baseLogicalPlan match {
     // For various commands (like DDL) and queries with side effects, we force query optimization to
     // happen right away to let these side effects take place eagerly.
-    case _: Command | _: InsertIntoTable | _: InsertIntoCreatedTable | _: WriteToFile =>
+    case _: Command | _: InsertIntoTable | _: CreateTableAsSelect | _: WriteToFile =>
       queryExecution.toRdd
       SparkLogicalPlan(queryExecution.executedPlan)(sqlContext)
     case _ =>
@@ -124,7 +124,7 @@ private[sql] trait SchemaRDDLike {
    */
   @Experimental
   def saveAsTable(tableName: String): Unit =
-    sqlContext.executePlan(InsertIntoCreatedTable(None, tableName, logicalPlan)).toRdd
+    sqlContext.executePlan(CreateTableAsSelect(None, tableName, logicalPlan)).toRdd
 
   /** Returns the schema as a string in the tree format.
    *
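
Both hunks above matter: the first keeps CTAS in the set of plans that are
executed eagerly at SchemaRDD construction time, and the second has
`saveAsTable` emit the new logical node with `databaseName = None`, leaving
it to the Hive catalog to fill in the current database during analysis. A
usage sketch (hypothetical table name, assuming a HiveContext named
`hiveContext`):

    val results = hiveContext.sql("SELECT key, value FROM src")
    // Runs immediately; the catalog resolves the database before planning.
    results.saveAsTable("src_snapshot")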

http://git-wip-us.apache.org/repos/asf/spark/blob/ca83f1e2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index dfa2a7a..2c0db9b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -54,8 +54,8 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       db: Option[String],
       tableName: String,
       alias: Option[String]): LogicalPlan = synchronized {
-    val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
-    val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
+    val (databaseName, tblName) = processDatabaseAndTableName(
+                                    db.getOrElse(hive.sessionState.getCurrentDatabase), tableName)
     val table = client.getTable(databaseName, tblName)
     val partitions: Seq[Partition] =
       if (table.isPartitioned) {
@@ -112,17 +112,11 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
       // Wait until children are resolved.
       case p: LogicalPlan if !p.childrenResolved => p
 
-      case InsertIntoCreatedTable(db, tableName, child) =>
+      case CreateTableAsSelect(db, tableName, child) =>
         val (dbName, tblName) = processDatabaseAndTableName(db, tableName)
         val databaseName = dbName.getOrElse(hive.sessionState.getCurrentDatabase)
 
-        createTable(databaseName, tblName, child.output)
-
-        InsertIntoTable(
-          lookupRelation(Some(databaseName), tblName, None),
-          Map.empty,
-          child,
-          overwrite = false)
+        CreateTableAsSelect(Some(databaseName), tableName, child)
     }
   }
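
This is the heart of the fix. The old rule called `createTable` during
analysis and rewrote the plan into an `InsertIntoTable` over the freshly
created relation; the new rule only qualifies the database name and returns
the same node, so analysis has no metastore side effects. Schematically
(simplified from the hunk above):

    // Before: analysis mutated the metastore.
    //   CreateTableAsSelect(db, t, child) => { createTable(...); InsertIntoTable(lookupRelation(...), ..., child, ...) }
    // After: analysis is a pure plan-to-plan rewrite.
    //   CreateTableAsSelect(db, t, child) => CreateTableAsSelect(Some(databaseName), t, child)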
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ca83f1e2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
index c98287c..21ecf17 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala
@@ -489,7 +489,7 @@ private[hive] object HiveQl {
 
       val (db, tableName) = extractDbNameTableName(tableNameParts)
 
-      InsertIntoCreatedTable(db, tableName, nodeToPlan(query))
+      CreateTableAsSelect(db, tableName, nodeToPlan(query))
 
    // If it's not a "CREATE TABLE AS" like above then just pass it back to hive as a native command.
     case Token("TOK_CREATETABLE", _) => NativePlaceholder
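
On the parser side the change is just the rename: the
`TOK_CREATETABLE ... AS SELECT` branch emits the logical `CreateTableAsSelect`
with the database still optional. Illustrative shape of the result
(hypothetical SQL; `HiveQl.parseSql` is package-private, so this would run
from within `org.apache.spark.sql.hive`):

    val plan = HiveQl.parseSql("CREATE TABLE t1 AS SELECT key FROM src")
    // plan is CreateTableAsSelect(None, "t1", <plan for the SELECT>),
    // unresolved until the catalog fills in the database name.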

http://git-wip-us.apache.org/repos/asf/spark/blob/ca83f1e2/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 72cc01c..43dd3d2 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -165,6 +165,16 @@ private[hive] trait HiveStrategies {
              InMemoryRelation(_, _, _,
                HiveTableScan(_, table, _)), partition, child, overwrite) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite)(hiveContext) :: Nil
+      case logical.CreateTableAsSelect(database, tableName, child) =>
+        val query = planLater(child)
+        CreateTableAsSelect(
+          database.get,
+          tableName,
+          query,
+          InsertIntoHiveTable(_: MetastoreRelation, 
+            Map(), 
+            query, 
+            true)(hiveContext)) :: Nil
       case _ => Nil
     }
   }
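
The underscore section `InsertIntoHiveTable(_: MetastoreRelation, Map(), query, true)(hiveContext)`
is the key trick: it is a function of type `MetastoreRelation => InsertIntoHiveTable`,
so the physical operator receives a recipe for the insert rather than a
relation that would have to exist already. `database.get` is safe here
because the analyzer rule guarantees the name is a `Some` before planning.
A minimal model of the same pattern (plain Scala, stand-in types):

    case class Relation(name: String)                    // stands in for MetastoreRelation
    case class Insert(rel: Relation, overwrite: Boolean) // stands in for InsertIntoHiveTable

    // Partial application yields the "recipe" the physical CTAS node carries:
    val mkInsert: Relation => Insert = Insert(_, true)   // overwrite = true
    val insert = mkInsert(Relation("t"))                 // applied only once the table exists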

http://git-wip-us.apache.org/repos/asf/spark/blob/ca83f1e2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
new file mode 100644
index 0000000..71ea774
--- /dev/null
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive.execution
+
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.catalyst.expressions.Row
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LowerCaseSchema
+import org.apache.spark.sql.execution.{SparkPlan, Command, LeafNode}
+import org.apache.spark.sql.hive.HiveContext
+import org.apache.spark.sql.hive.MetastoreRelation
+
+/**
+ * :: Experimental ::
+ * Create the table and insert the query result into it.
+ * @param database the database name of the new relation
+ * @param tableName the table name of the new relation
+ * @param insertIntoRelation a function that builds the `InsertIntoHiveTable` operator
+ *        for a given `MetastoreRelation`; the query result will be inserted into that table.
+ * TODO Add more table-creation properties, e.g. SerDe, StorageHandler, in-memory cache, etc.
+ */
+@Experimental
+case class CreateTableAsSelect(
+  database: String,
+  tableName: String,
+  query: SparkPlan,
+  insertIntoRelation: MetastoreRelation => InsertIntoHiveTable)
+    extends LeafNode with Command {
+
+  def output = Seq.empty
+
+  // Lazily create the table and look up the resulting MetastoreRelation
+  private[this] lazy val metastoreRelation: MetastoreRelation = {
+    // Create the table 
+    val sc = sqlContext.asInstanceOf[HiveContext]
+    sc.catalog.createTable(database, tableName, query.output, false)
+    // Get the Metastore Relation
+    sc.catalog.lookupRelation(Some(database), tableName, None) match {
+      case LowerCaseSchema(r: MetastoreRelation) => r
+      case o: MetastoreRelation => o
+    }
+  }
+
+  override protected[sql] lazy val sideEffectResult: Seq[Row] = {
+    insertIntoRelation(metastoreRelation).execute
+    Seq.empty[Row]
+  }
+
+  override def execute(): RDD[Row] = {
+    sideEffectResult
+    sparkContext.emptyRDD[Row]
+  }
+
+  override def argString: String = {
+    s"[Database:$database, TableName: $tableName, InsertIntoHiveTable]\n" + query.toString
+  }
+}
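
The execution ordering is the point of the new operator: nothing touches the
metastore until `execute()` runs, because `sideEffectResult` forces the lazy
`metastoreRelation`, which is where `createTable` finally happens, after which
the curried insert runs against the new relation (the pattern match also
unwraps the `LowerCaseSchema` wrapper the catalog may apply). A self-contained
sketch of the same deferred-side-effect pattern (plain Scala, hypothetical
names):

    class DeferredCtas(table: String) {
      // Runs at most once, and only when first demanded.
      private[this] lazy val created: String = { println(s"CREATE TABLE $table"); table }
      def execute(): Unit = { created; println(s"INSERT INTO $table") }
    }
    val ctas = new DeferredCtas("t")   // no metastore call yet
    ctas.execute()                     // creates the table, then inserts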

http://git-wip-us.apache.org/repos/asf/spark/blob/ca83f1e2/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index 39033bd..a284a91 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -53,9 +53,9 @@ case class InsertIntoHiveTable(
     (@transient sc: HiveContext)
   extends UnaryNode {
 
-  val outputClass = newSerializer(table.tableDesc).getSerializedClass
-  @transient private val hiveContext = new Context(sc.hiveconf)
-  @transient private val db = Hive.get(sc.hiveconf)
+  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
+  @transient private lazy val hiveContext = new Context(sc.hiveconf)
+  @transient private lazy val db = Hive.get(sc.hiveconf)
 
   private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]

http://git-wip-us.apache.org/repos/asf/spark/blob/ca83f1e2/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index b99caf7..679efe0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -18,6 +18,8 @@
 package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.QueryTest
+
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.hive.test.TestHive._
 
 case class Nested1(f1: Nested2)
@@ -54,4 +56,11 @@ class SQLQuerySuite extends QueryTest {
       sql("SELECT f1.f2.f3 FROM nested"),
       1)
   }
+
+  test("test CTAS") {
+    checkAnswer(sql("CREATE TABLE test_ctas_123 AS SELECT key, value FROM src"), Seq.empty[Row])
+    checkAnswer(
+      sql("SELECT key, value FROM test_ctas_123 ORDER BY key"), 
+      sql("SELECT key, value FROM src ORDER BY key").collect().toSeq)
+  }
 }

