You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by yh...@apache.org on 2016/08/26 15:52:12 UTC

spark git commit: [SPARK-17260][MINOR] move CreateTables to HiveStrategies

Repository: spark
Updated Branches:
  refs/heads/master 6063d5963 -> 28ab17922


[SPARK-17260][MINOR] move CreateTables to HiveStrategies

## What changes were proposed in this pull request?

The `CreateTables` rule turns a general `CreateTable` plan into a `CreateHiveTableAsSelectCommand` for Hive serde tables. However, this rule is logically a planner strategy, so we should move it to `HiveStrategies` to be consistent with other DDL commands.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <we...@databricks.com>

Closes #14825 from cloud-fan/ctas.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28ab1792
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28ab1792
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28ab1792

Branch: refs/heads/master
Commit: 28ab17922a227e8d93654d3478c0d493bfb599d5
Parents: 6063d59
Author: Wenchen Fan <we...@databricks.com>
Authored: Fri Aug 26 08:52:10 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Fri Aug 26 08:52:10 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 35 --------------------
 .../spark/sql/hive/HiveSessionCatalog.scala     |  1 -
 .../spark/sql/hive/HiveSessionState.scala       |  1 -
 .../apache/spark/sql/hive/HiveStrategies.scala  | 27 +++++++++++++++
 4 files changed, 27 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28ab1792/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 701b73a..ff82c7f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -376,41 +376,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
       }
     }
   }
-
-  /**
-   * Creates any tables required for query execution.
-   * For example, because of a CREATE TABLE X AS statement.
-   */
-  object CreateTables extends Rule[LogicalPlan] {
-    def apply(plan: LogicalPlan): LogicalPlan = plan transform {
-      // Wait until children are resolved.
-      case p: LogicalPlan if !p.childrenResolved => p
-
-      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
-        val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
-          // add default serde
-          tableDesc.withNewStorage(
-            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
-        } else {
-          tableDesc
-        }
-
-        val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableDesc)
-
-        // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
-        // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
-        // tables yet.
-        if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
-          throw new AnalysisException("" +
-            "CTAS for hive serde tables does not support append or overwrite semantics.")
-        }
-
-        execution.CreateHiveTableAsSelectCommand(
-          newTableDesc.copy(identifier = TableIdentifier(tblName, Some(dbName))),
-          query,
-          mode == SaveMode.Ignore)
-    }
-  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/28ab1792/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index ca8c734..86d3b6d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -87,7 +87,6 @@ private[sql] class HiveSessionCatalog(
 
   val ParquetConversions: Rule[LogicalPlan] = metastoreCatalog.ParquetConversions
   val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
-  val CreateTables: Rule[LogicalPlan] = metastoreCatalog.CreateTables
 
   override def refreshTable(name: TableIdentifier): Unit = {
     super.refreshTable(name)

http://git-wip-us.apache.org/repos/asf/spark/blob/28ab1792/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index a7cc7cc..f3c4135 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -61,7 +61,6 @@ private[hive] class HiveSessionState(sparkSession: SparkSession)
       override val extendedResolutionRules =
         catalog.ParquetConversions ::
         catalog.OrcConversions ::
-        catalog.CreateTables ::
         PreprocessDDL(conf) ::
         PreprocessTableInsertion(conf) ::
         DataSourceAnalysis(conf) ::

http://git-wip-us.apache.org/repos/asf/spark/blob/28ab1792/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 17956de..fb11c84 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution._
+import org.apache.spark.sql.execution.command.ExecutedCommandExec
+import org.apache.spark.sql.execution.datasources.CreateTable
 import org.apache.spark.sql.hive.execution._
 
 private[hive] trait HiveStrategies {
@@ -45,6 +47,31 @@ private[hive] trait HiveStrategies {
       case logical.InsertIntoTable(
           table: MetastoreRelation, partition, child, overwrite, ifNotExists) =>
         InsertIntoHiveTable(table, partition, planLater(child), overwrite, ifNotExists) :: Nil
+
+      case CreateTable(tableDesc, mode, Some(query)) if tableDesc.provider.get == "hive" =>
+        val newTableDesc = if (tableDesc.storage.serde.isEmpty) {
+          // add default serde
+          tableDesc.withNewStorage(
+            serde = Some("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe"))
+        } else {
+          tableDesc
+        }
+
+        // Currently we will never hit this branch, as SQL string API can only use `Ignore` or
+        // `ErrorIfExists` mode, and `DataFrameWriter.saveAsTable` doesn't support hive serde
+        // tables yet.
+        if (mode == SaveMode.Append || mode == SaveMode.Overwrite) {
+          throw new AnalysisException("" +
+            "CTAS for hive serde tables does not support append or overwrite semantics.")
+        }
+
+        val dbName = tableDesc.identifier.database.getOrElse(sparkSession.catalog.currentDatabase)
+        val cmd = CreateHiveTableAsSelectCommand(
+          newTableDesc.copy(identifier = tableDesc.identifier.copy(database = Some(dbName))),
+          query,
+          mode == SaveMode.Ignore)
+        ExecutedCommandExec(cmd) :: Nil
+
       case _ => Nil
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org