You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/01/29 04:38:14 UTC

spark git commit: [SPARK-19152][SQL][FOLLOWUP] simplify CreateHiveTableAsSelectCommand

Repository: spark
Updated Branches:
  refs/heads/master cfcfc92f7 -> f7c07db85


[SPARK-19152][SQL][FOLLOWUP] simplify CreateHiveTableAsSelectCommand

## What changes were proposed in this pull request?

After https://github.com/apache/spark/pull/16552, `CreateHiveTableAsSelectCommand` became very similar to `CreateDataSourceTableAsSelectCommand`, and we can further simplify it by creating the table only in the table-not-exist branch.

This PR also adds Hive provider checking to the DataStream reader/writer, which was missed in #16552.

## How was this patch tested?

N/A

Author: Wenchen Fan <we...@databricks.com>

Closes #16693 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7c07db8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7c07db8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7c07db8

Branch: refs/heads/master
Commit: f7c07db852f22d694ca49792e4ceae04d45b71ef
Parents: cfcfc92
Author: Wenchen Fan <we...@databricks.com>
Authored: Sat Jan 28 20:38:03 2017 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Sat Jan 28 20:38:03 2017 -0800

----------------------------------------------------------------------
 .../spark/sql/streaming/DataStreamReader.scala  |  6 ++
 .../spark/sql/streaming/DataStreamWriter.scala  |  6 ++
 .../CreateHiveTableAsSelectCommand.scala        | 66 ++++++++------------
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 13 +++-
 4 files changed, 49 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 7db9d92..6d2cede 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.StreamingRelation
 import org.apache.spark.sql.types.StructType
@@ -116,6 +117,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
    * @since 2.0.0
    */
   def load(): DataFrame = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "read files of Hive data source directly.")
+    }
+
     val dataSource =
       DataSource(
         sparkSession,

http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 7e7a1ba..0f7a337 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.annotation.{Experimental, InterfaceStability}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Dataset, ForeachWriter}
 import org.apache.spark.sql.catalyst.streaming.InternalOutputModes._
+import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.streaming.{ForeachSink, MemoryPlan, MemorySink}
 
@@ -221,6 +222,11 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
    * @since 2.0.0
    */
   def start(): StreamingQuery = {
+    if (source.toLowerCase == DDLUtils.HIVE_PROVIDER) {
+      throw new AnalysisException("Hive data source can only be used with tables, you can not " +
+        "write files of Hive data source directly.")
+    }
+
     if (source == "memory") {
       assertNotPartitioned("memory")
       if (extraOptions.get("queryName").isEmpty) {

http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index 2c754d7..41c6b18 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,10 +20,10 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SaveMode, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan}
 import org.apache.spark.sql.execution.command.RunnableCommand
-import org.apache.spark.sql.hive.MetastoreRelation
 
 
 /**
@@ -44,40 +44,6 @@ case class CreateHiveTableAsSelectCommand(
   override def innerChildren: Seq[LogicalPlan] = Seq(query)
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
-    lazy val metastoreRelation: MetastoreRelation = {
-      import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat
-      import org.apache.hadoop.hive.serde2.`lazy`.LazySimpleSerDe
-      import org.apache.hadoop.io.Text
-      import org.apache.hadoop.mapred.TextInputFormat
-
-      val withFormat =
-        tableDesc.withNewStorage(
-          inputFormat =
-            tableDesc.storage.inputFormat.orElse(Some(classOf[TextInputFormat].getName)),
-          outputFormat =
-            tableDesc.storage.outputFormat
-              .orElse(Some(classOf[HiveIgnoreKeyTextOutputFormat[Text, Text]].getName)),
-          serde = tableDesc.storage.serde.orElse(Some(classOf[LazySimpleSerDe].getName)),
-          compressed = tableDesc.storage.compressed)
-
-      val withSchema = if (withFormat.schema.isEmpty) {
-        tableDesc.copy(schema = query.schema)
-      } else {
-        withFormat
-      }
-
-      sparkSession.sessionState.catalog.createTable(withSchema, ignoreIfExists = true)
-
-      // Get the Metastore Relation
-      sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-        case SubqueryAlias(_, r: SimpleCatalogRelation, _) =>
-          val tableMeta = r.metadata
-          MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession)
-      }
-    }
-    // TODO ideally, we should get the output data ready first and then
-    // add the relation into catalog, just in case of failure occurs while data
-    // processing.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentifier)) {
       assert(mode != SaveMode.Overwrite,
         s"Expect the table $tableIdentifier has been dropped when the save mode is Overwrite")
@@ -89,12 +55,30 @@ case class CreateHiveTableAsSelectCommand(
         // Since the table already exists and the save mode is Ignore, we will just return.
         return Seq.empty
       }
-      sparkSession.sessionState.executePlan(InsertIntoTable(
-        metastoreRelation, Map(), query, overwrite = false, ifNotExists = false)).toRdd
+
+      sparkSession.sessionState.executePlan(
+        InsertIntoTable(
+          UnresolvedRelation(tableIdentifier),
+          Map(),
+          query,
+          overwrite = false,
+          ifNotExists = false)).toRdd
     } else {
+      // TODO ideally, we should get the output data ready first and then
+      // add the relation into catalog, just in case of failure occurs while data
+      // processing.
+      assert(tableDesc.schema.isEmpty)
+      sparkSession.sessionState.catalog.createTable(
+        tableDesc.copy(schema = query.schema), ignoreIfExists = false)
+
       try {
-        sparkSession.sessionState.executePlan(InsertIntoTable(
-          metastoreRelation, Map(), query, overwrite = true, ifNotExists = false)).toRdd
+        sparkSession.sessionState.executePlan(
+          InsertIntoTable(
+            UnresolvedRelation(tableIdentifier),
+            Map(),
+            query,
+            overwrite = true,
+            ifNotExists = false)).toRdd
       } catch {
         case NonFatal(e) =>
           // drop the created table.

http://git-wip-us.apache.org/repos/asf/spark/blob/f7c07db8/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 2827183..58be079 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.types.StructType
 
 class HiveDDLSuite
   extends QueryTest with SQLTestUtils with TestHiveSingleton with BeforeAndAfterEach {
-  import spark.implicits._
+  import testImplicits._
 
   override def afterEach(): Unit = {
     try {
@@ -1425,6 +1425,17 @@ class HiveDDLSuite
         Seq(1 -> "a").toDF("i", "j").write.format("hive").save(dir.getAbsolutePath)
       }
       assert(e2.message.contains("Hive data source can only be used with tables"))
+
+      val e3 = intercept[AnalysisException] {
+        spark.readStream.format("hive").load(dir.getAbsolutePath)
+      }
+      assert(e3.message.contains("Hive data source can only be used with tables"))
+
+      val e4 = intercept[AnalysisException] {
+        spark.readStream.schema(new StructType()).parquet(dir.getAbsolutePath)
+          .writeStream.format("hive").start(dir.getAbsolutePath)
+      }
+      assert(e4.message.contains("Hive data source can only be used with tables"))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org